You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/03 14:37:38 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #11853: ARROW-1699: [C++] forward, backward fill kernel functions

lidavidm commented on a change in pull request #11853:
URL: https://github.com/apache/arrow/pull/11853#discussion_r777509436



##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -489,17 +872,57 @@ void RegisterVectorReplace(FunctionRegistry* registry) {
   }
   add_primitive_kernel(null());
   add_primitive_kernel(boolean());
-  add_kernel(Type::FIXED_SIZE_BINARY, ReplaceWithMaskFunctor<FixedSizeBinaryType>::Exec);
-  add_kernel(Type::DECIMAL128, ReplaceWithMaskFunctor<FixedSizeBinaryType>::Exec);
-  add_kernel(Type::DECIMAL256, ReplaceWithMaskFunctor<FixedSizeBinaryType>::Exec);
+  add_kernel(Type::FIXED_SIZE_BINARY, Functor<FixedSizeBinaryType>::Exec);
+  add_kernel(Type::DECIMAL128, Functor<FixedSizeBinaryType>::Exec);
+  add_kernel(Type::DECIMAL256, Functor<FixedSizeBinaryType>::Exec);
   for (const auto& ty : BaseBinaryTypes()) {
-    add_kernel(ty->id(), GenerateTypeAgnosticVarBinaryBase<ReplaceWithMaskFunctor>(*ty));
+    add_kernel(ty->id(), GenerateTypeAgnosticVarBinaryBase<Functor>(*ty));
   }
   // TODO: list types
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
   // TODO(ARROW-9431): "replace_with_indices"
 }
