You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/07 16:00:11 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #14043: ARROW-17613: [C++] Add function execution API for a preconfigured kernel

pitrou commented on code in PR #14043:
URL: https://github.com/apache/arrow/pull/14043#discussion_r1015564731


##########
cpp/src/arrow/compute/function.h:
##########
@@ -159,6 +159,20 @@ struct ARROW_EXPORT FunctionDoc {
   static const FunctionDoc& Empty();
 };
 
+/// \brief An executor of a function with a preconfigured kernel
+struct ARROW_EXPORT FunctionExecutor {

Review Comment:
   Style nit, but we should probably use `struct` for data classes, not interfaces like this.
   ```suggestion
   class ARROW_EXPORT FunctionExecutor {
   ```



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -47,6 +47,7 @@ class CpuInfo;
 
 namespace compute {
 
+struct FunctionExecutor;
 class FunctionOptions;
 class FunctionRegistry;

Review Comment:
   Can we move these forward-decls to `compute/type_fwd.h` where they should belong?



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();

Review Comment:
   `const auto&` perhaps? No need to copy the string.



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();
+    START_COMPUTE_SPAN(span, func_name,
+                       {{"function.name", func_name},
+                        {"function.options", options ? options->ToString() : "<NULLPTR>"},
+                        {"function.kind", func_kind}});
+
+    if (!inited) {
+      ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context()));
+    }
+    ExecContext* ctx = kernel_ctx.exec_context();
+    // Cast arguments if necessary
+    std::vector<Datum> args_with_cast;
+    for (size_t i = 0; i != args.size(); ++i) {
+      const auto& in_type = in_types[i];
+      auto arg = args[i];
+      if (in_type != args[i].type()) {
+        ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx));
+      }
+      args_with_cast.push_back(arg);

Review Comment:
   ```suggestion
         args_with_cast.push_back(std::move(arg));
   ```



##########
cpp/src/arrow/compute/function_internal.cc:
##########
@@ -108,6 +108,27 @@ Result<std::unique_ptr<FunctionOptions>> DeserializeFunctionOptions(
   return FunctionOptionsFromStructScalar(scalar);
 }
 
