You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/02/12 18:01:59 UTC

[arrow] branch master updated: ARROW-15215: [C++] Consolidate kernel data-copy utilities between replace_with_mask, case_when, coalesce, choose, fill_null_forward, fill_null_backward

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b7c7a2  ARROW-15215: [C++] Consolidate kernel data-copy utilities between replace_with_mask, case_when, coalesce, choose, fill_null_forward, fill_null_backward
6b7c7a2 is described below

commit 6b7c7a2702466f7c3c9c1f9dd41bc42458cff398
Author: JabariBooker <o....@gmail.com>
AuthorDate: Sat Feb 12 12:57:22 2022 -0500

    ARROW-15215: [C++] Consolidate kernel data-copy utilities between replace_with_mask, case_when, coalesce, choose, fill_null_forward, fill_null_backward
    
    Consolidate kernel data-copy utilities between replace_with_mask, case_when, coalesce, choose, fill_null_forward, fill_null_backward in one place
    
    Closes #12379 from JabariBooker/ARROW-15215
    
    Authored-by: JabariBooker <o....@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/compute/kernels/copy_data_internal.h | 112 ++++++++++++++++++
 cpp/src/arrow/compute/kernels/scalar_if_else.cc    | 117 +++++--------------
 cpp/src/arrow/compute/kernels/vector_replace.cc    | 125 ++++++---------------
 3 files changed, 175 insertions(+), 179 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/copy_data_internal.h b/cpp/src/arrow/compute/kernels/copy_data_internal.h
new file mode 100644
index 0000000..5a5d446
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/copy_data_internal.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/kernels/codegen_internal.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+template <typename Type, typename Enable = void>
+struct CopyDataUtils {};  // Empty primary template; specializations define CopyData.
+
+// BooleanType: values are bit-packed, so all offsets and lengths are counted in bits.
+template <>
+struct CopyDataUtils<BooleanType> {
+  static void CopyData(const DataType&, const Scalar& in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    const bool value = in.is_valid && checked_cast<const BooleanScalar&>(in).value;
+    bit_util::SetBitsTo(out, out_offset, length, value);  // null scalar -> false
+  }
+  // Raw bitmap: in_offset is used as-is as the starting bit position in `in`.
+  static void CopyData(const DataType&, const uint8_t* in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    arrow::internal::CopyBitmap(in, in_offset, length, out, out_offset);
+  }
+  // ArrayData: in_offset is relative to the array, so in.offset is added to reach the bit.
+  static void CopyData(const DataType&, const ArrayData& in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    const auto in_arr = in.GetValues<uint8_t>(1, /*absolute_offset=*/0);
+    CopyData(*in.type, in_arr, in.offset + in_offset, out, out_offset, length);
+  }
+};
+
+// FixedSizeBinaryType: offsets/lengths count values of byte_width() bytes each.
+template <>
+struct CopyDataUtils<FixedSizeBinaryType> {
+  static void CopyData(const DataType& ty, const Scalar& in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (width * out_offset);
+    const auto& scalar = checked_cast<const arrow::internal::PrimitiveScalarBase&>(in);
+    // A null scalar may have a null value buffer, so zero-fill instead of reading it
+    if (!scalar.is_valid) {
+      std::memset(begin, 0x00, width * length);
+    } else {
+      const util::string_view buffer = scalar.view();
+      DCHECK_GE(buffer.size(), static_cast<size_t>(width));
+      for (int64_t i = 0; i < length; ++i, begin += width) {
+        std::memcpy(begin, buffer.data(), width);
+      }
+    }
+  }
+  // Raw buffer: in_offset is a value index into `in`; it is scaled by the byte width.
+  static void CopyData(const DataType& ty, const uint8_t* in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (width * out_offset);
+    std::memcpy(begin, in + in_offset * width, length * width);
+  }
+  // ArrayData: in.offset is applied first, so in_offset is relative to the array.
+  static void CopyData(const DataType& ty, const ArrayData& in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    const auto in_arr = in.GetValues<uint8_t>(1, in.offset * width);
+    CopyData(ty, in_arr, in_offset, out, out_offset, length);
+  }
+};
+
+// Numeric and interval types: offsets and lengths count CType-sized elements.
+template <typename Type>
+struct CopyDataUtils<
+    Type, enable_if_t<is_number_type<Type>::value || is_interval_type<Type>::value>> {
+  using CType = typename TypeTraits<Type>::CType;
+  // Scalar: writes `length` copies of the unboxed value; in_offset is not used.
+  static void CopyData(const DataType&, const Scalar& in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    CType* const first = reinterpret_cast<CType*>(out) + out_offset;
+    std::fill(first, first + length, UnboxScalar<Type>::Unbox(in));
+  }
+  // Raw buffers: element offsets and length are scaled to bytes before the copy.
+  static void CopyData(const DataType&, const uint8_t* in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    constexpr size_t kWidth = sizeof(CType);
+    std::memcpy(out + out_offset * kWidth, in + in_offset * kWidth, length * kWidth);
+  }
+  // ArrayData: in.offset is applied first, so in_offset is relative to the array.
+  static void CopyData(const DataType&, const ArrayData& in, const int64_t in_offset,
+                       uint8_t* out, const int64_t out_offset, const int64_t length) {
+    const uint8_t* values = in.GetValues<uint8_t>(1, in.offset * sizeof(CType));
+    CopyData(*in.type, values, in_offset, out, out_offset, length);
+  }
+};
+
+}  // namespace internal
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else.cc b/cpp/src/arrow/compute/kernels/scalar_if_else.cc
index bc692ae..5e1edbe 100644
--- a/cpp/src/arrow/compute/kernels/scalar_if_else.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_if_else.cc
@@ -21,6 +21,7 @@
 #include "arrow/array/builder_union.h"
 #include "arrow/compute/api.h"
 #include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/copy_data_internal.h"
 #include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bitmap.h"