+
+const FunctionDoc replace_with_mask_doc(
+    "Replace items selected with a mask",
+    ("Given an array and a boolean mask (either scalar or of equal length),\n"
+     "along with replacement values (either scalar or array),\n"
+     "each element of the array for which the corresponding mask element is\n"
+     "true will be replaced by the next value from the replacements,\n"
+     "or with null if the mask is null.\n"
+     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
+    {"values", "mask", "replacements"});
+
+const FunctionDoc fill_null_forward_doc(
+    "Carry non-null values forward to fill null slots",
+    ("Given an array, propagate last valid observation forward to next valid\n"
+     "or nothing if all previous values are null. "),

Review comment:
       nit: remove the trailing space (here and below).

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,406 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
+                              current_chunk.length, out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
+
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
+
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+  bool use_current_chunk = false;
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      use_current_chunk = true;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      if (block.popcount) {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(
+                  *current_chunk.type, out_values, write_value_offset,
+                  use_current_chunk ? current_chunk : last_valid_value_chunk,
+                  *last_valid_value_offset,
+                  /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            use_current_chunk = true;
+            *last_valid_value_offset = write_value_offset;
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(
+                *current_chunk.type, out_values, write_value_offset,
+                use_current_chunk ? current_chunk : last_valid_value_chunk,
+                *last_valid_value_offset,
+                /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", Arity::Ternary(),
-                                               &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(current_chunk.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(current_chunk.length));
+    RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1;
+    const uint8_t* data = current_chunk.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = current_chunk.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    /*tuple for store: <use current_chunk(true) or last_valid_chunk(false),
+     * start offset of the current value, end offset for the current value>*/
+    std::vector<std::tuple<bool, int64_t, int64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, output->offset, current_chunk.length,
+        current_chunk.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+            } else {
+              const offset_type offset0 = offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/false, offset0, offset1 - offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1, -1));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = current_chunk.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_forward: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, const ArrayData& array,
+                                     Datum* out, const ArrayData& last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto null_bitmap,
+          arrow::internal::CopyBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                      array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, null_bitmap->data(), output,
+                                                  direction, last_valid_value_chunk,
+                                                  last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = array.length - 1;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+
+    ArrayVector new_chunks;
+    if (values->length() > 0) {
+      ArrayData* array_with_current = &*values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+
+      for (const auto& chunk : values->chunks()) {
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullForwardArray(ctx, *chunk->data(), out, *array_with_current,

Review comment:
       DCHECK_OK will crash the program if an error occurs, instead of returning it to the caller.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,406 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
+                              current_chunk.length, out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
+
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
+
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+  bool use_current_chunk = false;
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      use_current_chunk = true;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      if (block.popcount) {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(
+                  *current_chunk.type, out_values, write_value_offset,
+                  use_current_chunk ? current_chunk : last_valid_value_chunk,
+                  *last_valid_value_offset,
+                  /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            use_current_chunk = true;
+            *last_valid_value_offset = write_value_offset;
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(
+                *current_chunk.type, out_values, write_value_offset,
+                use_current_chunk ? current_chunk : last_valid_value_chunk,
+                *last_valid_value_offset,
+                /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", Arity::Ternary(),
-                                               &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(current_chunk.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(current_chunk.length));
+    RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1;
+    const uint8_t* data = current_chunk.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = current_chunk.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    /*tuple for store: <use current_chunk(true) or last_valid_chunk(false),
+     * start offset of the current value, end offset for the current value>*/
+    std::vector<std::tuple<bool, int64_t, int64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, output->offset, current_chunk.length,
+        current_chunk.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+            } else {
+              const offset_type offset0 = offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/false, offset0, offset1 - offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1, -1));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = current_chunk.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_forward: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, const ArrayData& array,
+                                     Datum* out, const ArrayData& last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto null_bitmap,
+          arrow::internal::CopyBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                      array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, null_bitmap->data(), output,
+                                                  direction, last_valid_value_chunk,
+                                                  last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = array.length - 1;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+
+    ArrayVector new_chunks;
+    if (values->length() > 0) {
+      ArrayData* array_with_current = &*values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+
+      for (const auto& chunk : values->chunks()) {
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullForwardArray(ctx, *chunk->data(), out, *array_with_current,

Review comment:
       We use it below for kernel registration because we expect that to never fail, but we do expect kernels themselves to fail normally.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,414 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
+                              current_chunk.length, out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
+
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
+
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+  bool use_current_chunk = false;
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      use_current_chunk = true;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      if (block.popcount) {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(
+                  *current_chunk.type, out_values, write_value_offset,
+                  use_current_chunk ? current_chunk : last_valid_value_chunk,
+                  *last_valid_value_offset,
+                  /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            use_current_chunk = true;
+            *last_valid_value_offset = write_value_offset;
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(
+                *current_chunk.type, out_values, write_value_offset,
+                use_current_chunk ? current_chunk : last_valid_value_chunk,
+                *last_valid_value_offset,
+                /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", Arity::Ternary(),
-                                               &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(current_chunk.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(current_chunk.length));
+    RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1;
+    const uint8_t* data = current_chunk.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = current_chunk.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    /*tuple for store: <use current_chunk(true) or last_valid_chunk(false),
+     * start offset of the current value, end offset for the current value>*/
+    std::vector<std::tuple<bool, int64_t, int64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, output->offset, current_chunk.length,
+        current_chunk.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {

Review comment:
       Ping here.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,406 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
+                              current_chunk.length, out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
+
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
+
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+  bool use_current_chunk = false;
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      use_current_chunk = true;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      if (block.popcount) {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(
+                  *current_chunk.type, out_values, write_value_offset,
+                  use_current_chunk ? current_chunk : last_valid_value_chunk,
+                  *last_valid_value_offset,
+                  /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            use_current_chunk = true;
+            *last_valid_value_offset = write_value_offset;
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(
+                *current_chunk.type, out_values, write_value_offset,
+                use_current_chunk ? current_chunk : last_valid_value_chunk,
+                *last_valid_value_offset,
+                /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", Arity::Ternary(),
-                                               &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(current_chunk.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(current_chunk.length));
+    RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1;
+    const uint8_t* data = current_chunk.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = current_chunk.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    /*tuple for store: <use current_chunk(true) or last_valid_chunk(false),
+     * start offset of the current value, end offset for the current value>*/
+    std::vector<std::tuple<bool, int64_t, int64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, output->offset, current_chunk.length,
+        current_chunk.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+            } else {
+              const offset_type offset0 = offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/false, offset0, offset1 - offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1, -1));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = current_chunk.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_forward: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, const ArrayData& array,
+                                     Datum* out, const ArrayData& last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto null_bitmap,
+          arrow::internal::CopyBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                      array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, null_bitmap->data(), output,
+                                                  direction, last_valid_value_chunk,
+                                                  last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = array.length - 1;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+
+    ArrayVector new_chunks;
+    if (values->length() > 0) {
+      ArrayData* array_with_current = &*values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+
+      for (const auto& chunk : values->chunks()) {
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullForwardArray(ctx, *chunk->data(), out, *array_with_current,

Review comment:
       Do not use DCHECK_OK for this. Use RETURN_NOT_OK.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,406 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
+                              current_chunk.length, out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
+
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
+
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+  bool use_current_chunk = false;
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      use_current_chunk = true;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      if (block.popcount) {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(
+                  *current_chunk.type, out_values, write_value_offset,
+                  use_current_chunk ? current_chunk : last_valid_value_chunk,
+                  *last_valid_value_offset,
+                  /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            use_current_chunk = true;
+            *last_valid_value_offset = write_value_offset;
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(
+                *current_chunk.type, out_values, write_value_offset,
+                use_current_chunk ? current_chunk : last_valid_value_chunk,
+                *last_valid_value_offset,
+                /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", Arity::Ternary(),
-                                               &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(current_chunk.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(current_chunk.length));
+    RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1;
+    const uint8_t* data = current_chunk.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = current_chunk.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    /*tuple for store: <use current_chunk(true) or last_valid_chunk(false),
+     * start offset of the current value, end offset for the current value>*/
+    std::vector<std::tuple<bool, int64_t, int64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, output->offset, current_chunk.length,
+        current_chunk.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+            } else {
+              const offset_type offset0 = offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/false, offset0, offset1 - offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1, -1));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = current_chunk.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_forward: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, const ArrayData& array,
+                                     Datum* out, const ArrayData& last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto null_bitmap,
+          arrow::internal::CopyBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                      array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, null_bitmap->data(), output,
+                                                  direction, last_valid_value_chunk,
+                                                  last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = array.length - 1;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+
+    ArrayVector new_chunks;
+    if (values->length() > 0) {
+      ArrayData* array_with_current = &*values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+
+      for (const auto& chunk : values->chunks()) {
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullForwardArray(ctx, *chunk->data(), out, *array_with_current,
+                                       &last_valid_value_offset));
+        if (chunk->null_count() != chunk->length()) {
+          array_with_current = &*chunk->data();
+        }
+        new_chunks.push_back(MakeArray(out->make_array()->data()->Copy()));
+      }
+    }
+
+    auto output = std::make_shared<ChunkedArray>(std::move(new_chunks), values->type());
+    *out = Datum(output);
+    return Status::OK();
+  }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType));
+  }
+};
+
+template <typename Type>
+struct FillNullBackwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullBackwardArray(ctx, array_input, out, array_input,
+                                     &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullBackwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_backward operation: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullBackwardArray(KernelContext* ctx, const ArrayData& array,
+                                      Datum* out, const ArrayData& last_valid_value_chunk,
+                                      int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = -1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto reversed_bitmap,
+          arrow::internal::ReverseBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                         array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, reversed_bitmap->data(), output, direction, last_valid_value_chunk,
+          last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = 0;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullBackwardChunkedArray(KernelContext* ctx,
+                                             const std::shared_ptr<ChunkedArray>& values,
+                                             Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    std::vector<std::shared_ptr<Array>> new_chunks;
+
+    if (values->length() > 0) {
+      auto chunks_length = values->chunks().size();
+      ArrayData* array_with_current =
+          &(*values->chunk(/*first_chunk=*/chunks_length - 1)->data());
+      int64_t last_valid_value_offset = -1;
+      auto chunks = values->chunks();
+      for (int i = chunks_length - 1; i >= 0; --i) {
+        const auto& chunk = chunks[i];
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullBackwardArray(ctx, *chunk->data(), out, *array_with_current,
+                                        &last_valid_value_offset));
+        if (chunk->null_count() != chunk->length()) {
+          array_with_current = &*chunk->data();
+        }
+        new_chunks.push_back(MakeArray(out->make_array()->data()->Copy()));
+      }
+    }
+
+    reverse(new_chunks.begin(), new_chunks.end());

Review comment:
       nit: explicitly use `std::reverse`?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +828,688 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
   }
 }
 
+template <typename T>
+class TestFillNullNumeric : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullDecimal : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBinary : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+
+TYPED_TEST_SUITE(TestFillNullNumeric, NumericBasedTypes);
+TYPED_TEST_SUITE(TestFillNullDecimal, DecimalArrowTypes);
+TYPED_TEST_SUITE(TestFillNullBinary, BaseBinaryArrowTypes);
+
+TYPED_TEST(TestFillNullNumeric, FillNullValuesForward) {
+  this->AssertFillNullArray(FillForwardNull, this->array("[]"), this->array("[]"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, null, null]"),
+                            this->array("[null, null, null, null]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, null, 4]"),
+                            this->array("[null, null, null, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, 4, null]"),
+                            this->array("[null, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, 4, null]"),
+                            this->array("[null, null, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, null, 4, null]"),
+                            this->array("[null, null, null, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[null, null, 4,null, 5, null]"),
+                            this->array("[null, null, 4, 4, 5, 5]"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array("[1,4,null]"),
+                            this->array("[1,4,4]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, null, null, null]"),
+                            this->array("[1, 4 ,4, 4, 4, 4]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[1, 4, null, 5, null, null]"),
+                            this->array("[1, 4 ,4, 5, 5, 5]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, 5, null, null, 6]"),
+                            this->array("[1, 4 ,4, 5, 5, 5, 6]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, 5, null, null, 5]"),
+                            this->array("[1, 4 ,4, 5, 5, 5, 5]"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array("[1, 4, null, 5, null, 6, null]"),
+                            this->array("[1, 4 ,4, 5, 5, 6, 6]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[1, 4, null, 5, null, 6, 7]"),
+                            this->array("[1, 4 ,4, 5, 5, 6, 7]"));
+  this->AssertFillNullArray(FillForwardNull, this->array("[1, 4 ,4, 5, 5, 6, 7]"),
+                            this->array("[1, 4 ,4, 5, 5, 6, 7]"));
+}
+
+TYPED_TEST(TestFillNullDecimal, FillNullValuesForward) {
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([])"), this->array(R"([])"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, null, null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, null, "30.00"])"),
+                            this->array(R"([null, null, null, "30.00"])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, "30.00", null])"),
+                            this->array(R"([null, "30.00", "30.00"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, "30.00", null])"),
+                            this->array(R"([null, null, "30.00", "30.00"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, null, "30.00", null])"),
+                            this->array(R"([null, null, null, "30.00", "30.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"([null, null, "30.00",null, "5.00", null])"),
+      this->array(R"([null, null, "30.00", "30.00", "5.00", "5.00"])"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array(R"(["10.00","30.00",null])"),
+                            this->array(R"(["10.00","30.00","30.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["10.00", "30.00", null, null, null, null])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "30.00", "30.00", "30.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["10.00", "30.00", null, "5.00", null, null])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "5.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, null, "6.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "5.00", "6.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, null, "5.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "5.00", "5.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, "6.00", null])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", "6.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00", null, "5.00", null, "6.00", "7.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", "7.00"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", "7.00"])"),
+      this->array(R"(["10.00", "30.00" ,"30.00", "5.00", "5.00", "6.00", "7.00"])"));
+}
+
+TYPED_TEST(TestFillNullBinary, FillNullValuesForward) {
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([])"), this->array(R"([])"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, null, null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, null, "ccc"])"),
+                            this->array(R"([null, null, null, "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, "ccc", null])"),
+                            this->array(R"([null, "ccc", "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull, this->array(R"([null, null, "ccc", null])"),
+                            this->array(R"([null, null, "ccc", "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, null, "ccc", null])"),
+                            this->array(R"([null, null, null, "ccc", "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"([null, null, "ccc",null, "xyz", null])"),
+                            this->array(R"([null, null, "ccc", "ccc", "xyz", "xyz"])"));
+
+  this->AssertFillNullArray(FillForwardNull, this->array(R"(["aaa","ccc",null])"),
+                            this->array(R"(["aaa","ccc","ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"(["aaa", "ccc", null, null, null, null])"),
+                            this->array(R"(["aaa", "ccc" ,"ccc", "ccc", "ccc", "ccc"])"));
+  this->AssertFillNullArray(FillForwardNull,
+                            this->array(R"(["aaa", "ccc", null, "xyz", null, null])"),
+                            this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "xyz"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, null, "qwert"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "xyz", "qwert"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, null, "xyz"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "xyz", "xyz"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, "qwert", null])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "qwert"])"));
+  this->AssertFillNullArray(
+      FillForwardNull, this->array(R"(["aaa", "ccc", null, "xyz", null, "qwert", "uy"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "uy"])"));
+  this->AssertFillNullArray(
+      FillForwardNull,
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "uy"])"),
+      this->array(R"(["aaa", "ccc" ,"ccc", "xyz", "xyz", "qwert", "uy"])"));
+}
+
+TYPED_TEST(TestFillNullNumeric, FillNullValuesBackward) {
+  this->AssertFillNullArray(FillBackwardNull, this->array("[]"), this->array("[]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, null, null]"),
+                            this->array("[null, null, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, 4, null, null, null]"),
+                            this->array("[4, 4,null, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, null, 4]"),
+                            this->array("[4, 4, 4, 4]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, 4, null]"),
+                            this->array("[4, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, 4, null]"),
+                            this->array("[4, 4, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[null, null, null, 4, null]"),
+                            this->array("[4, 4, 4, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[null, null, 4,null, 5, null]"),
+                            this->array("[4, 4, 4, 5, 5, null]"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4, null]"),
+                            this->array("[1, 4, null]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, null, null, null]"),
+                            this->array("[1, 4 ,null, null, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4, null, 5, null, null]"),
+                            this->array("[1, 4 , 5, 5, null, null]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, 5, null, null, 6]"),
+                            this->array("[1, 4 ,5, 5, 6, 6, 6]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, 5, null, null, 5]"),
+                            this->array("[1, 4 ,5 , 5, 5, 5, 5]"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array("[1, 4, null, 5, null, 6, null]"),
+                            this->array("[1, 4 ,5 , 5, 6, 6, null]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4, null, 5, null, 6, 7]"),
+                            this->array("[1, 4 ,5, 5, 6, 6, 7]"));
+  this->AssertFillNullArray(FillBackwardNull, this->array("[1, 4 ,5, 5, 6, 6, 7]"),
+                            this->array("[1, 4 ,5, 5, 6, 6, 7]"));
+}
+
+TYPED_TEST(TestFillNullDecimal, FillNullValuesBackward) {
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([])"), this->array(R"([])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, null, null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, "40.00", null, null, null])"),
+                            this->array(R"(["40.00", "40.00",null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, null, "40.00"])"),
+                            this->array(R"(["40.00", "40.00", "40.00", "40.00"])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, "40.00", null])"),
+                            this->array(R"(["40.00", "40.00", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, "40.00", null])"),
+                            this->array(R"(["40.00", "40.00", "40.00", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, null, "40.00", null])"),
+                            this->array(R"(["40.00", "40.00", "40.00", "40.00", null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull, this->array(R"([null, null, "40.00",null, "50.00", null])"),
+      this->array(R"(["40.00", "40.00", "40.00", "50.00", "50.00", null])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"(["10.00", "40.00", null])"),
+                            this->array(R"(["10.00", "40.00", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"(["10.00", "40.00", null, null, null, null])"),
+                            this->array(R"(["10.00", "40.00" ,null, null, null, null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull, this->array(R"(["10.00", "40.00", null, "50.00", null, null])"),
+      this->array(R"(["10.00", "40.00" , "50.00", "50.00", null, null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, null, "6.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", "6.00"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, null, "50.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00" , "50.00", "50.00", "50.00", "50.00"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, "6.00", null])"),
+      this->array(R"(["10.00", "40.00" ,"50.00" , "50.00", "6.00", "6.00", null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00", null, "50.00", null, "6.00", "7.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", "7.00"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", "7.00"])"),
+      this->array(R"(["10.00", "40.00" ,"50.00", "50.00", "6.00", "6.00", "7.00"])"));
+}
+
+TYPED_TEST(TestFillNullBinary, FillNullValuesBackward) {
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([])"), this->array(R"([])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, null, null])"),
+                            this->array(R"([null, null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, "afd", null, null, null])"),
+                            this->array(R"(["afd", "afd",null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, null, "afd"])"),
+                            this->array(R"(["afd", "afd", "afd", "afd"])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, "afd", null])"),
+                            this->array(R"(["afd", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"([null, null, "afd", null])"),
+                            this->array(R"(["afd", "afd", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, null, "afd", null])"),
+                            this->array(R"(["afd", "afd", "afd", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"([null, null, "afd",null, "qwe", null])"),
+                            this->array(R"(["afd", "afd", "afd", "qwe", "qwe", null])"));
+
+  this->AssertFillNullArray(FillBackwardNull, this->array(R"(["tyu", "afd", null])"),
+                            this->array(R"(["tyu", "afd", null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"(["tyu", "afd", null, null, null, null])"),
+                            this->array(R"(["tyu", "afd" ,null, null, null, null])"));
+  this->AssertFillNullArray(FillBackwardNull,
+                            this->array(R"(["tyu", "afd", null, "qwe", null, null])"),
+                            this->array(R"(["tyu", "afd" , "qwe", "qwe", null, null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd", null, "qwe", null, null, "oiutyu"])"),
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", "oiutyu"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull, this->array(R"(["tyu", "afd", null, "qwe", null, null, "qwe"])"),
+      this->array(R"(["tyu", "afd" ,"qwe" , "qwe", "qwe", "qwe", "qwe"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd", null, "qwe", null, "oiutyu", null])"),
+      this->array(R"(["tyu", "afd" ,"qwe" , "qwe", "oiutyu", "oiutyu", null])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd", null, "qwe", null, "oiutyu", "aaaagggbbb"])"),
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", "aaaagggbbb"])"));
+  this->AssertFillNullArray(
+      FillBackwardNull,
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", "aaaagggbbb"])"),
+      this->array(R"(["tyu", "afd" ,"qwe", "qwe", "oiutyu", "oiutyu", "aaaagggbbb"])"));
+}
+
+// For Test Blocks
+TYPED_TEST(TestFillNullNumeric, FillNullForwardLargeInput) {
+  using CType = typename TypeTraits<TypeParam>::CType;
+  random::RandomArrayGenerator rand(/*seed=*/1000);
+  int64_t len_null = 500;
+  int64_t len_random = 1000;
+  std::shared_ptr<Array> array_random =
+      rand.Numeric<TypeParam>(len_random, /*min=*/0, /*max=*/200, /*nulls=*/0);
+
+  if (array_random) {

Review comment:
       @AlvinJ15: what is the reason for this check? If there is one, please let me know - I'm just wondering why it's here.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -442,23 +442,406 @@ struct ReplaceWithMaskFunctor {
     }
     return ReplaceWithMask<Type>::ExecArrayMask(ctx, array, mask, replacements, output);
   }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make(
+        {InputType::Array(get_id.id), InputType(boolean()), InputType(get_id.id)},
+        OutputType(FirstType));
+  }
 };
 