+Status CheckAllArrayOrScalar(const std::vector<Datum>& values) {
+  for (const auto& value : values) {
+    if (!value.is_value()) {
+      return Status::TypeError("Tried executing function with non-value type: ",

Review Comment:
   "Value type" is a bit cryptic TBH. Can we reword this more explicitly?
   ```suggestion
         return Status::TypeError("Tried executing function with non-array, non-scalar type: ",
   ```



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();
+    START_COMPUTE_SPAN(span, func_name,
+                       {{"function.name", func_name},
+                        {"function.options", options ? options->ToString() : "<NULLPTR>"},
+                        {"function.kind", func_kind}});
+
+    if (!inited) {
+      ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context()));
+    }
+    ExecContext* ctx = kernel_ctx.exec_context();
+    // Cast arguments if necessary
+    std::vector<Datum> args_with_cast;
+    for (size_t i = 0; i != args.size(); ++i) {
+      const auto& in_type = in_types[i];
+      auto arg = args[i];
+      if (in_type != args[i].type()) {
+        ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx));
+      }
+      args_with_cast.push_back(arg);
+    }
+
+    detail::DatumAccumulator listener;
+
+    ExecBatch input(std::move(args_with_cast), /*length=*/0);
+    if (input.num_values() == 0) {
+      if (passed_length != -1) {
+        input.length = passed_length;
+      }
+    } else {
+      bool all_same_length = false;
+      int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length);
+      input.length = inferred_length;
+      if (func_kind == Function::SCALAR) {
+        if (passed_length != -1 && passed_length != inferred_length) {
+          return Status::Invalid(
+              "Passed batch length for execution did not match actual"
+              " length of values for scalar function execution");
+        }
+      } else if (func_kind == Function::VECTOR) {
+        auto vkernel = static_cast<const VectorKernel*>(kernel);
+        if (!(all_same_length || !vkernel->can_execute_chunkwise)) {
+          return Status::Invalid("Vector kernel arguments must all be the same length");

Review Comment:
   ```suggestion
             return Status::Invalid("Arguments for execution of vector function '", func_name, "' must all be the same length");
   ```



##########
cpp/src/arrow/compute/function.h:
##########
@@ -159,6 +159,20 @@ struct ARROW_EXPORT FunctionDoc {
   static const FunctionDoc& Empty();
 };
 
+/// \brief An executor of a function with a preconfigured kernel
+struct ARROW_EXPORT FunctionExecutor {
+  virtual ~FunctionExecutor() = default;
+  /// \brief Initialize or re-initialize the preconfigured kernel
+  ///
+  /// This method may be called zero or more times. Depending on how
+  /// the FunctionExecutor was obtained, it may already have been initialized.
+  virtual Status Init(const FunctionOptions* options = NULLPTR,
+                      ExecContext* exec_ctx = NULLPTR) = 0;
+  /// \brief Execute the preconfigured kernel with arguments that must fit it

Review Comment:
   Can we detail what happens if the arguments don't fit? Is that checked for?



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -351,5 +354,87 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 }
 
+namespace {
+
+struct TestFunctionOptions : public FunctionOptions {
+  TestFunctionOptions();
+
+  static const char* kTypeName;
+};
+
+static auto kTestFunctionOptionsType =
+    internal::GetFunctionOptionsType<TestFunctionOptions>();
+
+TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {}
+
+const char* TestFunctionOptions::kTypeName = "test_options";
+
+}  // namespace
+
+TEST(FunctionExecutor, Basics) {
+  VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty());
+  bool init_called = false;
+  ExecContext exec_ctx;
+  TestFunctionOptions options;
+  auto init =
+      [&init_called, &exec_ctx, &options](
+          KernelContext* kernel_ctx,
+          const KernelInitArgs& init_args) -> Result<std::unique_ptr<KernelState>> {
+    init_called = true;
+    if (&exec_ctx != kernel_ctx->exec_context()) {
+      return Status::Invalid("expected exec context not found in kernel context");
+    }
+    if (&options != init_args.options) {
+      return Status::Invalid("expected options not found in kernel init args");
+    }
+    return NULLPTR;
+  };
+  auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) {
+    DCHECK_EQ(2, args.values.size());

Review Comment:
   Should use GTest assertions/expectations in test, not debug-mode checks.
   ```suggestion
       EXPECT_EQ(2, args.values.size());
   ```



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -307,7 +310,7 @@ TEST(ScalarAggregateFunction, Basics) {
 }
 
 Result<std::unique_ptr<KernelState>> NoopInit(KernelContext*, const KernelInitArgs&) {
-  return nullptr;
+  return NULLPTR;

Review Comment:
   The `NULLPTR` macro is only useful in header files to avoid weird issues with Microsoft's C++/CLI, AFAIU.



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();
+    START_COMPUTE_SPAN(span, func_name,
+                       {{"function.name", func_name},
+                        {"function.options", options ? options->ToString() : "<NULLPTR>"},
+                        {"function.kind", func_kind}});
+
+    if (!inited) {
+      ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context()));
+    }
+    ExecContext* ctx = kernel_ctx.exec_context();
+    // Cast arguments if necessary
+    std::vector<Datum> args_with_cast;
+    for (size_t i = 0; i != args.size(); ++i) {
+      const auto& in_type = in_types[i];
+      auto arg = args[i];
+      if (in_type != args[i].type()) {
+        ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx));
+      }
+      args_with_cast.push_back(arg);
+    }
+
+    detail::DatumAccumulator listener;
+
+    ExecBatch input(std::move(args_with_cast), /*length=*/0);
+    if (input.num_values() == 0) {
+      if (passed_length != -1) {
+        input.length = passed_length;
+      }
+    } else {
+      bool all_same_length = false;
+      int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length);
+      input.length = inferred_length;
+      if (func_kind == Function::SCALAR) {
+        if (passed_length != -1 && passed_length != inferred_length) {
+          return Status::Invalid(
+              "Passed batch length for execution did not match actual"
+              " length of values for scalar function execution");

Review Comment:
   Let's make it more informative by giving the function name?
   ```suggestion
             return Status::Invalid(
                 "Passed batch length for execution did not match actual"
                 " length of values for execution of scalar function '", func_name, "'");
   ```



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -351,5 +354,87 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 }
 
+namespace {
+
+struct TestFunctionOptions : public FunctionOptions {
+  TestFunctionOptions();
+
+  static const char* kTypeName;
+};
+
+static auto kTestFunctionOptionsType =
+    internal::GetFunctionOptionsType<TestFunctionOptions>();
+
+TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {}
+
+const char* TestFunctionOptions::kTypeName = "test_options";
+
+}  // namespace
+
+TEST(FunctionExecutor, Basics) {
+  VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty());
+  bool init_called = false;
+  ExecContext exec_ctx;
+  TestFunctionOptions options;
+  auto init =
+      [&init_called, &exec_ctx, &options](
+          KernelContext* kernel_ctx,
+          const KernelInitArgs& init_args) -> Result<std::unique_ptr<KernelState>> {
+    init_called = true;
+    if (&exec_ctx != kernel_ctx->exec_context()) {
+      return Status::Invalid("expected exec context not found in kernel context");
+    }
+    if (&options != init_args.options) {
+      return Status::Invalid("expected options not found in kernel init args");
+    }
+    return NULLPTR;
+  };
+  auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) {
+    DCHECK_EQ(2, args.values.size());
+    const int32_t* vals[2];
+    for (size_t i = 0; i < 2; i++) {
+      DCHECK(args.values[i].is_array());
+      const ArraySpan& array = args.values[i].array;
+      DCHECK_EQ(*int32(), *array.type);

Review Comment:
   Use `AssertTypeEqual` or check the type id.
   ```suggestion
         AssertTypeEqual(int32(), array.type);
   ```
   or
   ```suggestion
         EXPECT_EQ(array.type->id(), Type::INT32);
   ```
   



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -351,5 +354,87 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 }
 
+namespace {
+
+struct TestFunctionOptions : public FunctionOptions {
+  TestFunctionOptions();
+
+  static const char* kTypeName;
+};
+
+static auto kTestFunctionOptionsType =
+    internal::GetFunctionOptionsType<TestFunctionOptions>();
+
+TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {}
+
+const char* TestFunctionOptions::kTypeName = "test_options";
+
+}  // namespace
+
+TEST(FunctionExecutor, Basics) {
+  VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty());
+  bool init_called = false;
+  ExecContext exec_ctx;
+  TestFunctionOptions options;
+  auto init =
+      [&init_called, &exec_ctx, &options](
+          KernelContext* kernel_ctx,
+          const KernelInitArgs& init_args) -> Result<std::unique_ptr<KernelState>> {
+    init_called = true;
+    if (&exec_ctx != kernel_ctx->exec_context()) {
+      return Status::Invalid("expected exec context not found in kernel context");
+    }
+    if (&options != init_args.options) {
+      return Status::Invalid("expected options not found in kernel init args");
+    }
+    return NULLPTR;
+  };
+  auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) {
+    DCHECK_EQ(2, args.values.size());
+    const int32_t* vals[2];
+    for (size_t i = 0; i < 2; i++) {
+      DCHECK(args.values[i].is_array());
+      const ArraySpan& array = args.values[i].array;
+      DCHECK_EQ(*int32(), *array.type);
+      vals[i] = array.GetValues<int32_t>(1);
+    }
+    DCHECK(out->is_array_data());
+    auto out_data = out->array_data();
+    Int32Builder builder;
+    for (int64_t i = 0; i < args.length; i++) {
+      ARROW_RETURN_NOT_OK(builder.Append(vals[0][i] + vals[1][i]));
+    }
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    *out_data.get() = *array->data();
+    return Status::OK();
+  };
+  std::vector<InputType> in_types = {int32(), int32()};
+  OutputType out_type = int32();
+  ASSERT_OK(func.AddKernel(in_types, out_type, exec, init));
+  ASSERT_OK_AND_ASSIGN(const Kernel* dispatched, func.DispatchExact({int32(), int32()}));
+  ASSERT_EQ(exec, static_cast<const ScalarKernel*>(dispatched)->exec);
+  std::vector<TypeHolder> inputs = {int32(), int32()};
+  ASSERT_OK_AND_ASSIGN(auto func_exec, func.GetBestExecutor(inputs));
+  ASSERT_FALSE(init_called);
+  ASSERT_OK(func_exec->Init(&options, &exec_ctx));
+  ASSERT_TRUE(init_called);
+  auto build_array = [](int32_t i) -> Result<Datum> {
+    Int32Builder builder;
+    ARROW_RETURN_NOT_OK(builder.Append(i));
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    return Datum(array->data());
+  };
+  for (int32_t i = 1; i <= 3; i++) {
+    ASSERT_OK_AND_ASSIGN(auto value0, build_array(i));
+    ASSERT_OK_AND_ASSIGN(auto value1, build_array(i + 1));
+    std::vector<Datum> values = {value0, value1};
+    ASSERT_OK_AND_ASSIGN(auto result, func_exec->Execute(values, 1));
+    ASSERT_TRUE(result.is_array());
+    auto int32_data = dynamic_cast<const ArrayData*>(result.array().get());

Review Comment:
   Can use `ArrayFromJSON` to construct the expected result and then `AssertArraysEqual`.



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -351,5 +354,87 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 }
 
