You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/01 20:39:26 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #12590: ARROW-15639 [C++][Python] UDF Scalar Function Implementation

westonpace commented on a change in pull request #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r840882112



##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();

Review comment:
       ```suggestion
     ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
     *out = unwrapped_result;
     return Status::OK();
   ```
   It may look a little odd that you can write `*out = unwrapped_result` when `*out` is a `Datum` and `unwrapped_result` is a `std::shared_ptr<Scalar>`, but `Datum` was intentionally designed with a lot of implicit constructors to allow for [`implicit conversion`](https://en.cppreference.com/w/cpp/language/implicit_conversion).
   
   If you really want to be concise...we have a pattern for "set output parameter from result and return status":
   ```
   return unwrap_scalar(result).Value(out);
   ```
   
   How it is written right now is actually a memory leak.  We use `shared_ptr` and `unique_ptr` almost exclusively for lifecycle management in Arrow.  Any time you are doing a `new` (except for `std::unique_ptr<Foo>(new Foo(...))`) it should be a red flag to double check that you really do intend for this.
   
   When a function has an out parameter (`Datum * out`) you know that the caller has already allocated a spot for this (possibly on the stack, possibly on the heap) so there is no need to do any allocation here.  All you need to do is fill in the spot the caller has allocated.
   
   You could do...
   ```
   auto datum = new Datum(c_res_data);
   *out = *datum;
   delete datum;
   ```
   but that would just be both complex and inefficient.

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, &doc);
+
+  // lambda function
+  auto call_back = [&, arity](compute::KernelContext* ctx,

Review comment:
       Nit: Normally we write `callback` as a single word, without the underscore.
   
   I'm not sure callback is a good name though.  I think of a callback as something that runs when a task is finished.  This is the kernel's exec function.  Can we just call it `exec`?

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();

Review comment:
       ```suggestion
     return unwrap_array(result).Value(out);
   ```

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;

Review comment:
       It's a bit odd to have this defined so far away from where it is used.

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");

Review comment:
       This error message seems incomplete.

##########
File path: cpp/src/arrow/python/udf.h
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type,
+             const compute::MemAllocation::type mem_allocation,
+             const compute::NullHandling::type null_handling)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type),
+        mem_allocation_(mem_allocation),
+        null_handling_(null_handling) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+  compute::MemAllocation::type mem_allocation() { return mem_allocation_; }

Review comment:
       I'm not sure it makes sense to expose `mem_allocation` to the user.  If I understand correctly, using `PREALLOCATE` means that the `out` datum passed to the kernel function is already populated with allocated buffers.  So the kernel function can just fill in the buffers instead of allocating new buffers.  Right now there is no way for a python function to fill in preallocated buffers and I can't imagine that is something we are going to need anytime soon.
   
   So for this PR I would hide `mem_allocation` from the user entirely and just hard code it to `NO_PREALLOCATE`.

##########
File path: python/pyarrow/tests/test_udf.py
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64()
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,

Review comment:
       I was trying to reproduce an error on this test and I noticed that I can't repeat this test because it modifies the global state (adds a function to the global function registry) and so the second run fails (function already exists).
   
   I don't know a great way to fix this.  We could expose the ability to create new function registries to Python, but that might be a bit of work, and we would need to update the register function to take a new registry argument that is rarely specified, so it adds complexity.
   
   We could also add the ability to unregister a function.  That might be the easiest.  Since function names have to be unique it should be straightforward to unregister a function by name.  I don't know if either of those things have to be fixed as part of this PR but I was curious to get your thoughts.

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }

Review comment:
       Nit: These checks could probably be the first things we do (before the `Py_INCREF`)

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);

Review comment:
       Can we / should we confirm that `result` is a scalar here?

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, &doc);
+
+  // lambda function
+  auto call_back = [&, arity](compute::KernelContext* ctx,

Review comment:
       The default capture specifier of `&` is a little misleading.  The only thing you are capturing with this flag is `this`, and `this` is never captured by reference.  When I see a default capture of `&`, it usually tells me that the lambda is only going to be executed within the current scope (otherwise anything it's referencing would be out of scope), and I know this lambda is going to be executed outside this scope, so it is strange to see.
   
   I often like to be specific when I'm not capturing too many things.  You could use `[this, arity]` to be more clear.
   

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, &doc);

Review comment:
       `doc` is a stack-allocated variable that will be destroyed when this function exits.  It is not safe to take its address in this way.  More on this in the top-level review comment.  This is most likely what is causing the Python tests to fail.

##########
File path: cpp/src/arrow/python/udf.h
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type,
+             const compute::MemAllocation::type mem_allocation,
+             const compute::NullHandling::type null_handling)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type),
+        mem_allocation_(mem_allocation),
+        null_handling_(null_handling) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+  compute::MemAllocation::type mem_allocation() { return mem_allocation_; }
+
+  compute::NullHandling::type null_handling() { return null_handling_; }

Review comment:
       Similarly, we should hardcode this to `COMPUTED_NO_PREALLOCATE` and hide this detail from the user.  I don't think the other options make sense if we are doing `NO_PREALLOCATE`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org