@@ -1279,71 +1280,6 @@ void AddNestedIfElseKernels(const std::shared_ptr<IfElseFunction>& scalar_functi
   }
 }
 
-// Helper to copy or broadcast fixed-width values between buffers.
-template <typename Type, typename Enable = void>
-struct CopyFixedWidth {};
-template <>
-struct CopyFixedWidth<BooleanType> {
-  static void CopyScalar(const Scalar& scalar, const int64_t length,
-                         uint8_t* raw_out_values, const int64_t out_offset) {
-    const bool value = UnboxScalar<BooleanType>::Unbox(scalar);
-    bit_util::SetBitsTo(raw_out_values, out_offset, length, value);
-  }
-  static void CopyArray(const DataType&, const uint8_t* in_values,
-                        const int64_t in_offset, const int64_t length,
-                        uint8_t* raw_out_values, const int64_t out_offset) {
-    arrow::internal::CopyBitmap(in_values, in_offset, length, raw_out_values, out_offset);
-  }
-};
-
-template <typename Type>
-struct CopyFixedWidth<
-    Type, enable_if_t<is_number_type<Type>::value || is_interval_type<Type>::value>> {
-  using CType = typename TypeTraits<Type>::CType;
-  static void CopyScalar(const Scalar& scalar, const int64_t length,
-                         uint8_t* raw_out_values, const int64_t out_offset) {
-    CType* out_values = reinterpret_cast<CType*>(raw_out_values);
-    const CType value = UnboxScalar<Type>::Unbox(scalar);
-    std::fill(out_values + out_offset, out_values + out_offset + length, value);
-  }
-  static void CopyArray(const DataType&, const uint8_t* in_values,
-                        const int64_t in_offset, const int64_t length,
-                        uint8_t* raw_out_values, const int64_t out_offset) {
-    std::memcpy(raw_out_values + out_offset * sizeof(CType),
-                in_values + in_offset * sizeof(CType), length * sizeof(CType));
-  }
-};
-
-template <typename Type>
-struct CopyFixedWidth<Type, enable_if_fixed_size_binary<Type>> {
-  static void CopyScalar(const Scalar& values, const int64_t length,
-                         uint8_t* raw_out_values, const int64_t out_offset) {
-    const int32_t width =
-        checked_cast<const FixedSizeBinaryType&>(*values.type).byte_width();
-    uint8_t* next = raw_out_values + (width * out_offset);
-    const auto& scalar =
-        checked_cast<const arrow::internal::PrimitiveScalarBase&>(values);
-    // Scalar may have null value buffer
-    if (!scalar.is_valid) {
-      std::memset(next, 0x00, width * length);
-    } else {
-      util::string_view view = scalar.view();
-      DCHECK_EQ(view.size(), static_cast<size_t>(width));
-      for (int i = 0; i < length; i++) {
-        std::memcpy(next, view.data(), width);
-        next += width;
-      }
-    }
-  }
-  static void CopyArray(const DataType& type, const uint8_t* in_values,
-                        const int64_t in_offset, const int64_t length,
-                        uint8_t* raw_out_values, const int64_t out_offset) {
-    const int32_t width = checked_cast<const FixedSizeBinaryType&>(type).byte_width();
-    uint8_t* next = raw_out_values + (width * out_offset);
-    std::memcpy(next, in_values + in_offset * width, length * width);
-  }
-};
-
 // Copy fixed-width values from a scalar/array datum into an output values buffer
 template <typename Type>
 void CopyValues(const Datum& in_values, const int64_t in_offset, const int64_t length,
@@ -1353,7 +1289,8 @@ void CopyValues(const Datum& in_values, const int64_t in_offset, const int64_t l
     if (out_valid) {
       bit_util::SetBitsTo(out_valid, out_offset, length, scalar.is_valid);
     }
-    CopyFixedWidth<Type>::CopyScalar(scalar, length, out_values, out_offset);
+    CopyDataUtils<Type>::CopyData(*scalar.type, scalar, /*in_offset=*/0, out_values,
+                                  out_offset, length);
   } else {
     const ArrayData& array = *in_values.array();
     if (out_valid) {
@@ -1371,9 +1308,9 @@ void CopyValues(const Datum& in_values, const int64_t in_offset, const int64_t l
         bit_util::SetBitsTo(out_valid, out_offset, length, true);
       }
     }
-    CopyFixedWidth<Type>::CopyArray(*array.type, array.buffers[1]->data(),
-                                    array.offset + in_offset, length, out_values,
-                                    out_offset);
+    CopyDataUtils<Type>::CopyData(*array.type, array.buffers[1]->data(),
+                                  array.offset + in_offset, out_values, out_offset,
+                                  length);
   }
 }
 