+namespace {
+
+struct TestFunctionOptions : public FunctionOptions {
+  TestFunctionOptions();
+
+  static const char* kTypeName;
+};
+
+static auto kTestFunctionOptionsType =
+    internal::GetFunctionOptionsType<TestFunctionOptions>();
+
+TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {}
+
+const char* TestFunctionOptions::kTypeName = "test_options";
+
+}  // namespace
+
+TEST(FunctionExecutor, Basics) {
+  VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty());
+  bool init_called = false;
+  ExecContext exec_ctx;
+  TestFunctionOptions options;
+  auto init =
+      [&init_called, &exec_ctx, &options](
+          KernelContext* kernel_ctx,
+          const KernelInitArgs& init_args) -> Result<std::unique_ptr<KernelState>> {
+    init_called = true;
+    if (&exec_ctx != kernel_ctx->exec_context()) {
+      return Status::Invalid("expected exec context not found in kernel context");
+    }
+    if (&options != init_args.options) {
+      return Status::Invalid("expected options not found in kernel init args");
+    }
+    return NULLPTR;
+  };
+  auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) {
+    DCHECK_EQ(2, args.values.size());
+    const int32_t* vals[2];
+    for (size_t i = 0; i < 2; i++) {
+      DCHECK(args.values[i].is_array());
+      const ArraySpan& array = args.values[i].array;
+      DCHECK_EQ(*int32(), *array.type);
+      vals[i] = array.GetValues<int32_t>(1);
+    }
+    DCHECK(out->is_array_data());
+    auto out_data = out->array_data();
+    Int32Builder builder;
+    for (int64_t i = 0; i < args.length; i++) {
+      ARROW_RETURN_NOT_OK(builder.Append(vals[0][i] + vals[1][i]));
+    }
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    *out_data.get() = *array->data();
+    return Status::OK();
+  };
+  std::vector<InputType> in_types = {int32(), int32()};
+  OutputType out_type = int32();
+  ASSERT_OK(func.AddKernel(in_types, out_type, exec, init));
+  ASSERT_OK_AND_ASSIGN(const Kernel* dispatched, func.DispatchExact({int32(), int32()}));
+  ASSERT_EQ(exec, static_cast<const ScalarKernel*>(dispatched)->exec);