-}  // namespace
+// This is for fixed-size types only
+template <typename Type>
+void FillNullInDirectionImpl(const ArrayData& current_chunk, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t direction,
+                             const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(current_chunk.buffers[0]->data(), current_chunk.offset,
+                              current_chunk.length, out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*current_chunk.type, out_values,
+                                  /*out_offset=*/output->offset, current_chunk,
+                                  /*in_offset=*/0, current_chunk.length);
+
+  bool has_fill_value = *last_valid_value_offset != -1;
+  int64_t write_offset = direction == 1 ? 0 : current_chunk.length - 1;
+  int64_t bitmap_offset = 0;
+
+  arrow::internal::OptionalBitBlockCounter counter(null_bitmap, output->offset,
+                                                   current_chunk.length);
+  bool use_current_chunk = false;
+  while (bitmap_offset < current_chunk.length) {
+    BitBlockCount block = counter.NextBlock();
+    if (block.AllSet()) {
+      *last_valid_value_offset =
+          write_offset + direction * (block.length - 1 + bitmap_offset);
+      has_fill_value = true;
+      use_current_chunk = true;
+    } else {
+      uint64_t block_start_offset = write_offset + direction * bitmap_offset;
+      uint64_t write_value_offset = block_start_offset;
+      if (block.popcount) {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset + i);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(
+                  *current_chunk.type, out_values, write_value_offset,
+                  use_current_chunk ? current_chunk : last_valid_value_chunk,
+                  *last_valid_value_offset,
+                  /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            use_current_chunk = true;
+            *last_valid_value_offset = write_value_offset;
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < block.length; i++, write_value_offset += direction) {
+          if (has_fill_value) {
+            ReplaceWithMask<Type>::CopyData(
+                *current_chunk.type, out_values, write_value_offset,
+                use_current_chunk ? current_chunk : last_valid_value_chunk,
+                *last_valid_value_offset,
+                /*length=*/1);
+            bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+          }
+        }
+      }
+    }
+    bitmap_offset += block.length;
+  }
+  output->null_count = -1;
+  output->GetNullCount();
+}
 
