Posted to commits@arrow.apache.org by bk...@apache.org on 2020/11/04 20:19:10 UTC

[arrow] branch master updated: ARROW-10468: [C++][Compute] Provide KernelExecutor instead of FunctionExecutor

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c223f5  ARROW-10468: [C++][Compute] Provide KernelExecutor instead of FunctionExecutor
1c223f5 is described below

commit 1c223f517f30f4d577a00eae3d0dbca929b2ffac
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Nov 4 15:18:18 2020 -0500

    ARROW-10468: [C++][Compute] Provide KernelExecutor instead of FunctionExecutor
    
    This is a sub PR of ARROW-10322.
    
    The motivation is to lift kernel dispatch and state initialization out of FunctionExecutor and defer them to the caller, which allows us to avoid repeated dispatch when the same kernel(s) are executed multiple times. Initialization of KernelState is likewise deferred so that expensive state (for example, the hash tables built by the set lookup kernels) can easily be reused across executions.
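    
    As a rough sketch, the new call pattern looks like the following (mirroring the
    updated Function::Execute in the diff below; `func`, `inputs`, `args`, `options`,
    and `ctx` stand in for the caller's values):
    
    ```
    // Dispatch and kernel state initialization now happen once, up front:
    ARROW_ASSIGN_OR_RAISE(auto kernel, func->DispatchExact(inputs));
    KernelContext kernel_ctx{ctx};
    std::unique_ptr<KernelState> state;
    if (kernel->init) {
      state = kernel->init(&kernel_ctx, {kernel, inputs, options});
      RETURN_NOT_OK(kernel_ctx.status());
      kernel_ctx.SetState(state.get());
    }
    auto executor = detail::KernelExecutor::MakeScalar();
    RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options}));
    
    // ...so the executor (and any expensive KernelState) can be reused for
    // repeated executions without dispatching again:
    auto listener = std::make_shared<detail::DatumAccumulator>();
    RETURN_NOT_OK(executor->Execute(args, listener.get()));
    ```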
    
    A microbenchmark of kernel dispatch performance is also added; as the before/after results show, these changes do not measurably affect dispatch time.
    
    ```
    Before:
    ------------------------------------------------------------------------------
    Benchmark                                       Time           CPU Iterations
    ------------------------------------------------------------------------------
    BM_CastDispatch/min_time:1.000            1515564 ns    1515530 ns        916   659.835k items/s
    BM_CastDispatchBaseline/min_time:1.000     236472 ns     236468 ns       5919   4.12978M items/s
    BM_AddDispatch/min_time:1.000                 284 ns        284 ns    4921359   3.35523M items/s
    
    After:
    ------------------------------------------------------------------------------
    Benchmark                                       Time           CPU Iterations
    ------------------------------------------------------------------------------
    BM_CastDispatch/min_time:1.000            1583169 ns    1583129 ns        884    631.66k items/s
    BM_CastDispatchBaseline/min_time:1.000     233199 ns     233194 ns       5990   4.18776M items/s
    BM_AddDispatch/min_time:1.000                 284 ns        284 ns    4901845   3.35489M items/s
    ```
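    
    Note that DispatchExact now returns Result<const Kernel*> uniformly for every
    function kind, so call sites that need kernel-specific members downcast the
    result explicitly, as in this excerpt from the new benchmark (the static_cast
    is safe because a ScalarFunction only registers ScalarKernels):
    
    ```
    // Dispatch once, then pull the typed exec function out of the Kernel:
    ASSERT_OK_AND_ASSIGN(auto cast_kernel,
                         cast_function->DispatchExact({int_scalars[0]->type}));
    const auto& exec = static_cast<const ScalarKernel*>(cast_kernel)->exec;
    ```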
    
    Closes #8574 from bkietz/10468-Refactor-FunctionExecutor
    
    Authored-by: Benjamin Kietzman <be...@gmail.com>
    Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
 cpp/src/arrow/compute/CMakeLists.txt               |   2 +
 cpp/src/arrow/compute/cast.cc                      |   2 +-
 cpp/src/arrow/compute/cast.h                       |   2 +-
 cpp/src/arrow/compute/exec.cc                      | 188 ++++++++-------------
 cpp/src/arrow/compute/exec_internal.h              |  14 +-
 cpp/src/arrow/compute/function.cc                  |  33 +++-
 cpp/src/arrow/compute/function.h                   |  30 ++--
 cpp/src/arrow/compute/function_benchmark.cc        | 117 +++++++++++++
 cpp/src/arrow/compute/function_test.cc             |   7 +-
 .../arrow/compute/kernels/scalar_cast_temporal.cc  |   2 +-
 10 files changed, 249 insertions(+), 148 deletions(-)

diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt
index 97fbd17..e781dff 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/CMakeLists.txt
@@ -65,4 +65,6 @@ add_arrow_compute_test(internals_test
                        kernel_test.cc
                        registry_test.cc)
 
+add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
+
 add_subdirectory(kernels)
diff --git a/cpp/src/arrow/compute/cast.cc b/cpp/src/arrow/compute/cast.cc
index fb180e8..29a80f7 100644
--- a/cpp/src/arrow/compute/cast.cc
+++ b/cpp/src/arrow/compute/cast.cc
@@ -163,7 +163,7 @@ bool CastFunction::CanCastTo(const DataType& out_type) const {
   return impl_->in_types.find(static_cast<int>(out_type.id())) != impl_->in_types.end();
 }
 
-Result<const ScalarKernel*> CastFunction::DispatchExact(
+Result<const Kernel*> CastFunction::DispatchExact(
     const std::vector<ValueDescr>& values) const {
   const int passed_num_args = static_cast<int>(values.size());
 
diff --git a/cpp/src/arrow/compute/cast.h b/cpp/src/arrow/compute/cast.h
index 82dd357..43392ce 100644
--- a/cpp/src/arrow/compute/cast.h
+++ b/cpp/src/arrow/compute/cast.h
@@ -98,7 +98,7 @@ class CastFunction : public ScalarFunction {
 
   bool CanCastTo(const DataType& out_type) const;
 
-  Result<const ScalarKernel*> DispatchExact(
+  Result<const Kernel*> DispatchExact(
       const std::vector<ValueDescr>& values) const override;
 
  private:
diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 69e788a..dd97119 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -45,6 +45,7 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
 
 namespace arrow {
 
@@ -100,14 +101,6 @@ void ComputeDataPreallocate(const DataType& type,
   }
 }
 
-Status GetValueDescriptors(const std::vector<Datum>& args,
-                           std::vector<ValueDescr>* descrs) {
-  for (const auto& arg : args) {
-    descrs->emplace_back(arg.descr());
-  }
-  return Status::OK();
-}
-
 }  // namespace
 
 namespace detail {
@@ -432,15 +425,15 @@ class NullPropagator {
 std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
                                              const std::shared_ptr<DataType>& type) {
   std::vector<std::shared_ptr<Array>> arrays;
-  for (const auto& val : values) {
-    auto boxed = val.make_array();
-    if (boxed->length() == 0) {
+  arrays.reserve(values.size());
+  for (const Datum& val : values) {
+    if (val.length() == 0) {
       // Skip empty chunks
       continue;
     }
-    arrays.emplace_back(std::move(boxed));
+    arrays.emplace_back(val.make_array());
   }
-  return std::make_shared<ChunkedArray>(arrays, type);
+  return std::make_shared<ChunkedArray>(std::move(arrays), type);
 }
 
 bool HaveChunkedArray(const std::vector<Datum>& values) {
@@ -452,82 +445,56 @@ bool HaveChunkedArray(const std::vector<Datum>& values) {
   return false;
 }
 
-template <typename FunctionType>
-class FunctionExecutorImpl : public FunctionExecutor {
+template <typename KernelType>
+class KernelExecutorImpl : public KernelExecutor {
  public:
-  FunctionExecutorImpl(ExecContext* exec_ctx, const FunctionType* func,
-                       const FunctionOptions* options)
-      : exec_ctx_(exec_ctx), kernel_ctx_(exec_ctx), func_(func), options_(options) {}
-
- protected:
-  using KernelType = typename FunctionType::KernelType;
+  Status Init(KernelContext* kernel_ctx, KernelInitArgs args) override {
+    kernel_ctx_ = kernel_ctx;
+    kernel_ = static_cast<const KernelType*>(args.kernel);
 
-  void Reset() {}
+    // Resolve the output descriptor for this kernel
+    ARROW_ASSIGN_OR_RAISE(
+        output_descr_, kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs));
 
-  Status InitState() {
-    // Some kernels require initialization of an opaque state object
-    if (kernel_->init) {
-      KernelInitArgs init_args{kernel_, input_descrs_, options_};
-      state_ = kernel_->init(&kernel_ctx_, init_args);
-      ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
-      kernel_ctx_.SetState(state_.get());
-    }
     return Status::OK();
   }
 
+ protected:
   // This is overridden by the VectorExecutor
   virtual Status SetupArgIteration(const std::vector<Datum>& args) {
-    ARROW_ASSIGN_OR_RAISE(batch_iterator_,
-                          ExecBatchIterator::Make(args, exec_ctx_->exec_chunksize()));
+    ARROW_ASSIGN_OR_RAISE(
+        batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
     return Status::OK();
   }
 
-  Status BindArgs(const std::vector<Datum>& args) {
-    RETURN_NOT_OK(GetValueDescriptors(args, &input_descrs_));
-    ARROW_ASSIGN_OR_RAISE(kernel_, func_->DispatchExact(input_descrs_));
-
-    // Initialize kernel state, since type resolution may depend on this state
-    RETURN_NOT_OK(this->InitState());
-
-    // Resolve the output descriptor for this kernel
-    ARROW_ASSIGN_OR_RAISE(output_descr_, kernel_->signature->out_type().Resolve(
-                                             &kernel_ctx_, input_descrs_));
-
-    return SetupArgIteration(args);
-  }
-
   Result<std::shared_ptr<ArrayData>> PrepareOutput(int64_t length) {
     auto out = std::make_shared<ArrayData>(output_descr_.type, length);
     out->buffers.resize(output_num_buffers_);
 
     if (validity_preallocated_) {
-      ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_.AllocateBitmap(length));
+      ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_->AllocateBitmap(length));
     }
     for (size_t i = 0; i < data_preallocated_.size(); ++i) {
       const auto& prealloc = data_preallocated_[i];
       if (prealloc.bit_width >= 0) {
         ARROW_ASSIGN_OR_RAISE(
             out->buffers[i + 1],
-            AllocateDataBuffer(&kernel_ctx_, length + prealloc.added_length,
+            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
                                prealloc.bit_width));
       }
     }
     return out;
   }
 
-  ValueDescr output_descr() const override { return output_descr_; }
+  ExecContext* exec_context() { return kernel_ctx_->exec_context(); }
+  KernelState* state() { return kernel_ctx_->state(); }
 
   // Not all of these members are used for every executor type
 
-  ExecContext* exec_ctx_;
-  KernelContext kernel_ctx_;
-  const FunctionType* func_;
+  KernelContext* kernel_ctx_;
   const KernelType* kernel_;
   std::unique_ptr<ExecBatchIterator> batch_iterator_;
-  std::unique_ptr<KernelState> state_;
-  std::vector<ValueDescr> input_descrs_;
   ValueDescr output_descr_;
-  const FunctionOptions* options_;
 
   int output_num_buffers_;
 
@@ -540,13 +507,8 @@ class FunctionExecutorImpl : public FunctionExecutor {
   std::vector<BufferPreallocation> data_preallocated_;
 };
 
-class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
+class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
  public:
-  using FunctionType = ScalarFunction;
-  static constexpr Function::Kind function_kind = Function::SCALAR;
-  using BASE = FunctionExecutorImpl<ScalarFunction>;
-  using BASE::BASE;
-
   Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
     RETURN_NOT_OK(PrepareExecute(args));
     ExecBatch batch;
@@ -583,7 +545,8 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
       } else {
         // XXX: In the case where no outputs are emitted, is returning a 0-length
         // array always the correct move?
-        return MakeArrayOfNull(output_descr_.type, /*length=*/0, exec_ctx_->memory_pool())
+        return MakeArrayOfNull(output_descr_.type, /*length=*/0,
+                               exec_context()->memory_pool())
             .ValueOrDie();
       }
     }
@@ -597,7 +560,7 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
     if (output_descr_.shape == ValueDescr::ARRAY) {
       ArrayData* out_arr = out.mutable_array();
       if (kernel_->null_handling == NullHandling::INTERSECTION) {
-        RETURN_NOT_OK(PropagateNulls(&kernel_ctx_, batch, out_arr));
+        RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out_arr));
       } else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
         out_arr->null_count = 0;
       }
@@ -612,8 +575,8 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
       }
     }
 
-    kernel_->exec(&kernel_ctx_, batch, &out);
-    ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+    kernel_->exec(kernel_ctx_, batch, &out);
+    ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
     if (!preallocate_contiguous_) {
       // If we are producing chunked output rather than one big array, then
       // emit each chunk as soon as it's available
@@ -623,8 +586,7 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
   }
 
   Status PrepareExecute(const std::vector<Datum>& args) {
-    this->Reset();
-    RETURN_NOT_OK(this->BindArgs(args));
+    RETURN_NOT_OK(this->SetupArgIteration(args));
 
     if (output_descr_.shape == ValueDescr::ARRAY) {
       // If the executor is configured to produce a single large Array output for
@@ -698,7 +660,7 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
     // Some kernels are also unable to write into sliced outputs, so we respect the
     // kernel's attributes.
     preallocate_contiguous_ =
-        (exec_ctx_->preallocate_contiguous() && kernel_->can_write_into_slices &&
+        (exec_context()->preallocate_contiguous() && kernel_->can_write_into_slices &&
          validity_preallocated_ && !is_nested(output_descr_.type->id()) &&
          data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
          std::all_of(data_preallocated_.begin(), data_preallocated_.end(),
@@ -740,13 +702,8 @@ Status PackBatchNoChunks(const std::vector<Datum>& args, ExecBatch* out) {
   return Status::OK();
 }
 
-class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
+class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  public:
-  using FunctionType = VectorFunction;
-  static constexpr Function::Kind function_kind = Function::VECTOR;
-  using BASE = FunctionExecutorImpl<VectorFunction>;
-  using BASE::BASE;
-
   Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
     RETURN_NOT_OK(PrepareExecute(args));
     ExecBatch batch;
@@ -797,10 +754,10 @@ class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
 
     if (kernel_->null_handling == NullHandling::INTERSECTION &&
         output_descr_.shape == ValueDescr::ARRAY) {
-      RETURN_NOT_OK(PropagateNulls(&kernel_ctx_, batch, out.mutable_array()));
+      RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array()));
     }
-    kernel_->exec(&kernel_ctx_, batch, &out);
-    ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+    kernel_->exec(kernel_ctx_, batch, &out);
+    ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
     if (!kernel_->finalize) {
       // If there is no result finalizer (e.g. for hash-based functions), we can
       // emit the processed batch right away rather than waiting
@@ -815,8 +772,8 @@ class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
     if (kernel_->finalize) {
       // Intermediate results require post-processing after the execution is
       // completed (possibly involving some accumulated state)
-      kernel_->finalize(&kernel_ctx_, &results_);
-      ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+      kernel_->finalize(kernel_ctx_, &results_);
+      ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
       for (const auto& result : results_) {
         RETURN_NOT_OK(listener->OnResult(result));
       }
@@ -826,15 +783,14 @@ class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
 
   Status SetupArgIteration(const std::vector<Datum>& args) override {
     if (kernel_->can_execute_chunkwise) {
-      ARROW_ASSIGN_OR_RAISE(batch_iterator_,
-                            ExecBatchIterator::Make(args, exec_ctx_->exec_chunksize()));
+      ARROW_ASSIGN_OR_RAISE(batch_iterator_, ExecBatchIterator::Make(
+                                                 args, exec_context()->exec_chunksize()));
     }
     return Status::OK();
   }
 
   Status PrepareExecute(const std::vector<Datum>& args) {
-    this->Reset();
-    RETURN_NOT_OK(this->BindArgs(args));
+    RETURN_NOT_OK(this->SetupArgIteration(args));
     output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());
 
     // Decide if we need to preallocate memory for this kernel
@@ -850,15 +806,16 @@ class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
   std::vector<Datum> results_;
 };
 
-class ScalarAggExecutor : public FunctionExecutorImpl<ScalarAggregateFunction> {
+class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
  public:
-  using FunctionType = ScalarAggregateFunction;
-  static constexpr Function::Kind function_kind = Function::SCALAR_AGGREGATE;
-  using BASE = FunctionExecutorImpl<ScalarAggregateFunction>;
-  using BASE::BASE;
+  Status Init(KernelContext* ctx, KernelInitArgs args) override {
+    input_descrs_ = &args.inputs;
+    options_ = args.options;
+    return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
+  }
 
   Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
-    RETURN_NOT_OK(BindArgs(args));
+    RETURN_NOT_OK(this->SetupArgIteration(args));
 
     ExecBatch batch;
     while (batch_iterator_->Next(&batch)) {
@@ -869,8 +826,8 @@ class ScalarAggExecutor : public FunctionExecutorImpl<ScalarAggregateFunction> {
     }
 
     Datum out;
-    kernel_->finalize(&kernel_ctx_, &out);
-    ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+    kernel_->finalize(kernel_ctx_, &out);
+    ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
     RETURN_NOT_OK(listener->OnResult(std::move(out)));
     return Status::OK();
   }
@@ -883,36 +840,38 @@ class ScalarAggExecutor : public FunctionExecutorImpl<ScalarAggregateFunction> {
 
  private:
   Status Consume(const ExecBatch& batch) {
-    KernelInitArgs init_args{kernel_, input_descrs_, options_};
-    auto batch_state = kernel_->init(&kernel_ctx_, init_args);
-    ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+    auto batch_state = kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_});
+    ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
 
     if (batch_state == nullptr) {
-      kernel_ctx_.SetStatus(
+      kernel_ctx_->SetStatus(
           Status::Invalid("ScalarAggregation requires non-null kernel state"));
-      return kernel_ctx_.status();
+      return kernel_ctx_->status();
     }
 
-    KernelContext batch_ctx(exec_ctx_);
+    KernelContext batch_ctx(exec_context());
     batch_ctx.SetState(batch_state.get());
 
     kernel_->consume(&batch_ctx, batch);
     ARROW_CTX_RETURN_IF_ERROR(&batch_ctx);
 
-    kernel_->merge(&kernel_ctx_, std::move(*batch_state), state_.get());
-    ARROW_CTX_RETURN_IF_ERROR(&kernel_ctx_);
+    kernel_->merge(kernel_ctx_, std::move(*batch_state), state());
+    ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);
     return Status::OK();
   }
+
+  const std::vector<ValueDescr>* input_descrs_;
+  const FunctionOptions* options_;
 };
 
 template <typename ExecutorType,
           typename FunctionType = typename ExecutorType::FunctionType>
-Result<std::unique_ptr<FunctionExecutor>> MakeExecutor(ExecContext* ctx,
-                                                       const Function* func,
-                                                       const FunctionOptions* options) {
+Result<std::unique_ptr<KernelExecutor>> MakeExecutor(ExecContext* ctx,
+                                                     const Function* func,
+                                                     const FunctionOptions* options) {
   DCHECK_EQ(ExecutorType::function_kind, func->kind());
   auto typed_func = checked_cast<const FunctionType*>(func);
-  return std::unique_ptr<FunctionExecutor>(new ExecutorType(ctx, typed_func, options));
+  return std::unique_ptr<KernelExecutor>(new ExecutorType(ctx, typed_func, options));
 }
 
 }  // namespace
@@ -939,19 +898,16 @@ Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* out
   return propagator.Execute();
 }
 
-Result<std::unique_ptr<FunctionExecutor>> FunctionExecutor::Make(
-    ExecContext* ctx, const Function* func, const FunctionOptions* options) {
-  switch (func->kind()) {
-    case Function::SCALAR:
-      return MakeExecutor<ScalarExecutor>(ctx, func, options);
-    case Function::VECTOR:
-      return MakeExecutor<VectorExecutor>(ctx, func, options);
-    case Function::SCALAR_AGGREGATE:
-      return MakeExecutor<ScalarAggExecutor>(ctx, func, options);
-    default:
-      DCHECK(false);
-      return nullptr;
-  }
+std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalar() {
+  return ::arrow::internal::make_unique<detail::ScalarExecutor>();
+}
+
+std::unique_ptr<KernelExecutor> KernelExecutor::MakeVector() {
+  return ::arrow::internal::make_unique<detail::VectorExecutor>();
+}
+
+std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalarAggregate() {
+  return ::arrow::internal::make_unique<detail::ScalarAggExecutor>();
 }
 
 }  // namespace detail
diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h
index 507cd17..8bad135 100644
--- a/cpp/src/arrow/compute/exec_internal.h
+++ b/cpp/src/arrow/compute/exec_internal.h
@@ -102,22 +102,22 @@ class DatumAccumulator : public ExecListener {
 /// inputs will be split into non-chunked ExecBatch values for execution
 Status CheckAllValues(const std::vector<Datum>& values);
 
-class ARROW_EXPORT FunctionExecutor {
+class ARROW_EXPORT KernelExecutor {
  public:
-  virtual ~FunctionExecutor() = default;
+  virtual ~KernelExecutor() = default;
+
+  virtual Status Init(KernelContext*, KernelInitArgs) = 0;
 
   /// XXX: Better configurability for listener
   /// Not thread-safe
   virtual Status Execute(const std::vector<Datum>& args, ExecListener* listener) = 0;
 
-  virtual ValueDescr output_descr() const = 0;
-
   virtual Datum WrapResults(const std::vector<Datum>& args,
                             const std::vector<Datum>& outputs) = 0;
 
-  static Result<std::unique_ptr<FunctionExecutor>> Make(ExecContext* ctx,
-                                                        const Function* func,
-                                                        const FunctionOptions* options);
+  static std::unique_ptr<KernelExecutor> MakeScalar();
+  static std::unique_ptr<KernelExecutor> MakeVector();
+  static std::unique_ptr<KernelExecutor> MakeScalarAggregate();
 };
 
 /// \brief Populate validity bitmap with the intersection of the nullity of the
diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc
index e4a6578..2d3e06e 100644
--- a/cpp/src/arrow/compute/function.cc
+++ b/cpp/src/arrow/compute/function.cc
@@ -117,8 +117,31 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
   // type-check Datum arguments here. Really we'd like to avoid this as much as
   // possible
   RETURN_NOT_OK(detail::CheckAllValues(args));
-  ARROW_ASSIGN_OR_RAISE(auto executor,
-                        detail::FunctionExecutor::Make(ctx, this, options));
+  std::vector<ValueDescr> inputs(args.size());
+  for (size_t i = 0; i != args.size(); ++i) {
+    inputs[i] = args[i].descr();
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto kernel, DispatchExact(inputs));
+  std::unique_ptr<KernelState> state;
+
+  KernelContext kernel_ctx{ctx};
+  if (kernel->init) {
+    state = kernel->init(&kernel_ctx, {kernel, inputs, options});
+    RETURN_NOT_OK(kernel_ctx.status());
+    kernel_ctx.SetState(state.get());
+  }
+
+  std::unique_ptr<detail::KernelExecutor> executor;
+  if (kind() == Function::SCALAR) {
+    executor = detail::KernelExecutor::MakeScalar();
+  } else if (kind() == Function::VECTOR) {
+    executor = detail::KernelExecutor::MakeVector();
+  } else {
+    executor = detail::KernelExecutor::MakeScalarAggregate();
+  }
+  RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options}));
+
   auto listener = std::make_shared<detail::DatumAccumulator>();
   RETURN_NOT_OK(executor->Execute(args, listener.get()));
   return executor->WrapResults(args, listener->values());
@@ -157,7 +180,7 @@ Status ScalarFunction::AddKernel(ScalarKernel kernel) {
   return Status::OK();
 }
 
-Result<const ScalarKernel*> ScalarFunction::DispatchExact(
+Result<const Kernel*> ScalarFunction::DispatchExact(
     const std::vector<ValueDescr>& values) const {
   return DispatchExactImpl(*this, kernels_, values);
 }
@@ -184,7 +207,7 @@ Status VectorFunction::AddKernel(VectorKernel kernel) {
   return Status::OK();
 }
 
-Result<const VectorKernel*> VectorFunction::DispatchExact(
+Result<const Kernel*> VectorFunction::DispatchExact(
     const std::vector<ValueDescr>& values) const {
   return DispatchExactImpl(*this, kernels_, values);
 }
@@ -198,7 +221,7 @@ Status ScalarAggregateFunction::AddKernel(ScalarAggregateKernel kernel) {
   return Status::OK();
 }
 
-Result<const ScalarAggregateKernel*> ScalarAggregateFunction::DispatchExact(
+Result<const Kernel*> ScalarAggregateFunction::DispatchExact(
     const std::vector<ValueDescr>& values) const {
   return DispatchExactImpl(*this, kernels_, values);
 }
diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h
index aa8489d..a71dbe4 100644
--- a/cpp/src/arrow/compute/function.h
+++ b/cpp/src/arrow/compute/function.h
@@ -155,6 +155,13 @@ class ARROW_EXPORT Function {
   /// \brief Returns the number of registered kernels for this function.
   virtual int num_kernels() const = 0;
 
+  /// \brief Return a kernel that can execute the function given the exact
+  /// argument types (without implicit type casts or scalar->array promotions).
+  ///
+  /// NB: This function is overridden in CastFunction.
+  virtual Result<const Kernel*> DispatchExact(
+      const std::vector<ValueDescr>& values) const = 0;
+
   /// \brief Execute the function eagerly with the passed input arguments with
   /// kernel dispatch, batch iteration, and memory allocation details taken
   /// care of.
@@ -241,12 +248,8 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
   /// kernel's signature does not match the function's arity.
   Status AddKernel(ScalarKernel kernel);
 
-  /// \brief Return a kernel that can execute the function given the exact
-  /// argument types (without implicit type casts or scalar->array promotions).
-  ///
-  /// NB: This function is overridden in CastFunction.
-  virtual Result<const ScalarKernel*> DispatchExact(
-      const std::vector<ValueDescr>& values) const;
+  Result<const Kernel*> DispatchExact(
+      const std::vector<ValueDescr>& values) const override;
 };
 
 /// \brief A function that executes general array operations that may yield
@@ -272,9 +275,8 @@ class ARROW_EXPORT VectorFunction : public detail::FunctionImpl<VectorKernel> {
   /// kernel's signature does not match the function's arity.
   Status AddKernel(VectorKernel kernel);
 
-  /// \brief Return a kernel that can execute the function given the exact
-  /// argument types (without implicit type casts or scalar->array promotions)
-  Result<const VectorKernel*> DispatchExact(const std::vector<ValueDescr>& values) const;
+  Result<const Kernel*> DispatchExact(
+      const std::vector<ValueDescr>& values) const override;
 };
 
 class ARROW_EXPORT ScalarAggregateFunction
@@ -291,10 +293,8 @@ class ARROW_EXPORT ScalarAggregateFunction
   /// kernel's signature does not match the function's arity.
   Status AddKernel(ScalarAggregateKernel kernel);
 
-  /// \brief Return a kernel that can execute the function given the exact
-  /// argument types (without implicit type casts or scalar->array promotions)
-  Result<const ScalarAggregateKernel*> DispatchExact(
-      const std::vector<ValueDescr>& values) const;
+  Result<const Kernel*> DispatchExact(
+      const std::vector<ValueDescr>& values) const override;
 };
 
 /// \brief A function that dispatches to other functions. Must implement
@@ -309,6 +309,10 @@ class ARROW_EXPORT MetaFunction : public Function {
   Result<Datum> Execute(const std::vector<Datum>& args, const FunctionOptions* options,
                         ExecContext* ctx) const override;
 
+  Result<const Kernel*> DispatchExact(const std::vector<ValueDescr>&) const override {
+    return Status::NotImplemented("DispatchExact for a MetaFunction's Kernels");
+  }
+
  protected:
   virtual Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
                                     const FunctionOptions* options,
diff --git a/cpp/src/arrow/compute/function_benchmark.cc b/cpp/src/arrow/compute/function_benchmark.cc
new file mode 100644
index 0000000..e2214f8
--- /dev/null
+++ b/cpp/src/arrow/compute/function_benchmark.cc
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api.h"
+#include "arrow/memory_pool.h"
+#include "arrow/scalar.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+constexpr int32_t kSeed = 0xfede4a7e;
+constexpr int64_t kScalarCount = 1 << 10;
+
+inline ScalarVector ToScalars(std::shared_ptr<Array> arr) {
+  ScalarVector scalars{static_cast<size_t>(arr->length())};
+  int64_t i = 0;
+  for (auto& scalar : scalars) {
+    scalar = arr->GetScalar(i++).ValueOrDie();
+  }
+  return scalars;
+}
+
+void BM_CastDispatch(benchmark::State& state) {  // NOLINT non-const reference
+  // Repeatedly invoke a trivial Cast: the main cost should be dispatch
+  random::RandomArrayGenerator rag(kSeed);
+
+  auto int_scalars = ToScalars(rag.Int64(kScalarCount, 0, 1 << 20));
+
+  auto double_type = float64();
+  for (auto _ : state) {
+    Datum double_scalar;
+    for (Datum int_scalar : int_scalars) {
+      ASSERT_OK_AND_ASSIGN(double_scalar, Cast(int_scalar, double_type));
+    }
+    benchmark::DoNotOptimize(double_scalar);
+  }
+
+  state.SetItemsProcessed(state.iterations() * kScalarCount);
+}
+
+void BM_CastDispatchBaseline(benchmark::State& state) {  // NOLINT non-const reference
+  // Repeatedly invoke a trivial Cast with all dispatch outside the hot loop
+  random::RandomArrayGenerator rag(kSeed);
+
+  auto int_scalars = ToScalars(rag.Int64(kScalarCount, 0, 1 << 20));
+
+  auto double_type = float64();
+  CastOptions cast_options;
+  cast_options.to_type = double_type;
+  ASSERT_OK_AND_ASSIGN(auto cast_function, GetCastFunction(double_type));
+  ASSERT_OK_AND_ASSIGN(auto cast_kernel,
+                       cast_function->DispatchExact({int_scalars[0]->type}));
+  const auto& exec = static_cast<const ScalarKernel*>(cast_kernel)->exec;
+
+  ExecContext exec_context;
+  KernelContext kernel_context(&exec_context);
+  auto cast_state =
+      cast_kernel->init(&kernel_context, {cast_kernel, {double_type}, &cast_options});
+  ABORT_NOT_OK(kernel_context.status());
+  kernel_context.SetState(cast_state.get());
+
+  for (auto _ : state) {
+    Datum double_scalar = MakeNullScalar(double_type);
+    for (Datum int_scalar : int_scalars) {
+      exec(&kernel_context, {{std::move(int_scalar)}, 1}, &double_scalar);
+      ABORT_NOT_OK(kernel_context.status());
+    }
+    benchmark::DoNotOptimize(double_scalar);
+  }
+
+  state.SetItemsProcessed(state.iterations() * kScalarCount);
+}
+
+void BM_AddDispatch(benchmark::State& state) {  // NOLINT non-const reference
+  ExecContext exec_context;
+  KernelContext kernel_context(&exec_context);
+
+  for (auto _ : state) {
+    ASSERT_OK_AND_ASSIGN(auto add_function, GetFunctionRegistry()->GetFunction("add"));
+    ASSERT_OK_AND_ASSIGN(auto add_kernel,
+                         checked_cast<const ScalarFunction&>(*add_function)
+                             .DispatchExact({int64(), int64()}));
+    benchmark::DoNotOptimize(add_kernel);
+  }
+
+  state.SetItemsProcessed(state.iterations());
+}
+
+BENCHMARK(BM_CastDispatch)->MinTime(1.0);
+BENCHMARK(BM_CastDispatchBaseline)->MinTime(1.0);
+BENCHMARK(BM_AddDispatch)->MinTime(1.0);
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc
index bdbf1fd..b6f1815 100644
--- a/cpp/src/arrow/compute/function_test.cc
+++ b/cpp/src/arrow/compute/function_test.cc
@@ -126,7 +126,7 @@ void CheckAddDispatch(FunctionType* func) {
   KernelType invalid_kernel({boolean()}, boolean(), ExecNYI);
   ASSERT_RAISES(Invalid, func->AddKernel(invalid_kernel));
 
-  ASSERT_OK_AND_ASSIGN(const KernelType* kernel, func->DispatchExact({int32(), int32()}));
+  ASSERT_OK_AND_ASSIGN(const Kernel* kernel, func->DispatchExact({int32(), int32()}));
   KernelSignature expected_sig(in_types1, out_type1);
   ASSERT_TRUE(kernel->signature->Equals(expected_sig));
 
@@ -164,7 +164,7 @@ TEST(ArrayFunction, VarArgs) {
   ASSERT_RAISES(Invalid, va_func.AddKernel(non_va_kernel));
 
   std::vector<ValueDescr> args = {ValueDescr::Scalar(int8()), int8(), int8()};
-  ASSERT_OK_AND_ASSIGN(const ScalarKernel* kernel, va_func.DispatchExact(args));
+  ASSERT_OK_AND_ASSIGN(const Kernel* kernel, va_func.DispatchExact(args));
   ASSERT_TRUE(kernel->signature->MatchesInputs(args));
 
   // No dispatch possible because args incompatible
@@ -215,8 +215,7 @@ TEST(ScalarAggregateFunction, DispatchExact) {
   ASSERT_RAISES(Invalid, func.AddKernel(kernel));
 
   std::vector<ValueDescr> dispatch_args = {ValueDescr::Array(int8())};
-  ASSERT_OK_AND_ASSIGN(const ScalarAggregateKernel* selected_kernel,
-                       func.DispatchExact(dispatch_args));
+  ASSERT_OK_AND_ASSIGN(const Kernel* selected_kernel, func.DispatchExact(dispatch_args));
   ASSERT_EQ(func.kernels()[0], selected_kernel);
   ASSERT_TRUE(selected_kernel->signature->MatchesInputs(dispatch_args));
 
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc b/cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc
index 96f0b87..c707695 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc
@@ -41,7 +41,7 @@ template <typename in_type, typename out_type>
 void ShiftTime(KernelContext* ctx, const util::DivideOrMultiply factor_op,
                const int64_t factor, const ArrayData& input, ArrayData* output) {
   const CastOptions& options = checked_cast<const CastState&>(*ctx->state()).options;
-  const in_type* in_data = input.GetValues<in_type>(1);
+  auto in_data = input.GetValues<in_type>(1);
   auto out_data = output->GetMutableValues<out_type>(1);
 
   if (factor == 1) {