You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/04 12:25:24 UTC

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10412: ARROW-9430: [C++] Implement replace_with_mask kernel

jorisvandenbossche commented on a change in pull request #10412:
URL: https://github.com/apache/arrow/pull/10412#discussion_r645510198



##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -157,6 +157,23 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndices(
 
 }  // namespace internal
 
+/// \brief ReplaceWithMask replaces each value in the array corresponding
+/// to a true value in the mask with the next element from `options`.

Review comment:
       ```suggestion
   /// to a true value in the mask with the next element from `replacements`.
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -0,0 +1,738 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/make_unique.h"
+
+namespace arrow {
+namespace compute {
+
+using arrow::internal::checked_pointer_cast;
+
+namespace {
+template <typename T>
+enable_if_parameter_free<T, std::shared_ptr<DataType>> default_type_instance() {
+  return TypeTraits<T>::type_singleton();
+}
+template <typename T>
+enable_if_time<T, std::shared_ptr<DataType>> default_type_instance() {
+  // Time32 requires second/milli, Time64 requires nano/micro
+  if (TypeTraits<T>::bytes_required(1) == 4) {
+    return std::make_shared<T>(TimeUnit::type::SECOND);
+  } else {
+    return std::make_shared<T>(TimeUnit::type::NANO);
+  }
+}
+template <typename T>
+enable_if_timestamp<T, std::shared_ptr<DataType>> default_type_instance() {
+  return std::make_shared<T>(TimeUnit::type::SECOND);
+}
+template <typename T>
+enable_if_decimal<T, std::shared_ptr<DataType>> default_type_instance() {
+  return std::make_shared<T>(5, 2);
+}
+template <typename T>
+enable_if_parameter_free<T, std::unique_ptr<typename TypeTraits<T>::BuilderType>>
+builder_instance() {
+  return arrow::internal::make_unique<typename TypeTraits<T>::BuilderType>();
+}
+template <typename T>
+enable_if_time<T, std::unique_ptr<typename TypeTraits<T>::BuilderType>>
+builder_instance() {
+  return arrow::internal::make_unique<typename TypeTraits<T>::BuilderType>(
+      default_type_instance<T>(), default_memory_pool());
+}
+template <typename T>
+enable_if_timestamp<T, std::unique_ptr<typename TypeTraits<T>::BuilderType>>
+builder_instance() {
+  return arrow::internal::make_unique<typename TypeTraits<T>::BuilderType>(
+      default_type_instance<T>(), default_memory_pool());
+}
+template <typename T>
+enable_if_t<std::is_signed<T>::value, T> max_int_value() {
+  return static_cast<T>(
+      std::min<double>(16384.0, static_cast<double>(std::numeric_limits<T>::max())));
+}
+template <typename T>
+enable_if_t<std::is_unsigned<T>::value, T> max_int_value() {
+  return static_cast<T>(
+      std::min<double>(16384.0, static_cast<double>(std::numeric_limits<T>::max())));
+}
+}  // namespace
+
+template <typename T>
+class TestReplaceKernel : public ::testing::Test {
+ protected:
+  virtual std::shared_ptr<DataType> type() = 0;
+
+  using ReplaceFunction = std::function<Result<Datum>(const Datum&, const Datum&,
+                                                      const Datum&, ExecContext*)>;
+
+  void SetUp() override { equal_options_ = equal_options_.nans_equal(true); }
+
+  Datum mask_scalar(bool value) { return Datum(std::make_shared<BooleanScalar>(value)); }
+
+  Datum null_mask_scalar() {
+    auto scalar = std::make_shared<BooleanScalar>(true);
+    scalar->is_valid = false;
+    return Datum(std::move(scalar));
+  }
+
+  Datum scalar(const std::string& json) { return ScalarFromJSON(type(), json); }
+
+  std::shared_ptr<Array> array(const std::string& value) {
+    return ArrayFromJSON(type(), value);
+  }
+
+  std::shared_ptr<Array> mask(const std::string& value) {
+    return ArrayFromJSON(boolean(), value);
+  }
+
+  Status AssertRaises(ReplaceFunction func, const std::shared_ptr<Array>& array,
+                      const Datum& mask, const std::shared_ptr<Array>& replacements) {
+    auto result = func(array, mask, replacements, nullptr);
+    EXPECT_FALSE(result.ok());
+    return result.status();
+  }
+
+  void Assert(ReplaceFunction func, const std::shared_ptr<Array>& array,
+              const Datum& mask, Datum replacements,
+              const std::shared_ptr<Array>& expected) {
+    SCOPED_TRACE("Replacements: " + (replacements.is_array()
+                                         ? replacements.make_array()->ToString()
+                                         : replacements.scalar()->ToString()));
+    SCOPED_TRACE("Mask: " + (mask.is_array() ? mask.make_array()->ToString()
+                                             : mask.scalar()->ToString()));
+    SCOPED_TRACE("Array: " + array->ToString());
+
+    ASSERT_OK_AND_ASSIGN(auto actual, func(array, mask, replacements, nullptr));
+    ASSERT_TRUE(actual.is_array());
+    ASSERT_OK(actual.make_array()->ValidateFull());
+
+    AssertArraysApproxEqual(*expected, *actual.make_array(), /*verbose=*/true,
+                            equal_options_);
+  }
+
+  std::shared_ptr<Array> NaiveImpl(
+      const typename TypeTraits<T>::ArrayType& array, const BooleanArray& mask,
+      const typename TypeTraits<T>::ArrayType& replacements) {
+    auto length = array.length();
+    auto builder = builder_instance<T>();
+    int64_t replacement_offset = 0;
+    for (int64_t i = 0; i < length; ++i) {
+      if (mask.IsValid(i)) {
+        if (mask.Value(i)) {
+          if (replacements.IsValid(replacement_offset)) {
+            ARROW_EXPECT_OK(builder->Append(replacements.Value(replacement_offset++)));
+          } else {
+            ARROW_EXPECT_OK(builder->AppendNull());
+            replacement_offset++;
+          }
+        } else {
+          if (array.IsValid(i)) {
+            ARROW_EXPECT_OK(builder->Append(array.Value(i)));
+          } else {
+            ARROW_EXPECT_OK(builder->AppendNull());
+          }
+        }
+      } else {
+        ARROW_EXPECT_OK(builder->AppendNull());
+      }
+    }
+    EXPECT_OK_AND_ASSIGN(auto expected, builder->Finish());
+    return expected;
+  }
+
+  EqualOptions equal_options_ = EqualOptions::Defaults();
+};
+
+template <typename T>
+class TestReplaceNumeric : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+
+class TestReplaceBoolean : public TestReplaceKernel<BooleanType> {
+ protected:
+  std::shared_ptr<DataType> type() override {
+    return TypeTraits<BooleanType>::type_singleton();
+  }
+};
+
+class TestReplaceFixedSizeBinary : public TestReplaceKernel<FixedSizeBinaryType> {
+ protected:
+  std::shared_ptr<DataType> type() override { return fixed_size_binary(3); }
+};
+
+template <typename T>
+class TestReplaceDecimal : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+
+class TestReplaceDayTimeInterval : public TestReplaceKernel<DayTimeIntervalType> {
+ protected:
+  std::shared_ptr<DataType> type() override {
+    return TypeTraits<DayTimeIntervalType>::type_singleton();
+  }
+};
+
+template <typename T>
+class TestReplaceBinary : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+
+using NumericBasedTypes =
+    ::testing::Types<UInt8Type, UInt16Type, UInt32Type, UInt64Type, Int8Type, Int16Type,
+                     Int32Type, Int64Type, FloatType, DoubleType, Date32Type, Date64Type,
+                     Time32Type, Time64Type, TimestampType, MonthIntervalType>;
+
+TYPED_TEST_SUITE(TestReplaceNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestReplaceDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestReplaceBinary, BinaryTypes);
+
+TYPED_TEST(TestReplaceNumeric, ReplaceWithMask) {
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(true),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[]"));
+
+  this->Assert(ReplaceWithMask, this->array("[1]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[1]"));
+  this->Assert(ReplaceWithMask, this->array("[1]"), this->mask_scalar(true),
+               this->array("[0]"), this->array("[0]"));
+  this->Assert(ReplaceWithMask, this->array("[1]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[0, 0]"), this->mask_scalar(false),
+               this->scalar("1"), this->array("[0, 0]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 0]"), this->mask_scalar(true),
+               this->scalar("1"), this->array("[1, 1]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 0]"), this->mask_scalar(true),
+               this->scalar("null"), this->array("[null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->array("[]"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, 3]"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array("[0, 1, 2, 3]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, 3]"),
+               this->mask("[true, true, true, true]"), this->array("[10, 11, 12, 13]"),
+               this->array("[10, 11, 12, 13]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, 3]"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, null]"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array("[0, 1, 2, null]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, null]"),
+               this->mask("[true, true, true, true]"), this->array("[10, 11, 12, 13]"),
+               this->array("[10, 11, 12, 13]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, null]"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2, 3, 4, 5]"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array("[10, null]"), this->array("[10, null, 2, 3, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[null, null, null, null, null, null]"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array("[10, null]"),
+               this->array("[10, null, null, null, null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->scalar("1"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1]"), this->mask("[true, true]"),
+               this->scalar("10"), this->array("[10, 10]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1]"), this->mask("[true, true]"),
+               this->scalar("null"), this->array("[null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[0, 1, 2]"),
+               this->mask("[true, false, null]"), this->scalar("10"),
+               this->array("[10, 1, null]"));
+}
+
+TYPED_TEST(TestReplaceNumeric, ReplaceWithMaskRandom) {
+  using ArrayType = typename TypeTraits<TypeParam>::ArrayType;
+  using CType = typename TypeTraits<TypeParam>::CType;
+  auto ty = this->type();
+
+  random::RandomArrayGenerator rand(/*seed=*/0);
+  const int64_t length = 1023;
+  // Clamp the range because date/time types don't print well with extreme values
+  std::vector<std::string> values = {"0.01", "0"};
+  values.push_back(std::to_string(max_int_value<CType>()));
+  auto options = key_value_metadata({"null_probability", "min", "max"}, values);
+  auto array =
+      checked_pointer_cast<ArrayType>(rand.ArrayOf(*field("a", ty, options), length));
+  auto mask = checked_pointer_cast<BooleanArray>(
+      rand.ArrayOf(boolean(), length, /*null_probability=*/0.01));
+  const int64_t num_replacements = std::count_if(
+      mask->begin(), mask->end(),
+      [](util::optional<bool> value) { return value.has_value() && *value; });
+  auto replacements = checked_pointer_cast<ArrayType>(
+      rand.ArrayOf(*field("a", ty, options), num_replacements));
+  auto expected = this->NaiveImpl(*array, *mask, *replacements);
+
+  this->Assert(ReplaceWithMask, array, mask, replacements, expected);
+  for (int64_t slice = 1; slice <= 16; slice++) {
+    auto sliced_array = checked_pointer_cast<ArrayType>(array->Slice(slice, 15));
+    auto sliced_mask = checked_pointer_cast<BooleanArray>(mask->Slice(slice, 15));
+    auto new_expected = this->NaiveImpl(*sliced_array, *sliced_mask, *replacements);
+    this->Assert(ReplaceWithMask, sliced_array, sliced_mask, replacements, new_expected);
+  }
+}
+
+TYPED_TEST(TestReplaceNumeric, ReplaceWithMaskErrors) {
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Replacement array must be of appropriate length (expected 1 "
+                           "items but got 2 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[1]"), this->mask_scalar(true),
+                         this->array("[0, 1]")));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Replacement array must be of appropriate length (expected 2 "
+                           "items but got 1 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[1, 2]"),
+                         this->mask("[true, true]"), this->array("[0]")));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Replacement array must be of appropriate length (expected 1 "
+                           "items but got 0 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[1, 2]"),
+                         this->mask("[true, null]"), this->array("[]")));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Mask must be of same length as array (expected 2 "
+                           "items but got 0 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[1, 2]"), this->mask("[]"),
+                         this->array("[]")));
+}
+
+TEST_F(TestReplaceBoolean, ReplaceWithMask) {
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(true),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[]"));
+
+  this->Assert(ReplaceWithMask, this->array("[true]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[true]"));
+  this->Assert(ReplaceWithMask, this->array("[true]"), this->mask_scalar(true),
+               this->array("[false]"), this->array("[false]"));
+  this->Assert(ReplaceWithMask, this->array("[true]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[false, false]"), this->mask_scalar(false),
+               this->scalar("true"), this->array("[false, false]"));
+  this->Assert(ReplaceWithMask, this->array("[false, false]"), this->mask_scalar(true),
+               this->scalar("true"), this->array("[true, true]"));
+  this->Assert(ReplaceWithMask, this->array("[false, false]"), this->mask_scalar(true),
+               this->scalar("null"), this->array("[null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->array("[]"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, true]"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array("[true, true, true, true]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, true]"),
+               this->mask("[true, true, true, true]"),
+               this->array("[false, false, false, false]"),
+               this->array("[false, false, false, false]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, true]"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, null]"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array("[true, true, true, null]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, null]"),
+               this->mask("[true, true, true, true]"),
+               this->array("[false, false, false, false]"),
+               this->array("[false, false, false, false]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, null]"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[true, true, true, true, true, true]"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array("[false, null]"),
+               this->array("[false, null, true, true, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[null, null, null, null, null, null]"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array("[false, null]"),
+               this->array("[false, null, null, null, null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->scalar("true"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[false, false]"), this->mask("[true, true]"),
+               this->scalar("true"), this->array("[true, true]"));
+  this->Assert(ReplaceWithMask, this->array("[false, false]"), this->mask("[true, true]"),
+               this->scalar("null"), this->array("[null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[false, false, false]"),
+               this->mask("[true, false, null]"), this->scalar("true"),
+               this->array("[true, false, null]"));
+}
+
+TEST_F(TestReplaceBoolean, ReplaceWithMaskErrors) {
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Replacement array must be of appropriate length (expected 1 "
+                           "items but got 2 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[true]"), this->mask_scalar(true),
+                         this->array("[false, false]")));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Replacement array must be of appropriate length (expected 2 "
+                           "items but got 1 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[true, true]"),
+                         this->mask("[true, true]"), this->array("[false]")));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Replacement array must be of appropriate length (expected 1 "
+                           "items but got 0 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[true, true]"),
+                         this->mask("[true, null]"), this->array("[]")));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Mask must be of same length as array (expected 2 "
+                           "items but got 0 items)"),
+      this->AssertRaises(ReplaceWithMask, this->array("[true, true]"), this->mask("[]"),
+                         this->array("[]")));
+}
+
+TEST_F(TestReplaceFixedSizeBinary, ReplaceWithMask) {
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(true),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[]"));
+
+  this->Assert(ReplaceWithMask, this->array(R"(["foo"])"), this->mask_scalar(false),
+               this->array("[]"), this->array(R"(["foo"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["foo"])"), this->mask_scalar(true),
+               this->array(R"(["bar"])"), this->array(R"(["bar"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["foo"])"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[null]"));
+
+  this->Assert(ReplaceWithMask, this->array(R"(["foo", "bar"])"),
+               this->mask_scalar(false), this->scalar(R"("baz")"),
+               this->array(R"(["foo", "bar"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["foo", "bar"])"), this->mask_scalar(true),
+               this->scalar(R"("baz")"), this->array(R"(["baz", "baz"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["foo", "bar"])"), this->mask_scalar(true),
+               this->scalar("null"), this->array(R"([null, null])"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->array("[]"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc", "ddd"])"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array(R"(["aaa", "bbb", "ccc", "ddd"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc", "ddd"])"),
+               this->mask("[true, true, true, true]"),
+               this->array(R"(["eee", "fff", "ggg", "hhh"])"),
+               this->array(R"(["eee", "fff", "ggg", "hhh"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc", "ddd"])"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array(R"([null, null, null, null])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc", null])"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array(R"(["aaa", "bbb", "ccc", null])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc", null])"),
+               this->mask("[true, true, true, true]"),
+               this->array(R"(["eee", "fff", "ggg", "hhh"])"),
+               this->array(R"(["eee", "fff", "ggg", "hhh"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc", null])"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array(R"([null, null, null, null])"));
+  this->Assert(ReplaceWithMask,
+               this->array(R"(["aaa", "bbb", "ccc", "ddd", "eee", "fff"])"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array(R"(["ggg", null])"),
+               this->array(R"(["ggg", null, "ccc", "ddd", null, null])"));
+  this->Assert(ReplaceWithMask, this->array(R"([null, null, null, null, null, null])"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array(R"(["aaa", null])"),
+               this->array(R"(["aaa", null, null, null, null, null])"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"),
+               this->scalar(R"("zzz")"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb"])"),
+               this->mask("[true, true]"), this->scalar(R"("zzz")"),
+               this->array(R"(["zzz", "zzz"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb"])"),
+               this->mask("[true, true]"), this->scalar("null"),
+               this->array("[null, null]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["aaa", "bbb", "ccc"])"),
+               this->mask("[true, false, null]"), this->scalar(R"("zzz")"),
+               this->array(R"(["zzz", "bbb", null])"));
+}
+
+TEST_F(TestReplaceFixedSizeBinary, ReplaceWithMaskErrors) {
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::AllOf(
+          ::testing::HasSubstr("Replacements must be of same type (expected "),
+          ::testing::HasSubstr(this->type()->ToString()),
+          ::testing::HasSubstr("but got fixed_size_binary[2]")),
+      this->AssertRaises(ReplaceWithMask, this->array("[]"), this->mask_scalar(true),
+                         ArrayFromJSON(fixed_size_binary(2), "[]")));
+}
+
+TYPED_TEST(TestReplaceDecimal, ReplaceWithMask) {
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(true),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[]"));
+
+  this->Assert(ReplaceWithMask, this->array(R"(["1.00"])"), this->mask_scalar(false),
+               this->array("[]"), this->array(R"(["1.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["1.00"])"), this->mask_scalar(true),
+               this->array(R"(["0.00"])"), this->array(R"(["0.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["1.00"])"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[null]"));
+
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "0.00"])"),
+               this->mask_scalar(false), this->scalar(R"("1.00")"),
+               this->array(R"(["0.00", "0.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "0.00"])"),
+               this->mask_scalar(true), this->scalar(R"("1.00")"),
+               this->array(R"(["1.00", "1.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "0.00"])"),
+               this->mask_scalar(true), this->scalar("null"),
+               this->array("[null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->array("[]"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00", "3.00"])"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array(R"(["0.00", "1.00", "2.00", "3.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00", "3.00"])"),
+               this->mask("[true, true, true, true]"),
+               this->array(R"(["10.00", "11.00", "12.00", "13.00"])"),
+               this->array(R"(["10.00", "11.00", "12.00", "13.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00", "3.00"])"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00", null])"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array(R"(["0.00", "1.00", "2.00", null])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00", null])"),
+               this->mask("[true, true, true, true]"),
+               this->array(R"(["10.00", "11.00", "12.00", "13.00"])"),
+               this->array(R"(["10.00", "11.00", "12.00", "13.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00", null])"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask,
+               this->array(R"(["0.00", "1.00", "2.00", "3.00", "4.00", "5.00"])"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array(R"(["10.00", null])"),
+               this->array(R"(["10.00", null, "2.00", "3.00", null, null])"));
+  this->Assert(ReplaceWithMask, this->array("[null, null, null, null, null, null]"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array(R"(["10.00", null])"),
+               this->array(R"(["10.00", null, null, null, null, null])"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"),
+               this->scalar(R"("1.00")"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00"])"),
+               this->mask("[true, true]"), this->scalar(R"("10.00")"),
+               this->array(R"(["10.00", "10.00"])"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00"])"),
+               this->mask("[true, true]"), this->scalar("null"),
+               this->array("[null, null]"));
+  this->Assert(ReplaceWithMask, this->array(R"(["0.00", "1.00", "2.00"])"),
+               this->mask("[true, false, null]"), this->scalar(R"("10.00")"),
+               this->array(R"(["10.00", "1.00", null])"));
+}
+
+TEST_F(TestReplaceDayTimeInterval, ReplaceWithMask) {
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask_scalar(true),
+               this->array("[]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[]"));
+
+  this->Assert(ReplaceWithMask, this->array("[[1, 2]]"), this->mask_scalar(false),
+               this->array("[]"), this->array("[[1, 2]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2]]"), this->mask_scalar(true),
+               this->array("[[3, 4]]"), this->array("[[3, 4]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2]]"), this->null_mask_scalar(),
+               this->array("[]"), this->array("[null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [3, 4]]"), this->mask_scalar(false),
+               this->scalar("[7, 8]"), this->array("[[1, 2], [3, 4]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [3, 4]]"), this->mask_scalar(true),
+               this->scalar("[7, 8]"), this->array("[[7, 8], [7, 8]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [3, 4]]"), this->mask_scalar(true),
+               this->scalar("null"), this->array("[null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"), this->array("[]"),
+               this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], [1, 2]]"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array("[[1, 2], [1, 2], [1, 2], [1, 2]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], [1, 2]]"),
+               this->mask("[true, true, true, true]"),
+               this->array("[[3, 4], [3, 4], [3, 4], [3, 4]]"),
+               this->array("[[3, 4], [3, 4], [3, 4], [3, 4]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], [1, 2]]"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], null]"),
+               this->mask("[false, false, false, false]"), this->array("[]"),
+               this->array("[[1, 2], [1, 2], [1, 2], null]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], null]"),
+               this->mask("[true, true, true, true]"),
+               this->array("[[3, 4], [3, 4], [3, 4], [3, 4]]"),
+               this->array("[[3, 4], [3, 4], [3, 4], [3, 4]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], null]"),
+               this->mask("[null, null, null, null]"), this->array("[]"),
+               this->array("[null, null, null, null]"));
+  this->Assert(
+      ReplaceWithMask, this->array("[[1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2]]"),
+      this->mask("[true, true, false, false, null, null]"), this->array("[[3, 4], null]"),
+      this->array("[[3, 4], null, [1, 2], [1, 2], null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[null, null, null, null, null, null]"),
+               this->mask("[true, true, false, false, null, null]"),
+               this->array("[[3, 4], null]"),
+               this->array("[[3, 4], null, null, null, null, null]"));
+
+  this->Assert(ReplaceWithMask, this->array("[]"), this->mask("[]"),
+               this->scalar("[7, 8]"), this->array("[]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [3, 4]]"),
+               this->mask("[true, true]"), this->scalar("[7, 8]"),
+               this->array("[[7, 8], [7, 8]]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [3, 4]]"),
+               this->mask("[true, true]"), this->scalar("null"),
+               this->array("[null, null]"));
+  this->Assert(ReplaceWithMask, this->array("[[1, 2], [3, 4], [5, 6]]"),
+               this->mask("[true, false, null]"), this->scalar("[7, 8]"),
+               this->array("[[7, 8], [3, 4], null]"));
+}
+
+TYPED_TEST(TestReplaceBinary, ReplaceWithMask) {

Review comment:
       If I understand correctly, this test is for the variable sized binary/string types? But the values in the test seem to only contain fixed length strings (probably copied from the TestReplaceFixedSizeBinary). Maybe introduce a few strings with varying length?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        Functor::AllocateData(ctx, *array.type, array.length));
+
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);
+  arrow::internal::OptionalBitBlockCounter valid_counter(mask_bitmap, mask.offset,
+                                                         mask.length);
+  int64_t out_offset = 0;
+  int64_t replacements_offset = 0;
+  while (out_offset < array.length) {
+    BitBlockCount value_block = value_counter.NextWord();
+    BitBlockCount valid_block = valid_counter.NextWord();
+    DCHECK_EQ(value_block.length, valid_block.length);
+    if (value_block.AllSet() && valid_block.AllSet()) {
+      // Copy from replacement array
+      if (replacements_offset + valid_block.length > replacements_length) {
+        return ReplacementArrayTooShort(replacements_offset + valid_block.length,
+                                        replacements_length);
+      }
+      Functor::CopyData(*array.type, out_values, out_offset, replacements,
+                        replacements_offset, valid_block.length);
+      if (replacements_bitmap) {
+        copy_bitmap(out_offset, replacements_offset, valid_block.length);
+      } else if (!replacements_bitmap && out_bitmap) {
+        BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, true);
+      }
+      replacements_offset += valid_block.length;
+    } else if (value_block.NoneSet() && valid_block.AllSet()) {
+      // Do nothing
+    } else if (valid_block.NoneSet()) {
+      DCHECK(out_bitmap);
+      BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, false);
+    } else {
+      for (int64_t i = 0; i < valid_block.length; ++i) {
+        if (BitUtil::GetBit(mask_values, out_offset + mask.offset + i) &&
+            (!mask_bitmap ||
+             BitUtil::GetBit(mask_bitmap, out_offset + mask.offset + i))) {
+          if (replacements_offset >= replacements_length) {
+            return ReplacementArrayTooShort(replacements_offset + 1, replacements_length);
+          }
+          Functor::CopyData(*array.type, out_values, out_offset + i, replacements,
+                            replacements_offset,
+                            /*length=*/1);
+          if (replacements_bitmap) {
+            copy_bitmap(out_offset + i, replacements_offset, 1);
+          }
+          replacements_offset++;
+        }
+      }
+    }
+    out_offset += valid_block.length;
+  }
+
+  if (mask.MayHaveNulls()) {
+    arrow::internal::BitmapAnd(out_bitmap, /*left_offset=*/0, mask.buffers[0]->data(),
+                               mask.offset, array.length,
+                               /*out_offset=*/0, out_bitmap);
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct ReplaceWithMask {};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_number<Type>> {
+  using T = typename TypeTraits<Type>::CType;
+
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx, const DataType&,
+                                                      const int64_t length) {
+    return ctx->Allocate(length * sizeof(T));
+  }
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * sizeof(T));
+      std::memcpy(out + (out_offset * sizeof(T)), in_arr, length * sizeof(T));
+    } else {
+      T* begin = reinterpret_cast<T*>(out + (out_offset * sizeof(T)));
+      T* end = begin + length;
+      std::fill(begin, end, UnboxScalar<Type>::Unbox(*in.scalar()));
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_boolean<Type>> {
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx, const DataType&,
+                                                      const int64_t length) {
+    return ctx->AllocateBitmap(length);
+  }
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr = in_data.GetValues<uint8_t>(1, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_arr, in_offset + in_data.offset, length, out,
+                                  out_offset);
+    } else {
+      BitUtil::SetBitsTo(out, out_offset, length, in.scalar()->is_valid);
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx,
+                                                      const DataType& ty,
+                                                      const int64_t length) {
+    return ctx->Allocate(length *
+                         checked_cast<const FixedSizeBinaryType&>(ty).byte_width());
+  }
+
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const FixedSizeBinaryScalar& scalar =
+          checked_cast<const FixedSizeBinaryScalar&>(*in.scalar());
+      // Null scalar may have null value buffer
+      if (!scalar.value) return;
+      const Buffer& buffer = *scalar.value;
+      const uint8_t* value = buffer.data();
+      DCHECK_GE(buffer.size(), width);
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value, width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx,
+                                                      const DataType& ty,
+                                                      const int64_t length) {
+    return ctx->Allocate(length *
+                         checked_cast<const FixedSizeBinaryType&>(ty).byte_width());
+  }
+
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const ScalarType& scalar = checked_cast<const ScalarType&>(*in.scalar());
+      const auto value = scalar.value.ToBytes();
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value.data(), width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_null<Type>> {
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t source_offset = 0;
+    int64_t replacements_offset = 0;
+    RETURN_NOT_OK(VisitArrayDataInline<BooleanType>(
+        mask,
+        [&](bool replace) {
+          if (replace && replacements.is_scalar()) {
+            const Scalar& scalar = *replacements.scalar();
+            if (scalar.is_valid) {
+              RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(scalar)));
+            } else {
+              RETURN_NOT_OK(builder.AppendNull());
+            }
+          } else {
+            const ArrayData& source = replace ? *replacements.array() : array;
+            const int64_t offset = replace ? replacements_offset++ : source_offset;
+            if (!source.MayHaveNulls() ||
+                BitUtil::GetBit(source.buffers[0]->data(), source.offset + offset)) {
+              const uint8_t* data = source.buffers[2]->data();
+              const offset_type* offsets = source.GetValues<offset_type>(1);
+              const offset_type offset0 = offsets[offset];
+              const offset_type offset1 = offsets[offset + 1];
+              RETURN_NOT_OK(builder.Append(data + offset0, offset1 - offset0));
+            } else {
+              RETURN_NOT_OK(builder.AppendNull());
+            }
+          }
+          source_offset++;
+          return Status::OK();
+        },
+        [&]() {
+          RETURN_NOT_OK(builder.AppendNull());
+          source_offset++;
+          return Status::OK();
+        }));
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMaskFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ArrayData& array = *batch[0].array();
+    const Datum& replacements = batch[2];
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+
+    // Needed for FixedSizeBinary/parameterized types
+    if (!array.type->Equals(*replacements.type(), /*check_metadata=*/false)) {
+      return Status::Invalid("Replacements must be of same type (expected ",
+                             array.type->ToString(), " but got ",
+                             replacements.type()->ToString(), ")");
+    }
+
+    if (!replacements.is_array() && !replacements.is_scalar()) {
+      return Status::Invalid("Replacements must be array or scalar");
+    }
+
+    if (batch[1].is_scalar()) {
+      return ReplaceWithMask<Type>::ExecScalarMask(
+          ctx, array, batch[1].scalar_as<BooleanScalar>(), replacements, output);
+    }
+    const ArrayData& mask = *batch[1].array();
+    if (array.length != mask.length) {
+      return Status::Invalid("Mask must be of same length as array (expected ",
+                             array.length, " items but got ", mask.length, " items)");
+    }
+    return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
+  }
+};
+
+}  // namespace
+
+const FunctionDoc replace_with_mask_doc(
+    "Replace items using a mask and replacement values",
+    ("Given an array and a Boolean mask (either scalar or of equal length), "
+     "along with replacement values (either scalar or array), "
+     "each corresponding element of the mask will be replaced by the next "
+     "value of the replacements (or null if the mask is null)."

Review comment:
       ```suggestion
        "value of the replacements (or null if the mask is null). "
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        Functor::AllocateData(ctx, *array.type, array.length));
+
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);
+  arrow::internal::OptionalBitBlockCounter valid_counter(mask_bitmap, mask.offset,
+                                                         mask.length);
+  int64_t out_offset = 0;
+  int64_t replacements_offset = 0;
+  while (out_offset < array.length) {
+    BitBlockCount value_block = value_counter.NextWord();
+    BitBlockCount valid_block = valid_counter.NextWord();
+    DCHECK_EQ(value_block.length, valid_block.length);
+    if (value_block.AllSet() && valid_block.AllSet()) {
+      // Copy from replacement array
+      if (replacements_offset + valid_block.length > replacements_length) {
+        return ReplacementArrayTooShort(replacements_offset + valid_block.length,
+                                        replacements_length);
+      }
+      Functor::CopyData(*array.type, out_values, out_offset, replacements,
+                        replacements_offset, valid_block.length);
+      if (replacements_bitmap) {
+        copy_bitmap(out_offset, replacements_offset, valid_block.length);
+      } else if (!replacements_bitmap && out_bitmap) {
+        BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, true);
+      }
+      replacements_offset += valid_block.length;
+    } else if (value_block.NoneSet() && valid_block.AllSet()) {
+      // Do nothing
+    } else if (valid_block.NoneSet()) {
+      DCHECK(out_bitmap);
+      BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, false);
+    } else {
+      for (int64_t i = 0; i < valid_block.length; ++i) {
+        if (BitUtil::GetBit(mask_values, out_offset + mask.offset + i) &&
+            (!mask_bitmap ||
+             BitUtil::GetBit(mask_bitmap, out_offset + mask.offset + i))) {
+          if (replacements_offset >= replacements_length) {
+            return ReplacementArrayTooShort(replacements_offset + 1, replacements_length);
+          }
+          Functor::CopyData(*array.type, out_values, out_offset + i, replacements,
+                            replacements_offset,
+                            /*length=*/1);
+          if (replacements_bitmap) {
+            copy_bitmap(out_offset + i, replacements_offset, 1);
+          }
+          replacements_offset++;
+        }
+      }
+    }
+    out_offset += valid_block.length;
+  }
+
+  if (mask.MayHaveNulls()) {
+    arrow::internal::BitmapAnd(out_bitmap, /*left_offset=*/0, mask.buffers[0]->data(),
+                               mask.offset, array.length,
+                               /*out_offset=*/0, out_bitmap);
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct ReplaceWithMask {};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_number<Type>> {
+  using T = typename TypeTraits<Type>::CType;
+
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx, const DataType&,
+                                                      const int64_t length) {
+    return ctx->Allocate(length * sizeof(T));
+  }
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * sizeof(T));
+      std::memcpy(out + (out_offset * sizeof(T)), in_arr, length * sizeof(T));
+    } else {
+      T* begin = reinterpret_cast<T*>(out + (out_offset * sizeof(T)));
+      T* end = begin + length;
+      std::fill(begin, end, UnboxScalar<Type>::Unbox(*in.scalar()));
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_boolean<Type>> {
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx, const DataType&,
+                                                      const int64_t length) {
+    return ctx->AllocateBitmap(length);
+  }
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr = in_data.GetValues<uint8_t>(1, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_arr, in_offset + in_data.offset, length, out,
+                                  out_offset);
+    } else {
+      BitUtil::SetBitsTo(out, out_offset, length, in.scalar()->is_valid);
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx,
+                                                      const DataType& ty,
+                                                      const int64_t length) {
+    return ctx->Allocate(length *
+                         checked_cast<const FixedSizeBinaryType&>(ty).byte_width());
+  }
+
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const FixedSizeBinaryScalar& scalar =
+          checked_cast<const FixedSizeBinaryScalar&>(*in.scalar());
+      // Null scalar may have null value buffer
+      if (!scalar.value) return;
+      const Buffer& buffer = *scalar.value;
+      const uint8_t* value = buffer.data();
+      DCHECK_GE(buffer.size(), width);
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value, width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx,
+                                                      const DataType& ty,
+                                                      const int64_t length) {
+    return ctx->Allocate(length *
+                         checked_cast<const FixedSizeBinaryType&>(ty).byte_width());
+  }
+
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const ScalarType& scalar = checked_cast<const ScalarType&>(*in.scalar());
+      const auto value = scalar.value.ToBytes();
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value.data(), width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_null<Type>> {
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t source_offset = 0;
+    int64_t replacements_offset = 0;
+    RETURN_NOT_OK(VisitArrayDataInline<BooleanType>(
+        mask,
+        [&](bool replace) {
+          if (replace && replacements.is_scalar()) {
+            const Scalar& scalar = *replacements.scalar();
+            if (scalar.is_valid) {
+              RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(scalar)));
+            } else {
+              RETURN_NOT_OK(builder.AppendNull());
+            }
+          } else {
+            const ArrayData& source = replace ? *replacements.array() : array;
+            const int64_t offset = replace ? replacements_offset++ : source_offset;
+            if (!source.MayHaveNulls() ||
+                BitUtil::GetBit(source.buffers[0]->data(), source.offset + offset)) {
+              const uint8_t* data = source.buffers[2]->data();
+              const offset_type* offsets = source.GetValues<offset_type>(1);
+              const offset_type offset0 = offsets[offset];
+              const offset_type offset1 = offsets[offset + 1];
+              RETURN_NOT_OK(builder.Append(data + offset0, offset1 - offset0));
+            } else {
+              RETURN_NOT_OK(builder.AppendNull());
+            }
+          }
+          source_offset++;
+          return Status::OK();
+        },
+        [&]() {
+          RETURN_NOT_OK(builder.AppendNull());
+          source_offset++;
+          return Status::OK();
+        }));
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMaskFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ArrayData& array = *batch[0].array();
+    const Datum& replacements = batch[2];
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+
+    // Needed for FixedSizeBinary/parameterized types
+    if (!array.type->Equals(*replacements.type(), /*check_metadata=*/false)) {
+      return Status::Invalid("Replacements must be of same type (expected ",
+                             array.type->ToString(), " but got ",
+                             replacements.type()->ToString(), ")");
+    }
+
+    if (!replacements.is_array() && !replacements.is_scalar()) {
+      return Status::Invalid("Replacements must be array or scalar");
+    }
+
+    if (batch[1].is_scalar()) {
+      return ReplaceWithMask<Type>::ExecScalarMask(
+          ctx, array, batch[1].scalar_as<BooleanScalar>(), replacements, output);
+    }
+    const ArrayData& mask = *batch[1].array();
+    if (array.length != mask.length) {
+      return Status::Invalid("Mask must be of same length as array (expected ",
+                             array.length, " items but got ", mask.length, " items)");
+    }
+    return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
+  }
+};
+
+}  // namespace
+
+const FunctionDoc replace_with_mask_doc(
+    "Replace items using a mask and replacement values",
+    ("Given an array and a Boolean mask (either scalar or of equal length), "
+     "along with replacement values (either scalar or array), "
+     "each corresponding element of the mask will be replaced by the next "
+     "value of the replacements (or null if the mask is null)."
+     "Hence, for replacement arrays, len(replacements) == popcnt(mask)."),

Review comment:
       I think popcnt will not be clear for many users. Maybe something like `sum(mask == true)` ?

##########
File path: docs/source/cpp/compute.rst
##########
@@ -815,6 +815,22 @@ Associative transforms
   Each output element corresponds to a unique value in the input, along
   with the number of times this value has appeared.
 
+Replacements
+~~~~~~~~~~~~
+
+These functions create a copy of the first input with some elements
+replaced, based on the remaining inputs.
+
++-------------------+------------+-------------------------+--------------+--------------+------------------+-------------+
+| Function name     | Arity      | Input type 1            | Input type 2 | Input type 3 | Output type      | Notes       |
++===================+============+=========================+==============+==============+==================+=============+
+| replace_with_mask | Ternary    | Fixed-width, non-binary | Boolean      | Input type 1 | Input type 1     | \(1)        |

Review comment:
       Non-fixed width and binary is now also supported already?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        Functor::AllocateData(ctx, *array.type, array.length));