@@ -1389,8 +1326,8 @@ void CopyOneArrayValue(const DataType& type, const uint8_t* in_valid,
     bit_util::SetBitTo(out_valid, out_offset,
                        !in_valid || bit_util::GetBit(in_valid, in_offset));
   }
-  CopyFixedWidth<Type>::CopyArray(type, in_values, in_offset, /*length=*/1, out_values,
-                                  out_offset);
+  CopyDataUtils<Type>::CopyData(type, in_values, in_offset, out_values, out_offset,
+                                /*length=*/1);
 }
 
 template <typename Type>
@@ -1399,7 +1336,8 @@ void CopyOneScalarValue(const Scalar& scalar, uint8_t* out_valid, uint8_t* out_v
   if (out_valid) {
     bit_util::SetBitTo(out_valid, out_offset, scalar.is_valid);
   }
-  CopyFixedWidth<Type>::CopyScalar(scalar, /*length=*/1, out_values, out_offset);
+  CopyDataUtils<Type>::CopyData(*scalar.type, scalar, /*in_offset=*/0, out_values,
+                                out_offset, /*length=*/1);
 }
 
 template <typename Type>
@@ -2080,8 +2018,8 @@ Status ExecArrayCoalesce(KernelContext* ctx, const ExecBatch& batch, Datum* out)
           }
           if (!run.set) {
             // Copy from input
-            CopyFixedWidth<Type>::CopyArray(type, in_values, in_offset + offset,
-                                            run.length, out_values, out_offset + offset);
+            CopyDataUtils<Type>::CopyData(type, in_values, in_offset + offset, out_values,
+                                          out_offset + offset, run.length);
           }
           offset += run.length;
         }