Review Comment:
   ```suggestion
     ASSERT_EQ(exec, checked_cast<const ScalarKernel*>(dispatched)->exec);
   ```



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -351,5 +354,87 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 }
 
+namespace {
+
+struct TestFunctionOptions : public FunctionOptions {
+  TestFunctionOptions();
+
+  static const char* kTypeName;
+};
+
+static auto kTestFunctionOptionsType =
+    internal::GetFunctionOptionsType<TestFunctionOptions>();
+
+TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {}
+
+const char* TestFunctionOptions::kTypeName = "test_options";
+
+}  // namespace
+
+TEST(FunctionExecutor, Basics) {
+  VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty());
+  bool init_called = false;
+  ExecContext exec_ctx;
+  TestFunctionOptions options;
+  auto init =
+      [&init_called, &exec_ctx, &options](
+          KernelContext* kernel_ctx,
+          const KernelInitArgs& init_args) -> Result<std::unique_ptr<KernelState>> {
+    init_called = true;
+    if (&exec_ctx != kernel_ctx->exec_context()) {
+      return Status::Invalid("expected exec context not found in kernel context");
+    }
+    if (&options != init_args.options) {
+      return Status::Invalid("expected options not found in kernel init args");
+    }
+    return NULLPTR;
+  };
+  auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) {
+    DCHECK_EQ(2, args.values.size());
+    const int32_t* vals[2];
+    for (size_t i = 0; i < 2; i++) {
+      DCHECK(args.values[i].is_array());
+      const ArraySpan& array = args.values[i].array;
+      DCHECK_EQ(*int32(), *array.type);
+      vals[i] = array.GetValues<int32_t>(1);
+    }
+    DCHECK(out->is_array_data());
+    auto out_data = out->array_data();
+    Int32Builder builder;
+    for (int64_t i = 0; i < args.length; i++) {
+      ARROW_RETURN_NOT_OK(builder.Append(vals[0][i] + vals[1][i]));
+    }
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    *out_data.get() = *array->data();
+    return Status::OK();
+  };
+  std::vector<InputType> in_types = {int32(), int32()};
+  OutputType out_type = int32();
+  ASSERT_OK(func.AddKernel(in_types, out_type, exec, init));
+  ASSERT_OK_AND_ASSIGN(const Kernel* dispatched, func.DispatchExact({int32(), int32()}));
+  ASSERT_EQ(exec, static_cast<const ScalarKernel*>(dispatched)->exec);
+  std::vector<TypeHolder> inputs = {int32(), int32()};
+  ASSERT_OK_AND_ASSIGN(auto func_exec, func.GetBestExecutor(inputs));
+  ASSERT_FALSE(init_called);
+  ASSERT_OK(func_exec->Init(&options, &exec_ctx));
+  ASSERT_TRUE(init_called);
+  auto build_array = [](int32_t i) -> Result<Datum> {
+    Int32Builder builder;
+    ARROW_RETURN_NOT_OK(builder.Append(i));
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    return Datum(array->data());
+  };
+  for (int32_t i = 1; i <= 3; i++) {
+    ASSERT_OK_AND_ASSIGN(auto value0, build_array(i));
+    ASSERT_OK_AND_ASSIGN(auto value1, build_array(i + 1));
+    std::vector<Datum> values = {value0, value1};
+    ASSERT_OK_AND_ASSIGN(auto result, func_exec->Execute(values, 1));
+    ASSERT_TRUE(result.is_array());
+    auto int32_data = dynamic_cast<const ArrayData*>(result.array().get());
+    ASSERT_NE(NULLPTR, int32_data);
+    EXPECT_EQ(2 * i + 1, int32_data->GetValues<int32_t>(1)[0]);
+  }
+}