+
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);
+  arrow::internal::OptionalBitBlockCounter valid_counter(mask_bitmap, mask.offset,
+                                                         mask.length);
+  int64_t out_offset = 0;
+  int64_t replacements_offset = 0;
+  while (out_offset < array.length) {
+    BitBlockCount value_block = value_counter.NextWord();
+    BitBlockCount valid_block = valid_counter.NextWord();
+    DCHECK_EQ(value_block.length, valid_block.length);
+    if (value_block.AllSet() && valid_block.AllSet()) {
+      // Copy from replacement array
+      if (replacements_offset + valid_block.length > replacements_length) {
+        return ReplacementArrayTooShort(replacements_offset + valid_block.length,
+                                        replacements_length);
+      }
+      Functor::CopyData(*array.type, out_values, out_offset, replacements,
+                        replacements_offset, valid_block.length);
+      if (replacements_bitmap) {
+        copy_bitmap(out_offset, replacements_offset, valid_block.length);
+      } else if (!replacements_bitmap && out_bitmap) {
+        BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, true);
+      }
+      replacements_offset += valid_block.length;
+    } else if (value_block.NoneSet() && valid_block.AllSet()) {
+      // Do nothing
+    } else if (valid_block.NoneSet()) {
+      DCHECK(out_bitmap);
+      BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, false);
+    } else {
+      for (int64_t i = 0; i < valid_block.length; ++i) {
+        if (BitUtil::GetBit(mask_values, out_offset + mask.offset + i) &&
+            (!mask_bitmap ||
+             BitUtil::GetBit(mask_bitmap, out_offset + mask.offset + i))) {
+          if (replacements_offset >= replacements_length) {
+            return ReplacementArrayTooShort(replacements_offset + 1, replacements_length);
+          }
+          Functor::CopyData(*array.type, out_values, out_offset + i, replacements,
+                            replacements_offset,
+                            /*length=*/1);
+          if (replacements_bitmap) {
+            copy_bitmap(out_offset + i, replacements_offset, 1);
+          }
+          replacements_offset++;
+        }
+      }
+    }
+    out_offset += valid_block.length;
+  }
+
+  if (mask.MayHaveNulls()) {
+    arrow::internal::BitmapAnd(out_bitmap, /*left_offset=*/0, mask.buffers[0]->data(),
+                               mask.offset, array.length,
+                               /*out_offset=*/0, out_bitmap);
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct ReplaceWithMask {};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_number<Type>> {
+  using T = typename TypeTraits<Type>::CType;
+
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx, const DataType&,
+                                                      const int64_t length) {
+    return ctx->Allocate(length * sizeof(T));
+  }
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * sizeof(T));
+      std::memcpy(out + (out_offset * sizeof(T)), in_arr, length * sizeof(T));
+    } else {
+      T* begin = reinterpret_cast<T*>(out + (out_offset * sizeof(T)));
+      T* end = begin + length;
+      std::fill(begin, end, UnboxScalar<Type>::Unbox(*in.scalar()));
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_boolean<Type>> {
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx, const DataType&,
+                                                      const int64_t length) {
+    return ctx->AllocateBitmap(length);
+  }
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr = in_data.GetValues<uint8_t>(1, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_arr, in_offset + in_data.offset, length, out,
+                                  out_offset);
+    } else {
+      BitUtil::SetBitsTo(out, out_offset, length, in.scalar()->is_valid);
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx,
+                                                      const DataType& ty,
+                                                      const int64_t length) {
+    return ctx->Allocate(length *
+                         checked_cast<const FixedSizeBinaryType&>(ty).byte_width());
+  }
+
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const FixedSizeBinaryScalar& scalar =
+          checked_cast<const FixedSizeBinaryScalar&>(*in.scalar());
+      // Null scalar may have null value buffer
+      if (!scalar.value) return;
+      const Buffer& buffer = *scalar.value;
+      const uint8_t* value = buffer.data();
+      DCHECK_GE(buffer.size(), width);
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value, width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+
+  static Result<std::shared_ptr<Buffer>> AllocateData(KernelContext* ctx,
+                                                      const DataType& ty,
+                                                      const int64_t length) {
+    return ctx->Allocate(length *
+                         checked_cast<const FixedSizeBinaryType&>(ty).byte_width());
+  }
+
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const ScalarType& scalar = checked_cast<const ScalarType&>(*in.scalar());
+      const auto value = scalar.value.ToBytes();
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value.data(), width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_null<Type>> {
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t source_offset = 0;
+    int64_t replacements_offset = 0;
+    RETURN_NOT_OK(VisitArrayDataInline<BooleanType>(
+        mask,
+        [&](bool replace) {
+          if (replace && replacements.is_scalar()) {
+            const Scalar& scalar = *replacements.scalar();
+            if (scalar.is_valid) {
+              RETURN_NOT_OK(builder.Append(UnboxScalar<Type>::Unbox(scalar)));
+            } else {
+              RETURN_NOT_OK(builder.AppendNull());
+            }
+          } else {
+            const ArrayData& source = replace ? *replacements.array() : array;
+            const int64_t offset = replace ? replacements_offset++ : source_offset;
+            if (!source.MayHaveNulls() ||
+                BitUtil::GetBit(source.buffers[0]->data(), source.offset + offset)) {
+              const uint8_t* data = source.buffers[2]->data();
+              const offset_type* offsets = source.GetValues<offset_type>(1);
+              const offset_type offset0 = offsets[offset];
+              const offset_type offset1 = offsets[offset + 1];
+              RETURN_NOT_OK(builder.Append(data + offset0, offset1 - offset0));
+            } else {
+              RETURN_NOT_OK(builder.AppendNull());
+            }
+          }
+          source_offset++;
+          return Status::OK();
+        },
+        [&]() {
+          RETURN_NOT_OK(builder.AppendNull());
+          source_offset++;
+          return Status::OK();
+        }));
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMaskFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ArrayData& array = *batch[0].array();
+    const Datum& replacements = batch[2];
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+
+    // Needed for FixedSizeBinary/parameterized types
+    if (!array.type->Equals(*replacements.type(), /*check_metadata=*/false)) {
+      return Status::Invalid("Replacements must be of same type (expected ",
+                             array.type->ToString(), " but got ",
+                             replacements.type()->ToString(), ")");
+    }
+
+    if (!replacements.is_array() && !replacements.is_scalar()) {
+      return Status::Invalid("Replacements must be array or scalar");
+    }
+
+    if (batch[1].is_scalar()) {
+      return ReplaceWithMask<Type>::ExecScalarMask(
+          ctx, array, batch[1].scalar_as<BooleanScalar>(), replacements, output);
+    }
+    const ArrayData& mask = *batch[1].array();
+    if (array.length != mask.length) {
+      return Status::Invalid("Mask must be of same length as array (expected ",
+                             array.length, " items but got ", mask.length, " items)");
+    }
+    return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
+  }
+};
+
+}  // namespace
+
+const FunctionDoc replace_with_mask_doc(
+    "Replace items using a mask and replacement values",
+    ("Given an array and a Boolean mask (either scalar or of equal length), "
+     "along with replacement values (either scalar or array), "
+     "each corresponding element of the mask will be replaced by the next "

Review comment:
       ```suggestion
        "each corresponding true value of the mask will be replaced by the next "
   ```
   
   ?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org