@@ -2136,8 +2074,8 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right,
   if (left.null_count() < length * 0.2) {
     // There are less than 20% nulls in the left array, so first copy
     // the left values, then fill any nulls with the right value
-    CopyFixedWidth<Type>::CopyArray(*left_arr.type, left_values, left_arr.offset, length,
-                                    out_values, out_offset);
+    CopyDataUtils<Type>::CopyData(*left_arr.type, left_values, left_arr.offset,
+                                  out_values, out_offset, length);
 
     BitRunReader reader(left_valid, left_arr.offset, left_arr.length);
     int64_t offset = 0;
@@ -2146,8 +2084,9 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right,
       if (run.length == 0) break;
       if (!run.set) {
         // All from right
-        CopyFixedWidth<Type>::CopyScalar(right_scalar, run.length, out_values,
-                                         out_offset + offset);
+        CopyDataUtils<Type>::CopyData(*right_scalar.type, right_scalar,
+                                      /*in_offset=*/0, out_values, out_offset + offset,
+                                      run.length);
       }
       offset += run.length;
     }
@@ -2160,13 +2099,14 @@ Status ExecArrayScalarCoalesce(KernelContext* ctx, Datum left, Datum right,
       if (run.length == 0) break;
       if (run.set) {
         // All from left
-        CopyFixedWidth<Type>::CopyArray(*left_arr.type, left_values,
-                                        left_arr.offset + offset, run.length, out_values,
-                                        out_offset + offset);
+        CopyDataUtils<Type>::CopyData(*left_arr.type, left_values,
+                                      left_arr.offset + offset, out_values,
+                                      out_offset + offset, run.length);
       } else {
         // All from right
-        CopyFixedWidth<Type>::CopyScalar(right_scalar, run.length, out_values,
-                                         out_offset + offset);
+        CopyDataUtils<Type>::CopyData(*right_scalar.type, right_scalar,
+                                      /*in_offset=*/0, out_values, out_offset + offset,
+                                      run.length);
       }
       offset += run.length;
     }
@@ -2233,14 +2173,13 @@ Status ExecBinaryCoalesce(KernelContext* ctx, Datum left, Datum right, int64_t l
     }
     if (run.set) {
       // All from left
-      CopyFixedWidth<Type>::CopyArray(*left_arr.type, left_values,
-                                      left_arr.offset + offset, run.length, out_values,
-                                      out_offset + offset);
+      CopyDataUtils<Type>::CopyData(*left_arr.type, left_values, left_arr.offset + offset,
+                                    out_values, out_offset + offset, run.length);
     } else {
       // All from right
-      CopyFixedWidth<Type>::CopyArray(*right_arr.type, right_values,
-                                      right_arr.offset + offset, run.length, out_values,
-                                      out_offset + offset);
+      CopyDataUtils<Type>::CopyData(*right_arr.type, right_values,
+                                    right_arr.offset + offset, out_values,
+                                    out_offset + offset, run.length);
     }
     offset += run.length;
   }
diff --git a/cpp/src/arrow/compute/kernels/vector_replace.cc b/cpp/src/arrow/compute/kernels/vector_replace.cc
index 35790e9..ae2a5a9 100644
--- a/cpp/src/arrow/compute/kernels/vector_replace.cc
+++ b/cpp/src/arrow/compute/kernels/vector_replace.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/compute/api_scalar.h"
 #include "arrow/compute/kernels/common.h"
