You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/07 15:55:44 UTC

[GitHub] [arrow] nirandaperera commented on a change in pull request #10412: ARROW-9430: [C++] Implement replace_with_mask kernel

nirandaperera commented on a change in pull request #10412:
URL: https://github.com/apache/arrow/pull/10412#discussion_r665457625



##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto replacement_array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *replacement_array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      // Copy array's bitmap
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      // Array has no bitmap but mask/replacements do, generate an all-valid bitmap
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);
+  arrow::internal::OptionalBitBlockCounter valid_counter(mask_bitmap, mask.offset,
+                                                         mask.length);
+  int64_t out_offset = 0;
+  int64_t replacements_offset = 0;
+  while (out_offset < array.length) {
+    BitBlockCount value_block = value_counter.NextWord();
+    BitBlockCount valid_block = valid_counter.NextWord();
+    DCHECK_EQ(value_block.length, valid_block.length);
+    if (value_block.AllSet() && valid_block.AllSet()) {
+      // Copy from replacement array
+      if (replacements_offset + valid_block.length > replacements_length) {
+        return ReplacementArrayTooShort(replacements_offset + valid_block.length,
+                                        replacements_length);
+      }
+      Functor::CopyData(*array.type, out_values, out_offset, replacements,
+                        replacements_offset, valid_block.length);
+      if (replacements_bitmap) {
+        copy_bitmap(out_offset, replacements_offset, valid_block.length);
+      } else if (!replacements_bitmap && out_bitmap) {
+        BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, true);
+      }
+      replacements_offset += valid_block.length;
+    } else if ((value_block.NoneSet() && valid_block.AllSet()) || valid_block.NoneSet()) {

Review comment:
       nit. I think this simplifies a bit. 
   ```
   A.B + ~B = (A+~B).(B+~B) = A+~B --> value_block.NoneSet() || valid_block.NoneSet()
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto replacement_array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *replacement_array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      // Copy array's bitmap
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      // Array has no bitmap but mask/replacements do, generate an all-valid bitmap
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);
+  arrow::internal::OptionalBitBlockCounter valid_counter(mask_bitmap, mask.offset,
+                                                         mask.length);
+  int64_t out_offset = 0;
+  int64_t replacements_offset = 0;
+  while (out_offset < array.length) {
+    BitBlockCount value_block = value_counter.NextWord();
+    BitBlockCount valid_block = valid_counter.NextWord();
+    DCHECK_EQ(value_block.length, valid_block.length);
+    if (value_block.AllSet() && valid_block.AllSet()) {
+      // Copy from replacement array
+      if (replacements_offset + valid_block.length > replacements_length) {
+        return ReplacementArrayTooShort(replacements_offset + valid_block.length,
+                                        replacements_length);
+      }
+      Functor::CopyData(*array.type, out_values, out_offset, replacements,
+                        replacements_offset, valid_block.length);
+      if (replacements_bitmap) {
+        copy_bitmap(out_offset, replacements_offset, valid_block.length);
+      } else if (!replacements_bitmap && out_bitmap) {
+        BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, true);
+      }
+      replacements_offset += valid_block.length;
+    } else if ((value_block.NoneSet() && valid_block.AllSet()) || valid_block.NoneSet()) {
+      // Do nothing
+    } else {
+      for (int64_t i = 0; i < valid_block.length; ++i) {
+        if (BitUtil::GetBit(mask_values, out_offset + mask.offset + i) &&
+            (!mask_bitmap ||
+             BitUtil::GetBit(mask_bitmap, out_offset + mask.offset + i))) {
+          if (replacements_offset >= replacements_length) {

Review comment:
       can't we pre-calculate true count in `mask` and validate it with the `replacements.length()` before hand, so that we don't have to do this check for every block? :thinking: 

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto replacement_array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *replacement_array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      // Copy array's bitmap
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      // Array has no bitmap but mask/replacements do, generate an all-valid bitmap
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);

Review comment:
       can't we use `OptionalBitBlockCounter` here? :thinking: 

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,495 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        Functor::AllocateData(ctx, *array.type, array.length));
+
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());

Review comment:
       @lidavidm I think it will be better to use `BitUtil::SetBitsTo/SetBitmap` here. It would more precisely set values upto the [`offset`, `offset+length`).

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+namespace {
+
+Status ReplacementArrayTooShort(int64_t expected, int64_t actual) {
+  return Status::Invalid("Replacement array must be of appropriate length (expected ",
+                         expected, " items but got ", actual, " items)");
+}
+
+// Helper to implement replace_with kernel with scalar mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types
+Status ReplaceWithScalarMask(KernelContext* ctx, const ArrayData& array,
+                             const BooleanScalar& mask, const Datum& replacements,
+                             ArrayData* output) {
+  if (!mask.is_valid) {
+    // Output = null
+    ARROW_ASSIGN_OR_RAISE(auto replacement_array,
+                          MakeArrayOfNull(array.type, array.length, ctx->memory_pool()));
+    *output = *replacement_array->data();
+    return Status::OK();
+  }
+  if (mask.value) {
+    // Output = replacement
+    if (replacements.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto replacement_array,
+          MakeArrayFromScalar(*replacements.scalar(), array.length, ctx->memory_pool()));
+      *output = *replacement_array->data();
+    } else {
+      auto replacement_array = replacements.array();
+      if (replacement_array->length != array.length) {
+        return ReplacementArrayTooShort(array.length, replacement_array->length);
+      }
+      *output = *replacement_array;
+    }
+  } else {
+    // Output = input
+    *output = array;
+  }
+  return Status::OK();
+}
+
+// Helper to implement replace_with kernel with array mask for fixed-width types,
+// using callbacks to handle both bool and byte-sized types and to handle
+// scalar and array replacements
+template <typename Functor>
+Status ReplaceWithArrayMask(KernelContext* ctx, const ArrayData& array,
+                            const ArrayData& mask, const Datum& replacements,
+                            ArrayData* output) {
+  uint8_t* out_bitmap = nullptr;
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  const uint8_t* mask_bitmap = mask.MayHaveNulls() ? mask.buffers[0]->data() : nullptr;
+  const uint8_t* mask_values = mask.buffers[1]->data();
+  bool replacements_bitmap;
+  int64_t replacements_length;
+  if (replacements.is_array()) {
+    replacements_bitmap = replacements.array()->MayHaveNulls();
+    replacements_length = replacements.array()->length;
+  } else {
+    replacements_bitmap = !replacements.scalar()->is_valid;
+    replacements_length = std::numeric_limits<int64_t>::max();
+  }
+  if (array.MayHaveNulls() || mask.MayHaveNulls() || replacements_bitmap) {
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(array.length));
+    out_bitmap = output->buffers[0]->mutable_data();
+    output->null_count = -1;
+    if (array.MayHaveNulls()) {
+      // Copy array's bitmap
+      arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                  out_bitmap, /*dest_offset=*/0);
+    } else {
+      // Array has no bitmap but mask/replacements do, generate an all-valid bitmap
+      std::memset(out_bitmap, 0xFF, output->buffers[0]->size());
+    }
+  } else {
+    output->null_count = 0;
+  }
+  auto copy_bitmap = [&](int64_t out_offset, int64_t in_offset, int64_t length) {
+    DCHECK(out_bitmap);
+    if (replacements.is_array()) {
+      const auto& in_data = *replacements.array();
+      const auto in_bitmap = in_data.GetValues<uint8_t>(0, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_bitmap, in_data.offset + in_offset, length,
+                                  out_bitmap, out_offset);
+    } else {
+      BitUtil::SetBitsTo(out_bitmap, out_offset, length, !replacements_bitmap);
+    }
+  };
+
+  Functor::CopyData(*array.type, out_values, /*out_offset=*/0, array, /*in_offset=*/0,
+                    array.length);
+  arrow::internal::BitBlockCounter value_counter(mask_values, mask.offset, mask.length);
+  arrow::internal::OptionalBitBlockCounter valid_counter(mask_bitmap, mask.offset,
+                                                         mask.length);
+  int64_t out_offset = 0;
+  int64_t replacements_offset = 0;
+  while (out_offset < array.length) {
+    BitBlockCount value_block = value_counter.NextWord();
+    BitBlockCount valid_block = valid_counter.NextWord();
+    DCHECK_EQ(value_block.length, valid_block.length);
+    if (value_block.AllSet() && valid_block.AllSet()) {
+      // Copy from replacement array
+      if (replacements_offset + valid_block.length > replacements_length) {
+        return ReplacementArrayTooShort(replacements_offset + valid_block.length,
+                                        replacements_length);
+      }
+      Functor::CopyData(*array.type, out_values, out_offset, replacements,
+                        replacements_offset, valid_block.length);
+      if (replacements_bitmap) {
+        copy_bitmap(out_offset, replacements_offset, valid_block.length);
+      } else if (!replacements_bitmap && out_bitmap) {
+        BitUtil::SetBitsTo(out_bitmap, out_offset, valid_block.length, true);
+      }
+      replacements_offset += valid_block.length;
+    } else if ((value_block.NoneSet() && valid_block.AllSet()) || valid_block.NoneSet()) {
+      // Do nothing
+    } else {
+      for (int64_t i = 0; i < valid_block.length; ++i) {
+        if (BitUtil::GetBit(mask_values, out_offset + mask.offset + i) &&
+            (!mask_bitmap ||
+             BitUtil::GetBit(mask_bitmap, out_offset + mask.offset + i))) {
+          if (replacements_offset >= replacements_length) {
+            return ReplacementArrayTooShort(replacements_offset + 1, replacements_length);
+          }
+          Functor::CopyData(*array.type, out_values, out_offset + i, replacements,
+                            replacements_offset,
+                            /*length=*/1);
+          if (replacements_bitmap) {
+            copy_bitmap(out_offset + i, replacements_offset, 1);
+          }
+          replacements_offset++;
+        }
+      }
+    }
+    out_offset += valid_block.length;
+  }
+
+  if (mask.MayHaveNulls()) {
+    arrow::internal::BitmapAnd(out_bitmap, /*left_offset=*/0, mask.buffers[0]->data(),
+                               mask.offset, array.length,
+                               /*out_offset=*/0, out_bitmap);
+  }
+  return Status::OK();
+}
+
+template <typename Type, typename Enable = void>
+struct ReplaceWithMask {};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_number<Type>> {
+  using T = typename TypeTraits<Type>::CType;
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * sizeof(T));
+      std::memcpy(out + (out_offset * sizeof(T)), in_arr, length * sizeof(T));
+    } else {
+      T* begin = reinterpret_cast<T*>(out + (out_offset * sizeof(T)));
+      T* end = begin + length;
+      std::fill(begin, end, UnboxScalar<Type>::Unbox(*in.scalar()));
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_boolean<Type>> {
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr = in_data.GetValues<uint8_t>(1, /*absolute_offset=*/0);
+      arrow::internal::CopyBitmap(in_arr, in_offset + in_data.offset, length, out,
+                                  out_offset);
+    } else {
+      BitUtil::SetBitsTo(out, out_offset, length, in.scalar()->is_valid);
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_same<Type, FixedSizeBinaryType>> {
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const FixedSizeBinaryScalar& scalar =
+          checked_cast<const FixedSizeBinaryScalar&>(*in.scalar());
+      // Null scalar may have null value buffer
+      if (!scalar.value) return;
+      const Buffer& buffer = *scalar.value;
+      const uint8_t* value = buffer.data();
+      DCHECK_GE(buffer.size(), width);
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value, width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_decimal<Type>> {
+  using ScalarType = typename TypeTraits<Type>::ScalarType;
+  static void CopyData(const DataType& ty, uint8_t* out, const int64_t out_offset,
+                       const Datum& in, const int64_t in_offset, const int64_t length) {
+    const int32_t width = checked_cast<const FixedSizeBinaryType&>(ty).byte_width();
+    uint8_t* begin = out + (out_offset * width);
+    if (in.is_array()) {
+      const auto& in_data = *in.array();
+      const auto in_arr =
+          in_data.GetValues<uint8_t>(1, (in_offset + in_data.offset) * width);
+      std::memcpy(begin, in_arr, length * width);
+    } else {
+      const ScalarType& scalar = checked_cast<const ScalarType&>(*in.scalar());
+      const auto value = scalar.value.ToBytes();
+      for (int i = 0; i < length; i++) {
+        std::memcpy(begin, value.data(), width);
+        begin += width;
+      }
+    }
+  }
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    return ReplaceWithArrayMask<ReplaceWithMask<Type>>(ctx, array, mask, replacements,
+                                                       output);
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_null<Type>> {
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct ReplaceWithMask<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecScalarMask(KernelContext* ctx, const ArrayData& array,
+                               const BooleanScalar& mask, const Datum& replacements,
+                               ArrayData* output) {
+    return ReplaceWithScalarMask(ctx, array, mask, replacements, output);
+  }
+  static Status ExecArrayMask(KernelContext* ctx, const ArrayData& array,
+                              const ArrayData& mask, const Datum& replacements,
+                              ArrayData* output) {
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));

Review comment:
       nit. `array.length + 1`? :thinking: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org