You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/04 12:37:23 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #12590: ARROW-15639 [C++][Python] UDF Scalar Function Implementation

lidavidm commented on code in PR #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r841667889


##########
cpp/examples/arrow/udf_example.cc:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>  // ARROW-15263
+#include <arrow/util/async_generator.h>
+#include <arrow/util/future.h>
+#include <arrow/util/make_unique.h>
+#include <arrow/util/vector.h>

Review Comment:
   Do we actually need all these headers? They don't seem to be used.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid("Function Arity and Input data shape doesn't match, expected ",
+                           arity.num_args, ", got ", batch.values.size());
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::ExecutionError("python function cannot be null");

Review Comment:
   Status::Invalid perhaps?



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +202,76 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Cannot use constructor to initialize InputType")
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        create a scalar input type of the given data type

Review Comment:
   ```suggestion
           Create a scalar input type of the given data type.
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")
+
+    if num_args and isinstance(num_args, int):
+        assert num_args > 0
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity = CArity.VarArgs(num_args)

Review Comment:
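   This isn't right: `VarArgs(num_args)` declares a variadic function that takes at least `num_args` arguments, not a function with a fixed arity. The equivalent C++ would be `Arity(num_args, /*is_varargs=*/false)`.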



##########
cpp/src/arrow/compute/function.h:
##########
@@ -205,7 +205,7 @@ class ARROW_EXPORT Function {
   const Arity& arity() const { return arity_; }
 
   /// \brief Return the function documentation
-  const FunctionDoc& doc() const { return *doc_; }
+  const FunctionDoc doc() const { return doc_; }

Review Comment:
   Returning by const reference is still OK, IMO, and will avoid a copy.



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2328,6 +2370,8 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
         const shared_ptr[CTable]& table() const
         const shared_ptr[CScalar]& scalar() const
 
+        CArrayData* mutable_array() const

Review Comment:
   Do we use this?



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),

Review Comment:
   move the vector?



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid("Function Arity and Input data shape doesn't match, expected ",
+                           arity.num_args, ", got ", batch.values.size());
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+
+  // lambda function
+  auto exec = [this, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (batch[0].is_array()) {  // checke 0-th element to select array callable
+      RETURN_NOT_OK(ExecFunctionArray(batch, function_.obj(), arity.num_args, out));
+    } else if (batch[0].is_scalar()) {  // check 0-th element to select scalar callable
+      RETURN_NOT_OK(ExecFunctionScalar(batch, function_.obj(), arity.num_args, out));
+    } else {
+      return Status::Invalid("Unexpected input type, scalar or array type expected.");
+    }

Review Comment:
   It seems we need to capture `input_types` and check/validate the type of each argument in accordance with input types, instead of hardcoding a switch based solely on the first argument. Either that, or we shouldn't allow Python control over the input shapes and just rely on dynamic typing instead.



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+ private:
+  compute::Function::Kind kind_;
+  compute::Arity arity_;
+  const compute::FunctionDoc func_doc_;
+  std::vector<compute::InputType> in_types_;
+  compute::OutputType out_type_;
+};
+
+class ARROW_PYTHON_EXPORT ScalarUdfOptions : public UdfOptions {
+ public:
+  ScalarUdfOptions(const std::string func_name, const compute::Arity arity,
+                   const compute::FunctionDoc func_doc,
+                   const std::vector<compute::InputType> in_types,
+                   const compute::OutputType out_type)
+      : UdfOptions(compute::Function::SCALAR, arity, func_doc, in_types, out_type),

Review Comment:
   move the vector?



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+ private:
+  compute::Function::Kind kind_;
+  compute::Arity arity_;
+  const compute::FunctionDoc func_doc_;
+  std::vector<compute::InputType> in_types_;
+  compute::OutputType out_type_;
+};
+
+class ARROW_PYTHON_EXPORT ScalarUdfOptions : public UdfOptions {
+ public:
+  ScalarUdfOptions(const std::string func_name, const compute::Arity arity,
+                   const compute::FunctionDoc func_doc,
+                   const std::vector<compute::InputType> in_types,
+                   const compute::OutputType out_type)
+      : UdfOptions(compute::Function::SCALAR, arity, func_doc, in_types, out_type),
+        func_name_(func_name) {}
+
+  const std::string& name() const { return func_name_; }
+
+ private:
+  std::string func_name_;
+};
+
+class ARROW_PYTHON_EXPORT UdfBuilder {
+ public:
+  UdfBuilder() {}
+};
+
+class ARROW_PYTHON_EXPORT ScalarUdfBuilder : public UdfBuilder {
+ public:
+  ScalarUdfBuilder() : UdfBuilder() {}
+
+  Status MakeFunction(PyObject* function, ScalarUdfOptions* options = NULLPTR);

Review Comment:
   Why default to NULLPTR? It seems that we don't check the value before use above, so this can't be null.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid("Function Arity and Input data shape doesn't match, expected ",
+                           arity.num_args, ", got ", batch.values.size());
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+
+  // lambda function

Review Comment:
   nit: are these comments useful?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function

Review Comment:
   ```suggestion
       Register a user-defined-function.
   ```



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid("Function Arity and Input data shape doesn't match, expected ",
+                           arity.num_args, ", got ", batch.values.size());
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+
+  // lambda function
+  auto exec = [this, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,

Review Comment:
   Is it safe to capture `this`?



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +202,76 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object

Review Comment:
   ```suggestion
       Wrap a C++ InputType in an InputType object.
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])

Review Comment:
   It would be great if we could infer this from the function definition itself, incidentally.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid("Function Arity and Input data shape doesn't match, expected ",
+                           arity.num_args, ", got ", batch.values.size());
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+
+  // lambda function
+  auto exec = [this, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (batch[0].is_array()) {  // checke 0-th element to select array callable
+      RETURN_NOT_OK(ExecFunctionArray(batch, function_.obj(), arity.num_args, out));
+    } else if (batch[0].is_scalar()) {  // check 0-th element to select scalar callable
+      RETURN_NOT_OK(ExecFunctionScalar(batch, function_.obj(), arity.num_args, out));
+    } else {
+      return Status::Invalid("Unexpected input type, scalar or array type expected.");
+    }

Review Comment:
   Ah, and this is about the output type? Shouldn't we look at `options->output_type` to determine that?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())

Review Comment:
   etc.



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +202,76 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Cannot use constructor to initialize InputType")
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type

Review Comment:
   For this sort of thing there's a specific error we usually raise: https://github.com/apache/arrow/blob/4c3edd27a9a2cec90a186250eebdda37b30238fd/python/pyarrow/_compute.pyx#L244-L250



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):

Review Comment:
   Similarly, I don't think we need this check here; we can just assume the argument is a dict, since these explicit checks are not very idiomatic. If you really want the check, declare the type in the signature above and Cython will insert an idiomatic one (`cdef CFunctionDoc _make_function_doc(dict func_doc)`).



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")

Review Comment:
   Ditto here: just use `c_func_name = tobytes(func_name)` and let the exceptions propagate.



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+ private:
+  compute::Function::Kind kind_;
+  compute::Arity arity_;
+  const compute::FunctionDoc func_doc_;
+  std::vector<compute::InputType> in_types_;
+  compute::OutputType out_type_;
+};
+
+class ARROW_PYTHON_EXPORT ScalarUdfOptions : public UdfOptions {
+ public:
+  ScalarUdfOptions(const std::string func_name, const compute::Arity arity,
+                   const compute::FunctionDoc func_doc,
+                   const std::vector<compute::InputType> in_types,
+                   const compute::OutputType out_type)
+      : UdfOptions(compute::Function::SCALAR, arity, func_doc, in_types, out_type),
+        func_name_(func_name) {}

Review Comment:
   Wouldn't function name belong on the base class?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")

Review Comment:
   Generally you would just do
   
   ```
   f_doc.summary = tobytes(func_doc["summary"])
   ```
   and let the KeyError or TypeError propagate.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.

Review Comment:
   "The output should be an Array, Scalar, ChunkedArray, Table, or RecordBatch based on the out_type."



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )

Review Comment:
   ```suggestion
           a dictionary object with keys "summary" (str), 
           "description" (str), and "arg_names" (list of str).
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")
+
+    if num_args and isinstance(num_args, int):
+        assert num_args > 0

Review Comment:
   Don't assert; raise an actual error. Assertions can be stripped at runtime, and `AssertionError` isn't the idiomatic exception class for an invalid argument anyway.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")
+
+    if num_args and isinstance(num_args, int):

Review Comment:
   Same here, don't check the type (I'll stop commenting on this from now on)



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -1793,6 +1795,21 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
         CExecContext()
         CExecContext(CMemoryPool* pool)
 
+    cdef cppclass CExecBatch" arrow::compute::ExecBatch":
+        CExecBatch(const CRecordBatch& batch)
+
+        @staticmethod
+        CResult[CExecBatch] Make(vector[CDatum] values)
+        CResult[shared_ptr[CRecordBatch]] ToRecordBatch(
+            shared_ptr[CSchema] schema, CMemoryPool* pool) const
+
+        # inline const CDatum& operator[](i) const
+        vector[CDatum] values
+        c_string ToString() const
+
+    cdef cppclass CKernelContext" arrow::compute::KernelContext":
+        CKernelContext(CExecContext* exec_ctx)
+

Review Comment:
   Are these used?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")
+
+    if num_args and isinstance(num_args, int):
+        assert num_args > 0
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity = CArity.VarArgs(num_args)
+    else:
+        raise ValueError("arity must be an instance of Arity")

Review Comment:
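   In any case, even if we were to check the types, these should raise TypeError rather than ValueError, and the messages reference the wrong types (e.g. `num_args` is checked to be an int, but the message says "arity must be an instance of Arity").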



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +202,76 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Cannot use constructor to initialize InputType")
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        create a scalar input type of the given data type
+
+        Parameter
+        ---------
+        data_type : DataType
+
+        Examples
+        --------
+
+        >>> import pyarrow as pa
+        >>> from pyarrow.compute import InputType
+        >>> in_type = InputType.scalar(pa.int32())
+        <pyarrow._compute.InputType object at 0x1029fdcb0>
+        """
+        cdef:
+            shared_ptr[CDataType] c_data_type
+            CInputType c_input_type
+        c_data_type = pyarrow_unwrap_data_type(data_type)
+        c_input_type = CInputType.Scalar(c_data_type)
+        return wrap_input_type(c_input_type)
+
+    @staticmethod
+    def array(data_type):
+        """
+        create an array input type of the given data type

Review Comment:
   ```suggestion
           Create an array input type of the given data type.
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")
+
+    if num_args and isinstance(num_args, int):
+        assert num_args > 0
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity = CArity.VarArgs(num_args)
+    else:
+        raise ValueError("arity must be an instance of Arity")
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if in_types and isinstance(in_types, list):
+        for in_type in in_types:
+            in_tmp = (<InputType> in_type).input_type
+            c_in_types.push_back(in_tmp)
+    else:
+        raise ValueError("input types must be of type InputType")
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise ValueError("Output value type must be defined")
+
+    if callback and callable(callback):

Review Comment:
   ```suggestion
       if callable(callback):
   ```



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -19,6 +19,8 @@
 
 from pyarrow.includes.common cimport *
 
+from cpython.ref cimport PyObject

Review Comment:
   I _think_ just declaring the parameter as `object` works; there should be no need to reach for `PyObject` here.



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2819,3 +2863,31 @@ cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
     int64_t TotalBufferSize(const CChunkedArray& array)
     int64_t TotalBufferSize(const CRecordBatch& record_batch)
     int64_t TotalBufferSize(const CTable& table)
+
+cdef extern from "arrow/compute/kernel.h" namespace "arrow::compute" nogil:
+    cdef enum MemAllocation" arrow::compute::MemAllocation::type":
+        MemAllocation_PREALLOCATE" arrow::compute::MemAllocation::PREALLOCATE"
+        MemAllocation_NO_PREALLOCATE" arrow::compute::MemAllocation::NO_PREALLOCATE"
+
+    cdef enum NullHandling" arrow::compute::NullHandling::type":
+        NullHandling_INTERSECTION" arrow::compute::NullHandling::INTERSECTION"
+        NullHandling_COMPUTED_PREALLOCATE" arrow::compute::NullHandling::COMPUTED_PREALLOCATE"
+        NullHandling_COMPUTED_NO_PREALLOCATE" arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE"
+        NullHandling_OUTPUT_NOT_NULL" arrow::compute::NullHandling::OUTPUT_NOT_NULL"

Review Comment:
   Ditto here.



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):

Review Comment:
   This isn't varargs, though?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2182,3 +2255,175 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(func_doc):
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+    if func_doc and isinstance(func_doc, dict):
+        if func_doc["summary"] and isinstance(func_doc["summary"], str):
+            f_doc.summary = func_doc["summary"].encode()
+        else:
+            raise ValueError("key `summary` cannot be None")
+
+        if func_doc["description"] and isinstance(func_doc["description"], str):
+            f_doc.description = func_doc["description"].encode()
+        else:
+            raise ValueError("key `description` cannot be None")
+
+        if func_doc["arg_names"] and isinstance(func_doc["arg_names"], list):
+            for arg_name in func_doc["arg_names"]:
+                if isinstance(arg_name, str):
+                    c_arg_names.push_back(arg_name.encode())
+                else:
+                    raise ValueError(
+                        "key `arg_names` must be a list of strings")
+            f_doc.arg_names = c_arg_names
+        else:
+            raise ValueError("key `arg_names` cannot be None")
+
+        # UDFOptions integration:
+        # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+        f_doc.options_class = tobytes("None")
+
+        c_options_required = False
+        f_doc.options_required = c_options_required
+
+        return f_doc
+    else:
+        raise ValueError(f"func_doc must be a dictionary")
+
+
+def register_function(func_name, num_args, function_doc, in_types,
+                      out_type, callback):
+    """
+    Register a user-defined-function
+
+    Parameters
+    ----------
+
+    func_name : str
+        function name 
+    num_args : int
+       number of arguments in the function
+    function_doc : dict
+        a dictionary object with keys 
+        ("summary", 
+        "description", 
+        "arg_names"
+        )
+    in_types : List[InputType]
+        list of InputType objects which defines the input 
+        types for the function
+    out_type : DataType
+        output type of the function
+    callback : callable
+        user defined function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output is a datum object which can be
+        an Array or a ChunkedArray or a Table or a RecordBatch.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_callback
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+        object obj
+
+    if func_name and isinstance(func_name, str):
+        c_func_name = tobytes(func_name)
+    else:
+        raise ValueError("func_name should be str")
+
+    if num_args and isinstance(num_args, int):
+        assert num_args > 0
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity = CArity.VarArgs(num_args)
+    else:
+        raise ValueError("arity must be an instance of Arity")
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if in_types and isinstance(in_types, list):
+        for in_type in in_types:
+            in_tmp = (<InputType> in_type).input_type
+            c_in_types.push_back(in_tmp)
+    else:
+        raise ValueError("input types must be of type InputType")
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise ValueError("Output value type must be defined")
+
+    if callback and callable(callback):
+        c_callback = <PyObject*>callback
+    else:
+        raise ValueError("callback must be a callable")
+
+    c_out_type = new COutputType(c_type)
+    # Note: The VectorUDF, TableUDF and AggregatorUDFs will be defined
+    # when they are implemented. Only ScalarUDFBuilder is supported at the
+    # moment.
+    c_options = new CScalarUdfOptions(c_func_name, c_arity, c_func_doc,
+                                      c_in_types, deref(c_out_type))
+    c_sc_builder = new CScalarUdfBuilder()
+    st = c_sc_builder.MakeFunction(c_callback, c_options)
+    if not st.ok():

Review Comment:
   We have `check_status` for this



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ]

Review Comment:
   ```suggestion
           ],
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64()
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,
+                                              function_arities,
+                                              function_input_types,
+                                              function_output_types,
+                                              function_docs,
+                                              functions,
+                                              function_inputs,
+                                              expected_outputs):
+
+    # Note: 2 * -> used to duplicate the list
+    # Because the values are same irrespective of the type i.e scalar or array
+    for name, \
+        arity, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input, \
+        expected_output in zip(function_names,
+                               2 * function_arities,
+                               function_input_types,
+                               2 * function_output_types,
+                               2 * function_docs,
+                               2 * functions,
+                               function_inputs,
+                               expected_outputs):
+
+        register_function(name, arity, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = -1
+    func_name = "py_scalar_add_func"
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("scalar add function", "scalar add function",
+                           ["scalar_value"])
+    try:
+        register_function(func_name, arity, doc, in_types,
+                          out_type, unary_scalar_function)
+    except Exception as ex:
+        assert isinstance(ex, AssertionError)

Review Comment:
   Use `pytest.raises` here. The proper exception for most of these is `TypeError`, which Python should raise automatically, as mentioned above.



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())

Review Comment:
   ```suggestion
               InputType.array(pa.int64()),
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64()

Review Comment:
   ```suggestion
           pa.int64(),
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64()
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,
+                                              function_arities,
+                                              function_input_types,
+                                              function_output_types,
+                                              function_docs,
+                                              functions,
+                                              function_inputs,
+                                              expected_outputs):

Review Comment:
   IMO, rather than a bunch of fixtures, the idiomatic thing would be to just use plain lists and loops.



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])

Review Comment:
   Idiomatically, most people would just write `_UNARY_DOC = {"summary": ..., ...}`.



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),

Review Comment:
   And actually, move the `FunctionDoc` too (i.e. `std::move` it into the member rather than copying it).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org