+#include "arrow/compute/kernels/copy_data_internal.h"
 #include "arrow/util/bitmap_ops.h"
 
 namespace arrow {
@@ -32,7 +33,7 @@ Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
 
 // Helper to implement replace_with kernel with scalar mask for fixed-width types,
 // using callbacks to handle both bool and byte-sized types
-template <typename Functor>
+template <typename Type>
 Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
                              const BooleanScalar& mask, const Datum& replacements,
                              ArrayData* output) {
@@ -52,8 +53,8 @@ Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
     if (in_data.length < array.length) {
       return ReplacementArrayTooShort(array.length, in_data.length);
     }
-    Functor::CopyData(*array.type, out_values, out_offset, in_data, /*in_offset=*/0,
-                      array.length);
+    CopyDataUtils<Type>::CopyData(*array.type, in_data, /*in_offset=*/0, out_values,
+                                  out_offset, array.length);
     if (in_data.MayHaveNulls()) {
       arrow::internal::CopyBitmap(in_data.buffers[0]->data(), in_data.offset,
                                   array.length, out_bitmap, out_offset);
@@ -62,8 +63,8 @@ Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
     }
   } else {
     const Scalar& in_data = *source.scalar();
-    Functor::CopyData(*array.type, out_values, out_offset, in_data, /*in_offset=*/0,
-                      array.length);
+    CopyDataUtils<Type>::CopyData(*array.type, in_data, /*in_offset=*/0, out_values,
+                                  out_offset, array.length);
     bit_util::SetBitsTo(out_bitmap, out_offset, array.length, in_data.is_valid);
   }
   return Status::OK();
