You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/18 15:50:09 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #12590: ARROW-15639 [C++][Python] UDF Scalar Function Implementation

pitrou commented on code in PR #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r852148142


##########
cpp/src/arrow/compute/function.h:
##########
@@ -245,11 +245,11 @@ class ARROW_EXPORT Function {
 
  protected:
   Function(std::string name, Function::Kind kind, const Arity& arity,
-           const FunctionDoc* doc, const FunctionOptions* default_options)
+           const FunctionDoc doc, const FunctionOptions* default_options)

Review Comment:
   No need for `const` on value-passed arguments:
   ```suggestion
              FunctionDoc doc, const FunctionOptions* default_options)
   ```



##########
cpp/src/arrow/compute/function.h:
##########
@@ -245,11 +245,11 @@ class ARROW_EXPORT Function {
 
  protected:
   Function(std::string name, Function::Kind kind, const Arity& arity,
-           const FunctionDoc* doc, const FunctionOptions* default_options)
+           const FunctionDoc doc, const FunctionOptions* default_options)
       : name_(std::move(name)),
         kind_(kind),
         arity_(arity),
-        doc_(doc ? doc : &FunctionDoc::Empty()),
+        doc_(doc),

Review Comment:
   ```suggestion
           doc_(std::move(doc)),
   ```



##########
cpp/src/arrow/compute/function.h:
##########
@@ -280,7 +280,7 @@ class FunctionImpl : public Function {
 
  protected:
   FunctionImpl(std::string name, Function::Kind kind, const Arity& arity,
-               const FunctionDoc* doc, const FunctionOptions* default_options)
+               const FunctionDoc doc, const FunctionOptions* default_options)
       : Function(std::move(name), kind, arity, doc, default_options) {}

Review Comment:
   ```suggestion
                  FunctionDoc doc, const FunctionOptions* default_options)
         : Function(std::move(name), kind, arity, std::move(doc), default_options) {}
   ```



##########
cpp/src/arrow/compute/kernels/scalar_compare.cc:
##########
@@ -241,7 +241,7 @@ struct VarArgsCompareFunction : ScalarFunction {
 
 template <typename Op>
 std::shared_ptr<ScalarFunction> MakeCompareFunction(std::string name,
-                                                    const FunctionDoc* doc) {
+                                                    const FunctionDoc doc) {

Review Comment:
   Same here and below.



##########
cpp/src/arrow/compute/kernels/scalar_string_utf8.cc:
##########
@@ -39,7 +39,7 @@ namespace {
 
 template <template <typename> class Transformer>
 void MakeUnaryStringUTF8TransformKernel(std::string name, FunctionRegistry* registry,
-                                        const FunctionDoc* doc) {
+                                        const FunctionDoc doc) {

Review Comment:
   Same here and below.



##########
cpp/src/arrow/compute/kernels/scalar_temporal_binary.cc:
##########
@@ -326,7 +326,7 @@ struct BinaryTemporalFactory {
 
   template <typename... WithTypes>
   static std::shared_ptr<ScalarFunction> Make(
-      std::string name, OutputType out_type, const FunctionDoc* doc,
+      std::string name, OutputType out_type, const FunctionDoc doc,

Review Comment:
   Same here and below.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");

Review Comment:
   `Status::ExecutionError` is meant for use by Gandiva only (it is apparently also used in Flight SQL, by mistake), so it should not be used here.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  // wrapping the output for expected output type

Review Comment:
   You mean "unwrapping"?



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());

Review Comment:
   This will probably output the numeric id of the Datum kind, which is not very user-friendly. Can we expose a
   ```c++
   ARROW_EXPORT
   std::string ToString(Datum::Kind kind);
   ```
   
   somewhere in `datum.h`?
   



##########
python/pyarrow/_compute.pyx:
##########
@@ -23,12 +23,16 @@ from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE
 from cython.operator cimport dereference as deref
 
 from collections import namedtuple
+import inspect
+from typing import Dict

Review Comment:
   This import (`from typing import Dict`) shouldn't be necessary.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.

Review Comment:
   ```suggestion
           Dictionary mapping function argument names to
           their respective InputType specifications.
           The argument names will be used to generate
           documentation for the function. The number of
           arguments specified here determines the function
           arity.
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):

Review Comment:
   I think it would be better to make the function the first argument. This way, we can later add some mechanism to infer the other arguments from the function docstring and/or annotations.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = {"array": InputType.array(pa.int64())}
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise TypeError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            in_tmp = (<InputType> in_type).input_type
+            c_in_types.push_back(in_tmp)
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        raise TypeError(
+            "in_types must be a dictionary of InputType")
+
+    if func_spec.varargs:
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        c_arity = CArity(num_args, False)
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise TypeError(
+            f"out_type must be a DataType, not {out_type!r}")
+
+    c_out_type = new COutputType(c_type)
+    # Note: The VectorUDF, TableUDF and AggregatorUDFs will be defined
+    # when they are implemented. Only ScalarUDFBuilder is supported at the
+    # moment.
+    c_options = new CScalarUdfOptions(c_func_name, c_arity, c_func_doc,

Review Comment:
   Is this pointer (allocated with `new`) leaking as well?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [

Review Comment:
   This way of writing tests is unreadable. Can you instead define a helper test function and call it once for every set of arguments? e.g.:
   ```
     check_scalar_function("scalar_y=x+k", ...)
     check_scalar_function("scalar_y=mx", ...)
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType

Review Comment:
   Please remove these imports and just prefix the symbols with `pc.`



##########
cpp/src/arrow/compute/function.h:
##########
@@ -329,7 +329,7 @@ class ARROW_EXPORT VectorFunction : public detail::FunctionImpl<VectorKernel> {
  public:
   using KernelType = VectorKernel;
 
-  VectorFunction(std::string name, const Arity& arity, const FunctionDoc* doc,
+  VectorFunction(std::string name, const Arity& arity, const FunctionDoc doc,

Review Comment:
   Same here and below.



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+/// TODO: TODO(ARROW-16041): UDF Options are not exposed to the Python
+/// users. This feature will be included when extending to provide advanced

Review Comment:
   Also, I don't understand what this comment says. What is not "exposed to the Python users" exactly?



##########
cpp/src/arrow/compute/kernels/scalar_arithmetic.cc:
##########
@@ -1968,7 +1968,7 @@ std::shared_ptr<ScalarFunction> MakeArithmeticFunction(std::string name,
 // only on non-null output.
 template <typename Op, typename FunctionImpl = ArithmeticFunction>
 std::shared_ptr<ScalarFunction> MakeArithmeticFunctionNotNull(std::string name,
-                                                              const FunctionDoc* doc) {
+                                                              const FunctionDoc doc) {

Review Comment:
   Same here and below.



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_scalar_udf_with_array_data_functions():
+    function_names = [
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+    function_input_types = [
+        {
+            "array": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+            "array4": InputType.array(pa.int64()),
+            "array5": InputType.array(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate function name
+    doc = {
+        "summary": "test udf input",
+        "description": "parameters are validated"
+    }
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+    with pytest.raises(TypeError):
+        register_scalar_function(None, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function
+    with pytest.raises(TypeError, match="Object must be a callable"):
+        register_scalar_function("none_function", doc, in_types,
+                                 out_type, None)
+
+    # validate output type
+    expected_expr = "out_type must be a DataType, not None"
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("output_function", doc, in_types,
+                                 None, unary_scalar_function)
+
+    # validate input type
+    expected_expr = r'in_types must be a dictionary of InputType'
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("input_function", doc, None,
+                                 out_type, unary_scalar_function)
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    in_types = {"array1": InputType.array(pa.int64()),
+                "array2": InputType.array(pa.int64())
+                }
+    doc = {"summary": "n add function",
+           "description": "add N number of arrays"
+           }
+    register_scalar_function("n_add", doc,
+                             in_types, pa.int64(), n_add)
+
+    func = pc.get_function("n_add")
+
+    assert func.name == "n_add"
+    error_msg = "VarArgs function 'n_add' needs at least 2 arguments"
+    with pytest.raises(pa.lib.ArrowInvalid, match=error_msg):
+        pc.call_function("n_add", [pa.array([1, 10]),
+                                   ])
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {
+        "description": "desc"
+    }
+
+    expected_expr = "Function doc must contain a summary"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_summary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with no decription
+    func_doc = {
+        "summary": "test summary"
+    }
+
+    expected_expr = "Function doc must contain a description"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_desc", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with empty dictionary
+    func_doc = {}
+    expected_expr = r"Function doc must contain a summary,"
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("empty_dictionary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+
+def test_non_uniform_input_udfs():
+
+    def unary_scalar_function(scalar1, array1, scalar2):
+        coeff = pc.call_function("add", [scalar1, scalar2])
+        return pc.call_function("multiply", [coeff, array1])
+
+    in_types = {"scalar1": InputType.scalar(pa.int64()),
+                "scalar2": InputType.array(pa.int64()),
+                "scalar3": InputType.scalar(pa.int64()),
+                }
+    func_doc = {
+        "summary": "multi type udf",
+        "description": "desc"
+    }
+    register_scalar_function("multi_type_udf", func_doc,
+                             in_types,
+                             pa.int64(), unary_scalar_function)
+
+    res = pc.call_function("multi_type_udf",
+                           [pa.scalar(10), pa.array([1, 2, 3]), pa.scalar(20)])
+
+    assert res == pa.array([30, 60, 90])
+
+
+def test_nullary_functions():
+
+    def gen_random():
+        import random
+        val = random.randint(0, 10)
+        return pa.scalar(val)
+
+    func_doc = {
+        "summary": "random function",
+        "description": "generates a random value"
+    }
+
+    register_scalar_function("random_func", func_doc,
+                             {},
+                             pa.int64(), gen_random)
+
+    res = pc.call_function("random_func", [])
+    assert res.as_py() >= 0 and res.as_py() <= 10
+
+
+def test_output_datatype():
+    def add_one(array):
+        ar = pc.call_function("add", [array, 1])
+        ar = ar.cast(pa.int32())
+        return ar
+
+    func_name = "py_add_to_scalar"
+    in_types = {"array": InputType.array(pa.int64())}
+    out_type = pa.int64()
+    doc = {
+        "summary": "add function scalar",
+        "description": "add function"
+    }
+    register_scalar_function(func_name, doc,
+                             in_types, out_type, add_one)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    expected_expr = "Expected output type, int64," \
+        + " but function returned type int32"
+
+    with pytest.raises(ValueError, match=expected_expr):

Review Comment:
   This should be `TypeError`, not `ValueError`.



##########
cpp/src/arrow/compute/kernels/scalar_string_internal.h:
##########
@@ -171,7 +171,7 @@ struct StringTransformExecWithState
 
 template <template <typename> class ExecFunctor>
 void MakeUnaryStringBatchKernel(
-    std::string name, FunctionRegistry* registry, const FunctionDoc* doc,
+    std::string name, FunctionRegistry* registry, const FunctionDoc doc,

Review Comment:
   Same here and below: no need for `const` on a value-passed `FunctionDoc` argument.



##########
cpp/src/arrow/compute/function.h:
##########
@@ -305,7 +305,7 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
  public:
   using KernelType = ScalarKernel;
 
-  ScalarFunction(std::string name, const Arity& arity, const FunctionDoc* doc,
+  ScalarFunction(std::string name, const Arity& arity, const FunctionDoc doc,
                  const FunctionOptions* default_options = NULLPTR)
       : detail::FunctionImpl<ScalarKernel>(std::move(name), Function::SCALAR, arity, doc,
                                            default_options) {}

Review Comment:
   ```suggestion
     ScalarFunction(std::string name, const Arity& arity, FunctionDoc doc,
                    const FunctionOptions* default_options = NULLPTR)
         : detail::FunctionImpl<ScalarKernel>(std::move(name), Function::SCALAR, arity, std::move(doc),
                                              default_options) {}
   ```



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,

Review Comment:
   `function` may be a temporary object, so we must keep a strong reference to it. Something like:
   ```c++
   
   struct PythonUdf {
     OwnedRefNoGIL callable;
     OutputType out_type;
   
     Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, Datum* out) {
       return SafeCallIntoPython([&]() -> Status {
         return Execute(ctx, batch, out);
       });
     }
   
     Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, Datum* out) {
       const auto num_args = batch.values.size();
       PyObject* arg_tuple = PyTuple_New(num_args);
       /* etc. */
     }
   };
   ```
   
   and then:
   ```c++
   Status RegisterScalarFunction(PyObject* function, const ScalarUdfOptions& options) {
     /* ... */
     auto exec = PythonUdf{function, exp_out_type};
     compute::ScalarKernel kernel(
         compute::KernelSignature::Make(options.input_types(), options.output_type(),
                                        arity.is_varargs),
         exec);
     /* ... */
   ```



##########
cpp/src/arrow/compute/kernels/scalar_temporal_unary.cc:
##########
@@ -1474,7 +1474,7 @@ struct UnaryTemporalFactory {
 
   template <typename... WithTypes>
   static std::shared_ptr<ScalarFunction> Make(
-      std::string name, OutputType out_type, const FunctionDoc* doc,
+      std::string name, OutputType out_type, const FunctionDoc doc,

Review Comment:
   Same here and below: no need for `const` on a value-passed `FunctionDoc` argument.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  // wrapping the output for expected output type
+  if (is_scalar(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result));
+    if (!exp_out_type.type()->Equals(val->type)) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),
+                             ", but function returned type ", val->type->name());
+    }
+    *out = Datum(val);
+    return Status::OK();
+  } else if (is_array(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result));
+    if (!exp_out_type.type()->Equals(val->type())) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),
+                             ", but function returned type ", val->type()->name());
+    }
+    *out = Datum(val);
+    return Status::OK();
+  } else {
+    PyTypeObject* type = result->ob_type;
+    const char* tp_name = type->tp_name;
+    return Status::Invalid("Not a supported output type: ", tp_name);
+  }
+  return Status::OK();
+}
+
+Status RegisterScalarFunction(PyObject* function, const ScalarUdfOptions& options) {
+  if (function == nullptr) {
+    return Status::Invalid("Python function cannot be null");
+  }
+  if (!PyCallable_Check(function)) {
+    return Status::TypeError("Expected a callable Python object.");
+  }
+  auto doc = options.doc();
+  auto arity = options.arity();
+  auto exp_out_type = options.output_type();
+  auto scalar_func =
+      std::make_shared<compute::ScalarFunction>(options.name(), arity, std::move(doc));
+  auto exec = [function, exp_out_type](compute::KernelContext* ctx,
+                                       const compute::ExecBatch& batch,
+                                       Datum* out) -> Status {
+    PyAcquireGIL lock;

Review Comment:
   Note that `SafeCallIntoPython` acquires the GIL for you.



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+/// TODO: TODO(ARROW-16041): UDF Options are not exposed to the Python
+/// users. This feature will be included when extending to provide advanced
+/// options for the users.
+class ARROW_PYTHON_EXPORT ScalarUdfOptions {

Review Comment:
   Since this has just the semantics of a simple struct, can you make this a simple struct?
   ```c++
   struct ScalarUdfOptions {
     std::string func_name;
     // etc.
   };
   ```



##########
cpp/src/arrow/compute/kernels/scalar_validity.cc:
##########
@@ -153,7 +153,7 @@ struct IsNanOperator {
   }
 };
 
-void MakeFunction(std::string name, const FunctionDoc* doc,
+void MakeFunction(std::string name, const FunctionDoc doc,

Review Comment:
   Same here and below: no need for `const` on a value-passed `FunctionDoc` argument.



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +203,89 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An input type specification for a user-defined function.
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+
+        Arguments to a UDF have both a data type and a shape,
+        either array or scalar. A scalar InputType means that
+        this argument must be passed a Scalar.  
+
+        Parameter
+        ---------
+        data_type : DataType
+            DataType represented by the InputType
+
+        Examples
+        --------
+
+        >>> import pyarrow as pa
+        >>> from pyarrow.compute import InputType
+        >>> in_type = InputType.scalar(pa.int32())
+        scalar[int32]
+        """
+        cdef:
+            shared_ptr[CDataType] c_data_type
+            CInputType c_input_type
+        c_data_type = pyarrow_unwrap_data_type(data_type)
+        c_input_type = CInputType.Scalar(c_data_type)
+        return wrap_input_type(c_input_type)
+
+    @staticmethod
+    def array(data_type):
+        """
+        Create an array input type of the given data type.
+
+        Arguments to a UDF have both a data type and a shape,
+        either array or scalar. An array InputType means that
+        this argument must be passed an Array.
+
+        Parameter
+        ---------
+        data_type : DataType
+            DataType represented in the input type

Review Comment:
   Above it's "represented by", here it's "represented in". I'm not a native speaker, so which one should it be? (@lidavidm ?)



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_scalar_udf_with_array_data_functions():
+    function_names = [
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+    function_input_types = [
+        {
+            "array": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+            "array4": InputType.array(pa.int64()),
+            "array5": InputType.array(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate function name
+    doc = {
+        "summary": "test udf input",
+        "description": "parameters are validated"
+    }
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+    with pytest.raises(TypeError):
+        register_scalar_function(None, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function
+    with pytest.raises(TypeError, match="Object must be a callable"):
+        register_scalar_function("none_function", doc, in_types,
+                                 out_type, None)
+
+    # validate output type
+    expected_expr = "out_type must be a DataType, not None"
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("output_function", doc, in_types,
+                                 None, unary_scalar_function)
+
+    # validate input type
+    expected_expr = r'in_types must be a dictionary of InputType'
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("input_function", doc, None,
+                                 out_type, unary_scalar_function)
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    in_types = {"array1": InputType.array(pa.int64()),
+                "array2": InputType.array(pa.int64())
+                }
+    doc = {"summary": "n add function",
+           "description": "add N number of arrays"
+           }
+    register_scalar_function("n_add", doc,
+                             in_types, pa.int64(), n_add)
+
+    func = pc.get_function("n_add")
+
+    assert func.name == "n_add"
+    error_msg = "VarArgs function 'n_add' needs at least 2 arguments"
+    with pytest.raises(pa.lib.ArrowInvalid, match=error_msg):
+        pc.call_function("n_add", [pa.array([1, 10]),
+                                   ])
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {
+        "description": "desc"
+    }
+
+    expected_expr = "Function doc must contain a summary"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_summary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with no decription
+    func_doc = {
+        "summary": "test summary"
+    }
+
+    expected_expr = "Function doc must contain a description"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_desc", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with empty dictionary
+    func_doc = {}
+    expected_expr = r"Function doc must contain a summary,"
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("empty_dictionary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+
+def test_non_uniform_input_udfs():
+
+    def unary_scalar_function(scalar1, array1, scalar2):
+        coeff = pc.call_function("add", [scalar1, scalar2])
+        return pc.call_function("multiply", [coeff, array1])
+
+    in_types = {"scalar1": InputType.scalar(pa.int64()),
+                "scalar2": InputType.array(pa.int64()),
+                "scalar3": InputType.scalar(pa.int64()),
+                }
+    func_doc = {
+        "summary": "multi type udf",
+        "description": "desc"
+    }
+    register_scalar_function("multi_type_udf", func_doc,
+                             in_types,
+                             pa.int64(), unary_scalar_function)
+
+    res = pc.call_function("multi_type_udf",
+                           [pa.scalar(10), pa.array([1, 2, 3]), pa.scalar(20)])
+
+    assert res == pa.array([30, 60, 90])
+
+
+def test_nullary_functions():
+
+    def gen_random():
+        import random
+        val = random.randint(0, 10)
+        return pa.scalar(val)
+
+    func_doc = {
+        "summary": "random function",
+        "description": "generates a random value"
+    }
+
+    register_scalar_function("random_func", func_doc,
+                             {},
+                             pa.int64(), gen_random)
+
+    res = pc.call_function("random_func", [])
+    assert res.as_py() >= 0 and res.as_py() <= 10
+
+
+def test_output_datatype():
+    def add_one(array):
+        ar = pc.call_function("add", [array, 1])
+        ar = ar.cast(pa.int32())
+        return ar
+
+    func_name = "py_add_to_scalar"
+    in_types = {"array": InputType.array(pa.int64())}
+    out_type = pa.int64()
+    doc = {
+        "summary": "add function scalar",
+        "description": "add function"
+    }
+    register_scalar_function(func_name, doc,
+                             in_types, out_type, add_one)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    expected_expr = "Expected output type, int64," \
+        + " but function returned type int32"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        pc.call_function(func_name, [pa.array([20, 30])])
+
+
+def test_output_type():
+    def add_one(array):
+        ar = pc.call_function("add", [array, 1])
+        ar = ar.cast(pa.int32())
+        return ar[0].as_py()

Review Comment:
   You can make this much simpler, e.g.:
   ```suggestion
           return 42
   ```



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+/// TODO: TODO(ARROW-16041): UDF Options are not exposed to the Python

Review Comment:
   Please don't put TODOs in docstrings. Just make this a double-slash comment (`//`), not a triple-slash docstring (`///`).



##########
cpp/src/arrow/compute/kernels/scalar_boolean.cc:
##########
@@ -455,7 +455,7 @@ struct KleeneAndNotOp {
 };
 
 void MakeFunction(const std::string& name, int arity, ArrayKernelExec exec,
-                  const FunctionDoc* doc, FunctionRegistry* registry,
+                  const FunctionDoc doc, FunctionRegistry* registry,

Review Comment:
   Same here: no need for `const` on a value-passed argument.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  // wrapping the output for expected output type
+  if (is_scalar(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result));
+    if (!exp_out_type.type()->Equals(val->type)) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),

Review Comment:
   Can you make this `TypeError` instead?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")

Review Comment:
   Why "None"? If no options are supported, this should be the empty string.



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +203,89 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An input type specification for a user-defined function.
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+
+        Arguments to a UDF have both a data type and a shape,
+        either array or scalar. A scalar InputType means that
+        this argument must be passed a Scalar.  
+
+        Parameter
+        ---------
+        data_type : DataType
+            DataType represented by the InputType
+
+        Examples
+        --------
+
+        >>> import pyarrow as pa
+        >>> from pyarrow.compute import InputType
+        >>> in_type = InputType.scalar(pa.int32())
+        scalar[int32]
+        """
+        cdef:
+            shared_ptr[CDataType] c_data_type
+            CInputType c_input_type
+        c_data_type = pyarrow_unwrap_data_type(data_type)
+        c_input_type = CInputType.Scalar(c_data_type)
+        return wrap_input_type(c_input_type)
+
+    @staticmethod
+    def array(data_type):
+        """
+        Create an array input type of the given data type.
+
+        Arguments to a UDF have both a data type and a shape,
+        either array or scalar. An array InputType means that
+        this argument must be passed an Array.
+
+        Parameter

Review Comment:
   Same here.



##########
cpp/src/arrow/compute/kernels/scalar_arithmetic.cc:
##########
@@ -1954,7 +1954,7 @@ void AddNullExec(ScalarFunction* func) {
 
 template <typename Op, typename FunctionImpl = ArithmeticFunction>
 std::shared_ptr<ScalarFunction> MakeArithmeticFunction(std::string name,
-                                                       const FunctionDoc* doc) {
+                                                       const FunctionDoc doc) {
   auto func = std::make_shared<FunctionImpl>(name, Arity::Binary(), doc);

Review Comment:
   ```suggestion
                                                          FunctionDoc doc) {
     auto func = std::make_shared<FunctionImpl>(name, Arity::Binary(), std::move(doc));
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = {"array": InputType.array(pa.int64())}
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])

Review Comment:
   "ans" is for "answer"?



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  // wrapping the output for expected output type
+  if (is_scalar(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result));
+    if (!exp_out_type.type()->Equals(val->type)) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),
+                             ", but function returned type ", val->type->name());
+    }
+    *out = Datum(val);
+    return Status::OK();
+  } else if (is_array(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result));
+    if (!exp_out_type.type()->Equals(val->type())) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),
+                             ", but function returned type ", val->type()->name());
+    }
+    *out = Datum(val);
+    return Status::OK();
+  } else {
+    PyTypeObject* type = result->ob_type;
+    const char* tp_name = type->tp_name;
+    return Status::Invalid("Not a supported output type: ", tp_name);

Review Comment:
   ```suggestion
       return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name,
         " (expected Scalar or Array)");
   ```



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  // wrapping the output for expected output type
+  if (is_scalar(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result));
+    if (!exp_out_type.type()->Equals(val->type)) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),
+                             ", but function returned type ", val->type->name());
+    }
+    *out = Datum(val);
+    return Status::OK();
+  } else if (is_array(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result));
+    if (!exp_out_type.type()->Equals(val->type())) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),
+                             ", but function returned type ", val->type()->name());
+    }

Review Comment:
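   Please don't copy-paste: this output-type check duplicates the one in the scalar branch just above. Factor the shared check out instead of repeating it.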



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")

Review Comment:
   Parameter checking should be done in `register_scalar_function` and this helper function should take the already validated arguments.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType

Review Comment:
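   These two `from pyarrow.compute import ...` lines are not necessary; `pyarrow.compute` is already imported as `pc`, so just use the `pc.` prefix (`pc.register_scalar_function`, `pc.InputType`).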



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.

Review Comment:
   ```suggestion
           matching the out_type. It must return a Scalar if
           all arguments are scalar, else it must return an Array.
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = {"array": InputType.array(pa.int64())}
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise TypeError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            in_tmp = (<InputType> in_type).input_type
+            c_in_types.push_back(in_tmp)
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        raise TypeError(
+            "in_types must be a dictionary of InputType")
+
+    if func_spec.varargs:
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        c_arity = CArity(num_args, False)
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise TypeError(
+            f"out_type must be a DataType, not {out_type!r}")
+
+    c_out_type = new COutputType(c_type)

Review Comment:
   So `c_out_type` is leaking? Where is it deallocated?
   



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2934,3 +2966,10 @@ cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
     int64_t TotalBufferSize(const CChunkedArray& array)
     int64_t TotalBufferSize(const CRecordBatch& record_batch)
     int64_t TotalBufferSize(const CTable& table)
+
+cdef extern from "arrow/python/udf.h" namespace "arrow::py" nogil:

Review Comment:
   This shouldn't be "nogil" as `RegisterScalarFunction` takes a Python object.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])

Review Comment:
   Can be made slightly simpler:
   ```suggestion
       >>> def add_constant(array):
       ...     return pc.add(array, 1)
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = {"array": InputType.array(pa.int64())}
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise TypeError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            in_tmp = (<InputType> in_type).input_type
+            c_in_types.push_back(in_tmp)
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        raise TypeError(
+            "in_types must be a dictionary of InputType")
+
+    if func_spec.varargs:
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        c_arity = CArity(num_args, False)
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise TypeError(
+            f"out_type must be a DataType, not {out_type!r}")

Review Comment:
   Or simpler:
   ```suggestion
       c_type = pyarrow_unwrap_data_type(ensure_type(out_type))
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input label, InputType
+        objects which defines the arguments to the function.
+        The input label is a str that will be used to generate
+        documentation for the function. The number of arguments
+        specified here determines the function arity.
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = {"array": InputType.array(pa.int64())}
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise TypeError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            in_tmp = (<InputType> in_type).input_type

Review Comment:
   What happens if `in_type` is not an `InputType`?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(

Review Comment:
   Would the tests fail if run multiple times due to `name` being already registered? It would be nice if we could avoid that problem.



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_scalar_udf_with_array_data_functions():
+    function_names = [
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+    function_input_types = [
+        {
+            "array": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+            "array4": InputType.array(pa.int64()),
+            "array5": InputType.array(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate function name
+    doc = {
+        "summary": "test udf input",
+        "description": "parameters are validated"
+    }
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+    with pytest.raises(TypeError):
+        register_scalar_function(None, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function
+    with pytest.raises(TypeError, match="Object must be a callable"):
+        register_scalar_function("none_function", doc, in_types,
+                                 out_type, None)
+
+    # validate output type
+    expected_expr = "out_type must be a DataType, not None"
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("output_function", doc, in_types,
+                                 None, unary_scalar_function)
+
+    # validate input type
+    expected_expr = r'in_types must be a dictionary of InputType'
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("input_function", doc, None,
+                                 out_type, unary_scalar_function)

Review Comment:
   What happens if `in_types` is a dictionary of something else?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_scalar_udf_with_array_data_functions():
+    function_names = [
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+    function_input_types = [
+        {
+            "array": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+            "array4": InputType.array(pa.int64()),
+            "array5": InputType.array(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate function name
+    doc = {
+        "summary": "test udf input",
+        "description": "parameters are validated"
+    }
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+    with pytest.raises(TypeError):
+        register_scalar_function(None, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function
+    with pytest.raises(TypeError, match="Object must be a callable"):
+        register_scalar_function("none_function", doc, in_types,
+                                 out_type, None)
+
+    # validate output type
+    expected_expr = "out_type must be a DataType, not None"
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("output_function", doc, in_types,
+                                 None, unary_scalar_function)

Review Comment:
   What if `out_type` is something that is neither `None` nor a `DataType`?



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -19,6 +19,8 @@
 
 from pyarrow.includes.common cimport *
 
+from cpython.ref cimport PyObject

Review Comment:
   I think you can still pass `object`. `PyObject*` should ideally only be used as a special case, since it's less safe.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  int num_args = static_cast<int64_t>(batch.values.size());
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");

Review Comment:
   @lidavidm is right. `PyObject_CallObject` returns NULL only if an error occurred (which is already checked by `CheckPyError`), so this `result == nullptr` check is redundant.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:

Review Comment:
   As @lidavidm said, this can be removed in favor of the checks below.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  size_t num_args = batch.values.size();
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (size_t arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported for the datum kind ",
+            batch[arg_id].kind());
+    }
+  }
+  // call to Python executing the function
+  PyObject* result;
+  auto st = SafeCallIntoPython([&]() -> Status {
+    result = PyObject_CallObject(function, arg_tuple);
+    return CheckPyError();
+  });
+  RETURN_NOT_OK(st);
+  if (result == nullptr) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  // wrapping the output for expected output type
+  if (is_scalar(result)) {
+    ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result));
+    if (!exp_out_type.type()->Equals(val->type)) {
+      return Status::Invalid("Expected output type, ", exp_out_type.type()->name(),

Review Comment:
   Also please use `DataType::ToString()` as that will be more precise than `DataType::name()`.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2338,161 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):

Review Comment:
   So for example:
   ```suggestion
   def register_scalar_function(function, function_name, function_doc,
                                in_types, out_type):
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,447 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*values):
+    base_val = values[:2]
+    res = pc.call_function("add", base_val)
+    for other_val in values[2:]:
+        res = pc.call_function("add", [res, other_val])
+    return res
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_scalar_udf_with_array_data_functions():
+    function_names = [
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+    function_input_types = [
+        {
+            "array": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+            "array4": InputType.array(pa.int64()),
+            "array5": InputType.array(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input in zip(function_names,
+                     function_input_types,
+                     function_output_types,
+                     function_docs,
+                     functions,
+                     function_inputs):
+        expected_output = function(*input)
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate function name
+    doc = {
+        "summary": "test udf input",
+        "description": "parameters are validated"
+    }
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+    with pytest.raises(TypeError):
+        register_scalar_function(None, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function
+    with pytest.raises(TypeError, match="Object must be a callable"):
+        register_scalar_function("none_function", doc, in_types,
+                                 out_type, None)
+
+    # validate output type
+    expected_expr = "out_type must be a DataType, not None"
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("output_function", doc, in_types,
+                                 None, unary_scalar_function)
+
+    # validate input type
+    expected_expr = r'in_types must be a dictionary of InputType'
+    with pytest.raises(TypeError, match=expected_expr):
+        register_scalar_function("input_function", doc, None,
+                                 out_type, unary_scalar_function)
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    in_types = {"array1": InputType.array(pa.int64()),
+                "array2": InputType.array(pa.int64())
+                }
+    doc = {"summary": "n add function",
+           "description": "add N number of arrays"
+           }
+    register_scalar_function("n_add", doc,
+                             in_types, pa.int64(), n_add)
+
+    func = pc.get_function("n_add")
+
+    assert func.name == "n_add"
+    error_msg = "VarArgs function 'n_add' needs at least 2 arguments"
+    with pytest.raises(pa.lib.ArrowInvalid, match=error_msg):
+        pc.call_function("n_add", [pa.array([1, 10]),
+                                   ])
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {
+        "description": "desc"
+    }
+
+    expected_expr = "Function doc must contain a summary"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_summary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with no decription
+    func_doc = {
+        "summary": "test summary"
+    }
+
+    expected_expr = "Function doc must contain a description"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_desc", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with empty dictionary
+    func_doc = {}
+    expected_expr = r"Function doc must contain a summary,"
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("empty_dictionary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+
+def test_non_uniform_input_udfs():
+
+    def unary_scalar_function(scalar1, array1, scalar2):
+        coeff = pc.call_function("add", [scalar1, scalar2])
+        return pc.call_function("multiply", [coeff, array1])
+
+    in_types = {"scalar1": InputType.scalar(pa.int64()),
+                "scalar2": InputType.array(pa.int64()),
+                "scalar3": InputType.scalar(pa.int64()),
+                }
+    func_doc = {
+        "summary": "multi type udf",
+        "description": "desc"
+    }
+    register_scalar_function("multi_type_udf", func_doc,
+                             in_types,
+                             pa.int64(), unary_scalar_function)
+
+    res = pc.call_function("multi_type_udf",
+                           [pa.scalar(10), pa.array([1, 2, 3]), pa.scalar(20)])
+
+    assert res == pa.array([30, 60, 90])
+
+
+def test_nullary_functions():
+
+    def gen_random():
+        import random
+        val = random.randint(0, 10)
+        return pa.scalar(val)
+
+    func_doc = {
+        "summary": "random function",
+        "description": "generates a random value"
+    }
+
+    register_scalar_function("random_func", func_doc,
+                             {},
+                             pa.int64(), gen_random)
+
+    res = pc.call_function("random_func", [])
+    assert res.as_py() >= 0 and res.as_py() <= 10
+
+
+def test_output_datatype():
+    def add_one(array):
+        ar = pc.call_function("add", [array, 1])
+        ar = ar.cast(pa.int32())
+        return ar
+
+    func_name = "py_add_to_scalar"
+    in_types = {"array": InputType.array(pa.int64())}
+    out_type = pa.int64()
+    doc = {
+        "summary": "add function scalar",
+        "description": "add function"
+    }
+    register_scalar_function(func_name, doc,
+                             in_types, out_type, add_one)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    expected_expr = "Expected output type, int64," \
+        + " but function returned type int32"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        pc.call_function(func_name, [pa.array([20, 30])])
+
+
+def test_output_type():
+    def add_one(array):
+        ar = pc.call_function("add", [array, 1])
+        ar = ar.cast(pa.int32())
+        return ar[0].as_py()
+
+    func_name = "add_to_scalar_as_py"
+    in_types = {"array": InputType.array(pa.int64())}
+    out_type = pa.int64()
+    doc = {
+        "summary": "add function scalar",
+        "description": "add function"
+    }
+    register_scalar_function(func_name, doc,
+                             in_types, out_type, add_one)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    expected_expr = "Not a supported output type: int"
+
+    with pytest.raises(pa.lib.ArrowInvalid, match=expected_expr):

Review Comment:
   This should raise `TypeError` as well, rather than `pa.lib.ArrowInvalid`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org