Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/09 13:00:48 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

pitrou commented on code in PR #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r867970555


##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include "arrow/engine/substrait/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+
+namespace arrow {
+
+namespace engine {
+
+/// \brief Retrieve a RecordBatchReader from a Substrait plan.
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer);
+
+/// \brief Get a Serialized Plan from a Substrait JSON plan.
+/// This is a helper method for Python tests.
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> ParseJsonPlan(
+    const std::string& substrait_json);
+
+}  // namespace engine

Review Comment:
   Shouldn't there be an `arrow::substrait` or `arrow::engine::substrait` namespace for those APIs?
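   Concretely, the header could nest the declarations one level deeper; a sketch assuming the `arrow::engine::substrait` spelling is chosen (not the committed code):
   ```cpp
   namespace arrow {
   namespace engine {
   namespace substrait {

   /// \brief Retrieve a RecordBatchReader from a Substrait plan.
   ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
       std::shared_ptr<Buffer> substrait_buffer);

   }  // namespace substrait
   }  // namespace engine
   }  // namespace arrow
   ```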



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Execute a Substrait plan and read the results as a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : Buffer
+        The serialized Substrait plan to execute.
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    c_buf_plan = pyarrow_unwrap_buffer(plan)
+    c_res_reader = ExecuteSerializedPlan(c_buf_plan)
+
+    c_reader = GetResultValue(c_res_reader)
+
+    reader = RecordBatchReader.__new__(RecordBatchReader)
+    reader.reader = c_reader
+    return reader
+
+
+def _parse_json_plan(plan):
+    """
+    Parse a JSON plan into equivalent serialized Protobuf.
+
+    Parameters
+    ----------
+    plan: bytes
+        Parse a Substrait plan in JSON to a serialized plan.

Review Comment:
   Can you describe the argument here instead of the function itself?
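   For example, the entry could describe the value itself; a wording sketch, not the committed text:
   ```suggestion
       plan : bytes
           A JSON-encoded Substrait plan.
   ```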



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+import pathlib
+import pytest
+
+import pyarrow as pa
+from pyarrow.lib import tobytes
+from pyarrow.lib import ArrowInvalid
+
+try:
+    import pyarrow.parquet as pq
+except ImportError:
+    pq = None
+
+try:
+    import pyarrow.substrait as substrait
+except ImportError:
+    substrait = None
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not substrait'
+pytestmark = [pytest.mark.parquet, pytest.mark.substrait]
+
+
+def resource_root():
+    """Get the path to the test resources directory."""
+    if not os.environ.get("PARQUET_TEST_DATA"):

Review Comment:
   Also, is there a particular reason we're using the Parquet test data files? They are not meant to exercise query processing, so this seems a bit unexpected.



##########
python/pyarrow/_substrait.pyx:
##########
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow import Buffer
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan):
+    """
+    Execute a Substrait plan and read the results as a RecordBatchReader.
+
+    Parameters
+    ----------
+    plan : Buffer
+        The serialized Substrait plan to execute.
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        RecordBatchReader reader
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    c_buf_plan = pyarrow_unwrap_buffer(plan)
+    c_res_reader = ExecuteSerializedPlan(c_buf_plan)
+
+    c_reader = GetResultValue(c_res_reader)
+
+    reader = RecordBatchReader.__new__(RecordBatchReader)
+    reader.reader = c_reader
+    return reader
+
+
+def _parse_json_plan(plan):
+    """
+    Parse a JSON plan into equivalent serialized Protobuf.
+
+    Parameters
+    ----------
+    plan: bytes
+        Parse a Substrait plan in JSON to a serialized plan.
+
+    Returns
+    -------
+    Buffer
+        A buffer containing the serialized Protobuf plan.
+    """
+
+    cdef:
+        CResult[shared_ptr[CBuffer]] c_res_buffer
+        c_string c_str_plan
+        shared_ptr[CBuffer] c_buf_plan
+
+    c_str_plan = plan
+    c_res_buffer = ParseJsonPlan(c_str_plan)
+    c_buf_plan = GetResultValue(c_res_buffer)

Review Comment:
   Can release the GIL here:
   ```suggestion
       with nogil:
           c_buf_plan = GetResultValue(c_res_buffer)
   ```



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {
+    if (substrait_buffer_ == NULLPTR) {
+      return Status::Invalid("Buffer containing Substrait plan is null.");
+    }
+    std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
+      return std::make_shared<SubstraitSinkConsumer>(generator_);
+    };
+    ARROW_ASSIGN_OR_RAISE(declarations_,
+                          engine::DeserializePlan(*substrait_buffer_, consumer_factory));
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Buffer> substrait_buffer_;
+  AsyncGenerator<util::optional<compute::ExecBatch>>* generator_;
+  std::vector<compute::Declaration> declarations_;
+  std::shared_ptr<compute::ExecPlan> plan_;
+  compute::ExecContext exec_context_;
+};
+
+arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer
+SubstraitSinkConsumer::MakeProducer(
+    AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen) {
+  arrow::PushGenerator<util::optional<compute::ExecBatch>> push_gen;
+  auto out = push_gen.producer();
+  *out_gen = std::move(push_gen);
+  return out;
+}
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer) {
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+  compute::ExecContext exec_context(arrow::default_memory_pool(),
+                                    ::arrow::internal::GetCpuThreadPool());
+  arrow::engine::SubstraitExecutor executor(std::move(substrait_buffer), &sink_gen,
+                                            std::move(plan), exec_context);
+  RETURN_NOT_OK(executor.Init());
+  ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+  RETURN_NOT_OK(executor.Close());

Review Comment:
   IMHO we should just add a `RecordBatchReader::Close` method with a no-op default implementation.
   (and the default destructor should perhaps call Close() under the hood)



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2693,3 +2693,7 @@ cdef extern from "arrow/python/udf.h" namespace "arrow::py":
 
     CStatus RegisterScalarFunction(PyObject* function,
                                    function[CallbackUdf] wrapper, const CScalarUdfOptions& options)
+
+cdef extern from "arrow/engine/substrait/util.h" namespace "arrow::engine" nogil:

Review Comment:
   Can you create a separate `libarrow_substrait.pxd` instead, holding all declarations from the `arrow_substrait` library?
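   A hypothetical layout for such a file (the file name and cimports are assumptions, not the committed code):
   ```cython
   # python/pyarrow/includes/libarrow_substrait.pxd — keeps the Substrait
   # declarations out of libarrow.pxd.
   from pyarrow.includes.common cimport *
   from pyarrow.includes.libarrow cimport *

   cdef extern from "arrow/engine/substrait/util.h" namespace "arrow::engine" nogil:
       CResult[shared_ptr[CRecordBatchReader]] ExecuteSerializedPlan(
           shared_ptr[CBuffer] substrait_buffer)
       CResult[shared_ptr[CBuffer]] ParseJsonPlan(const c_string& substrait_json)
   ```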



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include "arrow/engine/substrait/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+
+namespace arrow {
+
+namespace engine {
+
+/// \brief Retrieve a RecordBatchReader from a Substrait plan.
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer);
+
+/// \brief Get a Serialized Plan from a Substrait JSON plan.
+/// This is a helper method for Python tests.
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> ParseJsonPlan(

Review Comment:
   Call this `SerializeJsonPlan` or perhaps `SubstraitFromJson` instead?



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -750,5 +751,66 @@ TEST(Substrait, ExtensionSetFromPlanMissingFunc) {
           &ext_set));
 }
 
+Result<std::string> GetSubstraitJSON() {
+  ARROW_ASSIGN_OR_RAISE(std::string dir_string,
+                        arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));
+  auto file_name =
+      arrow::internal::PlatformFilename::FromString(dir_string)->Join("binary.parquet");
+  auto file_path = file_name->ToString();
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  return substrait_json;
+}
+
+TEST(Substrait, GetRecordBatchReader) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#else
+  ASSERT_OK_AND_ASSIGN(std::string substrait_json, GetSubstraitJSON());
+  ASSERT_OK_AND_ASSIGN(auto buf, engine::ParseJsonPlan(substrait_json));
+  ASSERT_OK_AND_ASSIGN(auto reader, engine::ExecuteSerializedPlan(buf));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  // Note: assuming the binary.parquet file contains a fixed number of records;
+  // in case of a test failure, re-evaluate the content in the file
+  EXPECT_EQ(table->num_rows(), 12);
+#endif
+}
+
+TEST(Substrait, InvalidPlan) {
+  std::string substrait_json = R"({
+    "relations": [
+    ]
+  })";
+  ASSERT_OK_AND_ASSIGN(auto buf, engine::ParseJsonPlan(substrait_json));

Review Comment:
   I'm curious, why is it ok to serialize an invalid plan? There are no a priori validations in place?



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");

Review Comment:
   Make this `Status::Invalid` instead?
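   A one-line sketch of that change:
   ```suggestion
       if (!did_push) return Status::Invalid("Producer closed already");
   ```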



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+import pathlib
+import pytest
+
+import pyarrow as pa
+from pyarrow.lib import tobytes
+from pyarrow.lib import ArrowInvalid
+
+try:
+    import pyarrow.parquet as pq
+except ImportError:
+    pq = None
+
+try:
+    import pyarrow.substrait as substrait
+except ImportError:
+    substrait = None
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not substrait'
+pytestmark = [pytest.mark.parquet, pytest.mark.substrait]
+
+
+def resource_root():
+    """Get the path to the test resources directory."""
+    if not os.environ.get("PARQUET_TEST_DATA"):

Review Comment:
   Instead of copy/pasting this from other test files, can we centralize this helper function somewhere?