@@ -102,14 +103,14 @@ struct CopyScalarBitmap {
 // Helper to implement replace_with kernel with array mask for fixed-width types,
 // using callbacks to handle both bool and byte-sized types and to handle
 // scalar and array replacements
-template <typename Functor, typename Data, typename CopyBitmap>
+template <typename Type, typename Data, typename CopyBitmap>
 void ReplaceWithArrayMaskImpl(const ArrayData& array, const ArrayData& mask,
                               const Data& replacements, bool replacements_bitmap,
                               const CopyBitmap& copy_bitmap, const uint8_t* mask_bitmap,
                               const uint8_t* mask_values, uint8_t* out_bitmap,
                               uint8_t* out_values, const int64_t out_offset) {
-  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
-                    array.length);
+  CopyDataUtils<Type>::CopyData(*array.type, array, /*in_offset=*/0, out_values,
+                                /*out_offset=*/0, array.length);
   arrow::internal::OptionalBinaryBitBlockCounter counter(
       mask_values, mask.offset, mask_bitmap, mask.offset, mask.length);
   int64_t write_offset = 0;
@@ -118,8 +119,9 @@ void ReplaceWithArrayMaskImpl(const ArrayData& array, const ArrayData& mask,
     BitBlockCount block = counter.NextAndBlock();
     if (block.AllSet()) {
       // Copy from replacement array
-      Functor::CopyData(*array.type, out_values, out_offset + write_offset, replacements,
-                        replacements_offset, block.length);
+      CopyDataUtils<Type>::CopyData(*array.type, replacements, replacements_offset,
+                                    out_values, out_offset + write_offset, block.length);
+
       if (replacements_bitmap) {
         copy_bitmap.CopyBitmap(out_bitmap, out_offset + write_offset, replacements_offset,
                                block.length);
@@ -132,8 +134,9 @@ void ReplaceWithArrayMaskImpl(const ArrayData& array, const ArrayData& mask,
         if (bit_util::GetBit(mask_values, write_offset + mask.offset + i) &&
             (!mask_bitmap ||
              bit_util::GetBit(mask_bitmap, write_offset + mask.offset + i))) {
-          Functor::CopyData(*array.type, out_values, out_offset + write_offset + i,
-                            replacements, replacements_offset, /*length=*/1);
+          CopyDataUtils<Type>::CopyData(*array.type, replacements, replacements_offset,
+                                        out_values, out_offset + write_offset + i,
+                                        /*length=*/1);
           copy_bitmap.SetBit(out_bitmap, out_offset + write_offset + i,
 
                              replacements_offset);
@@ -145,7 +148,7 @@ void ReplaceWithArrayMaskImpl(const ArrayData& array, const ArrayData& mask,
   }
 }
 
-template <typename Functor>
+template <typename Type>
 Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
                             const ArrayData& mask, const Datum& replacements,
                             ArrayData* output) {
@@ -186,16 +189,16 @@ Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
 
   if (replacements.is_array()) {
     const ArrayData& array_repl = *replacements.array();
-    ReplaceWithArrayMaskImpl<Functor>(
+    ReplaceWithArrayMaskImpl<Type>(
         array, mask, array_repl, replacements_bitmap,
         CopyArrayBitmap{(replacements_bitmap) ? array_repl.buffers[0]->data() : nullptr,
                         array_repl.offset},
         mask_bitmap, mask_values, out_bitmap, out_values, out_offset);
   } else {
     const Scalar& scalar_repl = *replacements.scalar();
-    ReplaceWithArrayMaskImpl<Functor>(array, mask, scalar_repl, replacements_bitmap,
-                                      CopyScalarBitmap{scalar_repl.is_valid}, mask_bitmap,
-                                      mask_values, out_bitmap, out_values, out_offset);
+    ReplaceWithArrayMaskImpl<Type>(array, mask, scalar_repl, replacements_bitmap,
+                                   CopyScalarBitmap{scalar_repl.is_valid}, mask_bitmap,
+                                   mask_values, out_bitmap, out_values, out_offset);
   }
 
   if (mask.MayHaveNulls()) {
@@ -212,103 +215,45 @@ template <typename Type>
 struct ReplaceWithMask<Type,
                        enable_if_t<is_number_type<Type>::value ||
                                    std::is_same<Type, MonthDayNanoIntervalType>::value>> {
-  using T = typename TypeTraits<Type>::CType;
-
-  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
-                       const ArrayData& in, const int64_t in_offset,
-                       const int64_t length) {
-    const auto in_arr = in.GetValues<uint8_t>(1, (in_offset + in.offset) * sizeof(T));
-    std::memcpy(out + (out_offset * sizeof(T)), in_arr, length * sizeof(T));
-  }
-
-  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
-                       const Scalar& in, const int64_t in_offset, const int64_t length) {
-    T* begin = reinterpret_cast<T*>(out + (out_offset * sizeof(T)));
-    T* end = begin + length;
-    std::fill(begin, end, UnboxScalar<Type>::Unbox(in));
-  }
-
   static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
                                const BooleanScalar& mask, const Datum& replacements,
                                ArrayData* output) {
-    return ReplaceWithScalarMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
-                                                        output);
+    return ReplaceWithScalarMask<Type>(ctx, array, mask, replacements, output);
   }
 
   static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
                               const ArrayData& mask, const Datum& replacements,
                               ArrayData* output) {
-    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
-                                                       output);
+    return ReplaceWithArrayMask<Type>(ctx, array, mask, replacements, output);
   }
 };
 
 template <typename Type>
 struct ReplaceWithMask<Type, enable_if_boolean<Type>> {
-  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
-                       const ArrayData& in, const int64_t in_offset,
-                       const int64_t length) {
-    const auto in_arr = in.GetValues<uint8_t>(1, /*absolute_offset=*/0);
-    arrow::internal::CopyBitmap(in_arr, in_offset + in.offset, length, out, out_offset);
-  }
-  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
-                       const Scalar& in, const int64_t in_offset, const int64_t length) {
-    bit_util::SetBitsTo(
-        out, out_offset, length,
-        in.is_valid ? checked_cast<const BooleanScalar&>(in).value : false);
-  }
-
   static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
                                const BooleanScalar& mask, const Datum& replacements,
                                ArrayData* output) {
-    return ReplaceWithScalarMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
-                                                        output);
+    return ReplaceWithScalarMask<Type>(ctx, array, mask, replacements, output);
   }
   static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
                               const ArrayData& mask, const Datum& replacements,
                               ArrayData* output) {
-    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
-                                                       output);
+    return ReplaceWithArrayMask<Type>(ctx, array, mask, replacements, output);
   }
 };
 
 template <typename Type>
 struct ReplaceWithMask<Type, enable_if_fixed_size_binary<Type>> {
-  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
-                       const ArrayData& in, const int64_t in_offset,
-                       const int64_t length) {
-    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
-    uint8_t* begin = out + (out_offset * width);
-    const auto in_arr = in.GetValues<uint8_t>(1, (in_offset + in.offset) * width);
-    std::memcpy(begin, in_arr, length * width);
-  }
-  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
-                       const Scalar& in, const int64_t in_offset, const int64_t length) {
-    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
-    uint8_t* begin = out + (out_offset * width);
-    const auto& scalar = checked_cast<const arrow::internal::PrimitiveScalarBase&>(in);
-    // Null scalar may have null value buffer
-    if (!scalar.is_valid) return;
-    const util::string_view buffer = scalar.view();
-    DCHECK_GE(buffer.size(), static_cast<size_t>(width));
-    for (int i = 0; i < length; i++) {
-      std::memcpy(begin, buffer.data(), width);
-      begin += width;
-    }
-  }
-
   static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
                                const BooleanScalar& mask, const Datum& replacements,
                                ArrayData* output) {
-    return ReplaceWithScalarMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
-                                                        output);
+    return ReplaceWithScalarMask<Type>(ctx, array, mask, replacements, output);
   }
 
   static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
                               const ArrayData& mask, const Datum& replacements,
                               ArrayData* output) {
-    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
-                                                       output);
+    return ReplaceWithArrayMask<Type>(ctx, array, mask, replacements, output);
   }
 };
 