-const FunctionDoc replace_with_mask_doc(
-    "Replace items selected with a mask",
-    ("Given an array and a boolean mask (either scalar or of equal length),\n"
-     "along with replacement values (either scalar or array),\n"
-     "each element of the array for which the corresponding mask element is\n"
-     "true will be replaced by the next value from the replacements,\n"
-     "or with null if the mask is null.\n"
-     "Hence, for replacement arrays, len(replacements) == sum(mask == true)."),
-    {"values", "mask", "replacements"});
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
 
-void RegisterVectorReplace(FunctionRegistry* registry) {
-  auto func = std::make_shared<VectorFunction>("replace_with_mask", Arity::Ternary(),
-                                               &replace_with_mask_doc);
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    FillNullInDirectionImpl<Type>(array, reversed_bitmap, output, direction,
+                                  last_valid_value_chunk, last_valid_value_offset);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& current_chunk,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    BuilderType builder(current_chunk.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(current_chunk.length));
+    RETURN_NOT_OK(builder.ReserveData(current_chunk.buffers[2]->size()));
+    int64_t array_value_index = direction == 1 ? 0 : current_chunk.length - 1;
+    const uint8_t* data = current_chunk.buffers[2]->data();
+    const uint8_t* data_prev = last_valid_value_chunk.buffers[2]->data();
+    const offset_type* offsets = current_chunk.GetValues<offset_type>(1);
+    const offset_type* offsets_prev = last_valid_value_chunk.GetValues<offset_type>(1);
+
+    bool has_fill_value_last_chunk = *last_valid_value_offset != -1;
+    bool has_fill_value_current_chunk = false;
+    /*tuple for store: <use current_chunk(true) or last_valid_chunk(false),
+     * start offset of the current value, end offset for the current value>*/
+    std::vector<std::tuple<bool, int64_t, int64_t>> offsets_reversed;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reversed_bitmap, output->offset, current_chunk.length,
+        current_chunk.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reversed.push_back(
+              std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+          *last_valid_value_offset = array_value_index;
+          has_fill_value_current_chunk = true;
+          has_fill_value_last_chunk = false;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value_current_chunk || has_fill_value_last_chunk) {
+            if (!has_fill_value_last_chunk) {
+              const offset_type offset0 = offsets[*last_valid_value_offset];
+              const offset_type offset1 = offsets[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/true, offset0, offset1 - offset0));
+            } else {
+              const offset_type offset0 = offsets_prev[*last_valid_value_offset];
+              const offset_type offset1 = offsets_prev[*last_valid_value_offset + 1];
+              offsets_reversed.push_back(
+                  std::make_tuple(/*current_chunk=*/false, offset0, offset1 - offset0));
+            }
+          } else {
+            offsets_reversed.push_back(std::make_tuple(/*current_chunk=*/false, -1, -1));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reversed.begin(); it != offsets_reversed.end(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    } else {
+      for (auto it = offsets_reversed.rbegin(); it != offsets_reversed.rend(); ++it) {
+        if (std::get<1>(*it) == -1 && std::get<2>(*it) == -1) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (std::get<0>(*it)) {
+          RETURN_NOT_OK(builder.Append(data + std::get<1>(*it), std::get<2>(*it)));
+        } else {
+          RETURN_NOT_OK(builder.Append(data_prev + std::get<1>(*it), std::get<2>(*it)));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = current_chunk.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reversed_bitmap, ArrayData* output,
+                             int8_t direction, const ArrayData& last_valid_value_chunk,
+                             int64_t* last_valid_value_offset) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullForwardArray(ctx, array_input, out, array_input,
+                                    &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullForwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_forward: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullForwardArray(KernelContext* ctx, const ArrayData& array,
+                                     Datum* out, const ArrayData& last_valid_value_chunk,
+                                     int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = 1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto null_bitmap,
+          arrow::internal::CopyBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                      array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, null_bitmap->data(), output,
+                                                  direction, last_valid_value_chunk,
+                                                  last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = array.length - 1;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullForwardChunkedArray(KernelContext* ctx,
+                                            const std::shared_ptr<ChunkedArray>& values,
+                                            Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+
+    ArrayVector new_chunks;
+    if (values->length() > 0) {
+      ArrayData* array_with_current = &*values->chunk(/*first_chunk=*/0)->data();
+      int64_t last_valid_value_offset = -1;
+
+      for (const auto& chunk : values->chunks()) {
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullForwardArray(ctx, *chunk->data(), out, *array_with_current,
+                                       &last_valid_value_offset));
+        if (chunk->null_count() != chunk->length()) {
+          array_with_current = &*chunk->data();
+        }
+        new_chunks.push_back(MakeArray(out->make_array()->data()->Copy()));
+      }
+    }
+
+    auto output = std::make_shared<ChunkedArray>(std::move(new_chunks), values->type());
+    *out = Datum(output);
+    return Status::OK();
+  }
+
+  static std::shared_ptr<KernelSignature> GetSignature(detail::GetTypeId get_id) {
+    return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType));
+  }
+};
+
+template <typename Type>
+struct FillNullBackwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto array_input = *batch[0].array();
+        int64_t last_valid_value_offset = -1;
+        return FillNullBackwardArray(ctx, array_input, out, array_input,
+                                     &last_valid_value_offset);
+      }
+      case Datum::CHUNKED_ARRAY: {
+        return FillNullBackwardChunkedArray(ctx, batch[0].chunked_array(), out);
+      }
+      default:
+        break;
+    }
+    return Status::NotImplemented("Unsupported type for fill_null_backward operation: ",
+                                  batch[0].ToString());
+  }
+
+  static Status FillNullBackwardArray(KernelContext* ctx, const ArrayData& array,
+                                      Datum* out, const ArrayData& last_valid_value_chunk,
+                                      int64_t* last_valid_value_offset) {
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+    int8_t direction = -1;
+
+    if (array.MayHaveNulls()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto reversed_bitmap,
+          arrow::internal::ReverseBitmap(ctx->memory_pool(), array.buffers[0]->data(),
+                                         array.offset, array.length));
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, reversed_bitmap->data(), output, direction, last_valid_value_chunk,
+          last_valid_value_offset);
+    } else {
+      if (array.length > 0) {
+        *last_valid_value_offset = 0;
+      }
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status FillNullBackwardChunkedArray(KernelContext* ctx,
+                                             const std::shared_ptr<ChunkedArray>& values,
+                                             Datum* out) {
+    if (values->null_count() == 0) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    if (values->null_count() == values->length()) {
+      *out = Datum(values);
+      return Status::OK();
+    }
+    std::vector<std::shared_ptr<Array>> new_chunks;
+
+    if (values->length() > 0) {
+      auto chunks_length = values->chunks().size();
+      ArrayData* array_with_current =
+          &(*values->chunk(/*first_chunk=*/chunks_length - 1)->data());
+      int64_t last_valid_value_offset = -1;
+      auto chunks = values->chunks();
+      for (int i = chunks_length - 1; i >= 0; --i) {
+        const auto& chunk = chunks[i];
+        if (is_fixed_width(out->type()->id())) {
+          auto* output = out->mutable_array();
+          auto bit_width = checked_cast<const FixedWidthType&>(*output->type).bit_width();
+          auto data_bytes = bit_util::BytesForBits(bit_width * chunk->length());
+          ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(chunk->length()));
+          ARROW_ASSIGN_OR_RAISE(output->buffers[1], ctx->Allocate(data_bytes));
+        }
+        DCHECK_OK(FillNullBackwardArray(ctx, *chunk->data(), out, *array_with_current,

Review comment:
       Ditto here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org