Review Comment:
   It would be nice to also add tests for the high-level `GetFunctionExecutor` functions. These can look up one of the existing compute functions.
   (exercise the different cases: with optional options, with required options, calling `Init()` once again...)



##########
cpp/src/arrow/compute/function.h:
##########
@@ -159,6 +159,20 @@ struct ARROW_EXPORT FunctionDoc {
   static const FunctionDoc& Empty();
 };
 
+/// \brief An executor of a function with a preconfigured kernel
+struct ARROW_EXPORT FunctionExecutor {
+  virtual ~FunctionExecutor() = default;
+  /// \brief Initialize or re-initialize the preconfigured kernel
+  ///
+  /// This method may be called zero or more times. Depending on how
+  /// the FunctionExecutor was obtained, it may already have been initialized.
+  virtual Status Init(const FunctionOptions* options = NULLPTR,
+                      ExecContext* exec_ctx = NULLPTR) = 0;
+  /// \brief Execute the preconfigured kernel with arguments that must fit it

Review Comment:
   Also, would be nice to document the arguments, as `passed_length` is probably not self-explanatory?



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -351,5 +354,87 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 }
 
+namespace {
+
+struct TestFunctionOptions : public FunctionOptions {
+  TestFunctionOptions();
+
+  static const char* kTypeName;
+};
+
+static auto kTestFunctionOptionsType =
+    internal::GetFunctionOptionsType<TestFunctionOptions>();
+
+TestFunctionOptions::TestFunctionOptions() : FunctionOptions(kTestFunctionOptionsType) {}
+
+const char* TestFunctionOptions::kTypeName = "test_options";
+
+}  // namespace
+
+TEST(FunctionExecutor, Basics) {
+  VectorFunction func("vector_test", Arity::Binary(), /*doc=*/FunctionDoc::Empty());
+  bool init_called = false;
+  ExecContext exec_ctx;
+  TestFunctionOptions options;
+  auto init =
+      [&init_called, &exec_ctx, &options](
+          KernelContext* kernel_ctx,
+          const KernelInitArgs& init_args) -> Result<std::unique_ptr<KernelState>> {
+    init_called = true;
+    if (&exec_ctx != kernel_ctx->exec_context()) {
+      return Status::Invalid("expected exec context not found in kernel context");
+    }
+    if (&options != init_args.options) {
+      return Status::Invalid("expected options not found in kernel init args");
+    }
+    return NULLPTR;
+  };
+  auto exec = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) {
+    DCHECK_EQ(2, args.values.size());
+    const int32_t* vals[2];
+    for (size_t i = 0; i < 2; i++) {
+      DCHECK(args.values[i].is_array());
+      const ArraySpan& array = args.values[i].array;
+      DCHECK_EQ(*int32(), *array.type);
+      vals[i] = array.GetValues<int32_t>(1);
+    }
+    DCHECK(out->is_array_data());
+    auto out_data = out->array_data();
+    Int32Builder builder;
+    for (int64_t i = 0; i < args.length; i++) {
+      ARROW_RETURN_NOT_OK(builder.Append(vals[0][i] + vals[1][i]));
+    }
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    *out_data.get() = *array->data();
+    return Status::OK();
+  };
+  std::vector<InputType> in_types = {int32(), int32()};
+  OutputType out_type = int32();
+  ASSERT_OK(func.AddKernel(in_types, out_type, exec, init));
+  ASSERT_OK_AND_ASSIGN(const Kernel* dispatched, func.DispatchExact({int32(), int32()}));
+  ASSERT_EQ(exec, static_cast<const ScalarKernel*>(dispatched)->exec);
+  std::vector<TypeHolder> inputs = {int32(), int32()};
+  ASSERT_OK_AND_ASSIGN(auto func_exec, func.GetBestExecutor(inputs));
+  ASSERT_FALSE(init_called);
+  ASSERT_OK(func_exec->Init(&options, &exec_ctx));
+  ASSERT_TRUE(init_called);
+  auto build_array = [](int32_t i) -> Result<Datum> {
+    Int32Builder builder;
+    ARROW_RETURN_NOT_OK(builder.Append(i));
+    ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
+    return Datum(array->data());
+  };
+  for (int32_t i = 1; i <= 3; i++) {
+    ASSERT_OK_AND_ASSIGN(auto value0, build_array(i));
+    ASSERT_OK_AND_ASSIGN(auto value1, build_array(i + 1));

Review Comment:
   You can just use `ArrayFromJSON`; it will make the code shorter and the examples easier to read (no need to bother with `Int32Builder`).



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -30,7 +30,7 @@
 
 #include "arrow/array/data.h"
 #include "arrow/compute/exec/expression.h"
-#include "arrow/compute/type_fwd.h"
+#include "arrow/compute/registry.h"

Review Comment:
   I don't think this change is necessary actually?



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),