@@ -460,9 +405,9 @@ void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null
   uint8_t* out_values = output->buffers[1]->mutable_data();
   arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
                               current_chunk.length, out_bitmap, output->offset);
-  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
-                                  /*out_offset=*/output->offset, current_chunk,
-                                  /*in_offset=*/0, current_chunk.length);
+  CopyDataUtils<Type>::CopyData(*current_chunk.type, current_chunk, /*in_offset=*/0,
+                                out_values, /*out_offset=*/output->offset,
+                                current_chunk.length);
 
   bool has_fill_value = *last_valid_value_offset != -1;
   int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
@@ -486,10 +431,10 @@ void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null
           auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
           if (!current_bit) {
             if (has_fill_value) {
-              ReplaceWithMask<Type>::CopyData(
-                  *current_chunk.type, out_values, write_value_offset,
+              CopyDataUtils<Type>::CopyData(
+                  *current_chunk.type,
                   use_current_chunk ? current_chunk : last_valid_value_chunk,
-                  *last_valid_value_offset,
+                  *last_valid_value_offset, out_values, write_value_offset,
                   /*length=*/1);
               bit_util::SetBitTo(out_bitmap, write_value_offset, true);
             }
@@ -502,10 +447,10 @@ void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null
       } else {
         for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
           if (has_fill_value) {
-            ReplaceWithMask<Type>::CopyData(
-                *current_chunk.type, out_values, write_value_offset,
+            CopyDataUtils<Type>::CopyData(
+                *current_chunk.type,
                 use_current_chunk ? current_chunk : last_valid_value_chunk,
-                *last_valid_value_offset,
+                *last_valid_value_offset, out_values, write_value_offset,
                 /*length=*/1);
             bit_util::SetBitTo(out_bitmap, write_value_offset, true);
           }