Review Comment:
   Is this useful? It will be overridden in `Init` anyway.



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();
+    START_COMPUTE_SPAN(span, func_name,
+                       {{"function.name", func_name},
+                        {"function.options", options ? options->ToString() : "<NULLPTR>"},
+                        {"function.kind", func_kind}});
+
+    if (!inited) {
+      ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context()));
+    }
+    ExecContext* ctx = kernel_ctx.exec_context();
+    // Cast arguments if necessary
+    std::vector<Datum> args_with_cast;
+    for (size_t i = 0; i != args.size(); ++i) {
+      const auto& in_type = in_types[i];
+      auto arg = args[i];
+      if (in_type != args[i].type()) {
+        ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx));
+      }
+      args_with_cast.push_back(arg);
+    }
+
+    detail::DatumAccumulator listener;
+
+    ExecBatch input(std::move(args_with_cast), /*length=*/0);
+    if (input.num_values() == 0) {
+      if (passed_length != -1) {
+        input.length = passed_length;
+      }
+    } else {
+      bool all_same_length = false;
+      int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length);
+      input.length = inferred_length;
+      if (func_kind == Function::SCALAR) {
+        if (passed_length != -1 && passed_length != inferred_length) {
+          return Status::Invalid(
+              "Passed batch length for execution did not match actual"
+              " length of values for scalar function execution");
+        }
+      } else if (func_kind == Function::VECTOR) {
+        auto vkernel = static_cast<const VectorKernel*>(kernel);

Review Comment:
   ```suggestion
           auto vkernel = checked_cast<const VectorKernel*>(kernel);
   ```



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();
+    START_COMPUTE_SPAN(span, func_name,
+                       {{"function.name", func_name},
+                        {"function.options", options ? options->ToString() : "<NULLPTR>"},
+                        {"function.kind", func_kind}});
+
+    if (!inited) {
+      ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context()));
+    }
+    ExecContext* ctx = kernel_ctx.exec_context();
+    // Cast arguments if necessary
+    std::vector<Datum> args_with_cast;
+    for (size_t i = 0; i != args.size(); ++i) {
+      const auto& in_type = in_types[i];
+      auto arg = args[i];
+      if (in_type != args[i].type()) {
+        ARROW_ASSIGN_OR_RAISE(arg, Cast(args[i], CastOptions::Safe(in_type), ctx));
+      }
+      args_with_cast.push_back(arg);
+    }
+
+    detail::DatumAccumulator listener;
+
+    ExecBatch input(std::move(args_with_cast), /*length=*/0);
+    if (input.num_values() == 0) {
+      if (passed_length != -1) {
+        input.length = passed_length;
+      }
+    } else {
+      bool all_same_length = false;
+      int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length);
+      input.length = inferred_length;
+      if (func_kind == Function::SCALAR) {
+        if (passed_length != -1 && passed_length != inferred_length) {
+          return Status::Invalid(
+              "Passed batch length for execution did not match actual"
+              " length of values for scalar function execution");
+        }
+      } else if (func_kind == Function::VECTOR) {
+        auto vkernel = static_cast<const VectorKernel*>(kernel);
+        if (!(all_same_length || !vkernel->can_execute_chunkwise)) {

Review Comment:
   Double negatives are more cumbersome to read
   ```suggestion
           if (!all_same_length && vkernel->can_execute_chunkwise) {
   ```



##########
cpp/src/arrow/compute/function.cc:
##########
@@ -167,6 +179,109 @@ const Kernel* DispatchExactImpl(const Function* func,
   return nullptr;
 }
 
+struct FunctionExecutorImpl : public FunctionExecutor {
+  FunctionExecutorImpl(std::vector<TypeHolder> in_types, const Kernel* kernel,
+                       std::unique_ptr<detail::KernelExecutor> executor,
+                       const Function& func)
+      : in_types(std::move(in_types)),
+        kernel(kernel),
+        kernel_ctx(default_exec_context(), kernel),
+        executor(std::move(executor)),
+        func(func),
+        state(),
+        options(NULLPTR),
+        inited(false) {}
+  virtual ~FunctionExecutorImpl() {}
+
+  Status KernelInit(const FunctionOptions* options) {
+    RETURN_NOT_OK(CheckOptions(func, options));
+    if (options == NULLPTR) {
+      options = func.default_options();
+    }
+    if (kernel->init) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            kernel->init(&kernel_ctx, {kernel, in_types, options}));
+      kernel_ctx.SetState(state.get());
+    }
+
+    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, in_types, options}));
+    this->options = options;
+    inited = true;
+    return Status::OK();
+  }
+
+  Status Init(const FunctionOptions* options, ExecContext* exec_ctx) override {
+    kernel_ctx = KernelContext{exec_ctx, kernel};
+    return KernelInit(options);
+  }
+
+  Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
+    util::tracing::Span span;
+
+    auto func_kind = func.kind();
+    auto func_name = func.name();
+    START_COMPUTE_SPAN(span, func_name,
+                       {{"function.name", func_name},
+                        {"function.options", options ? options->ToString() : "<NULLPTR>"},
+                        {"function.kind", func_kind}});
+
+    if (!inited) {
+      ARROW_RETURN_NOT_OK(Init(NULLPTR, default_exec_context()));
+    }
+    ExecContext* ctx = kernel_ctx.exec_context();
+    // Cast arguments if necessary
+    std::vector<Datum> args_with_cast;

Review Comment:
   ```suggestion
       std::vector<Datum> args_with_cast;
       args_with_cast.reserve(args.size());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org