You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/07/04 20:28:24 UTC

[GitHub] [arrow] wesm commented on a change in pull request #7635: ARROW-1587: [C++] implement fill null

wesm commented on a change in pull request #7635:
URL: https://github.com/apache/arrow/pull/7635#discussion_r449802049



##########
File path: cpp/src/arrow/compute/api_scalar.cc
##########
@@ -126,5 +126,24 @@ Result<Datum> Compare(const Datum& left, const Datum& right, CompareOptions opti
 SCALAR_EAGER_UNARY(IsValid, "is_valid")
 SCALAR_EAGER_UNARY(IsNull, "is_null")
 
+Result<Datum> FillNull(const Datum& values, const Datum& fill_value, ExecContext* ctx) {
+  if (!values.is_arraylike()) {
+    return Status::Invalid("Values must be Array or ChunkedArray");
+  }
+
+  if (!fill_value.is_scalar()) {
+    return Status::Invalid("fill value must be a scalar");
+  }
+
+  if (!values.type()->Equals(fill_value.type())) {
+    std::stringstream ss;
+    ss << "Array type didn't match type of fill value: " << values.type()->ToString()
+       << " vs " << fill_value.type()->ToString();
+    return Status::Invalid(ss.str());
+  }

Review comment:
       None of these input validation checks should be here. Instead, the kernel should be implemented as an `Arity::Binary()` kernel, with input validation handled by the kernel dispatch / executor layer. It's fine if the initial version has the type signature `Array/Scalar` instead of `Any/Any`.

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}
+
+struct ScalarFillVisitor {
+  KernelContext* ctx;
+  const ArrayData& data;
+  Datum* out;
+
+  ScalarFillVisitor(KernelContext* ctx, const ArrayData& data, Datum* out)
+      : ctx(ctx), data(data), out(out) {}
+
+  Status Visit(const DataType&) {
+    ArrayData* out_arr = out->mutable_array();
+    *out_arr = data;
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType&) {
+    const auto& state = checked_cast<const FillNullState<BooleanType>&>(*ctx->state());
+    bool value = UnboxScalar<BooleanType>::Unbox(*state.fill_value);
+    ArrayData* out_arr = out->mutable_array();
+    FirstTimeBitmapWriter bit_writer(out_arr->buffers[1]->mutable_data(), out_arr->offset,
+                                     out_arr->length);
+    FirstTimeBitmapWriter bit_writer_validity(out_arr->buffers[0]->mutable_data(),
+                                              out_arr->offset, out_arr->length);
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[1]->data(), data.offset, data.length);
+      BitmapReader bit_reader_validity(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader_validity.IsNotSet()) {
+          if (value == true) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        } else {
+          if (bit_reader.IsSet()) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        }
+        bit_reader.Next();
+        bit_writer.Next();
+        bit_reader_validity.Next();
+        bit_writer_validity.Next();
+      }
+      bit_writer_validity.Finish();
+      bit_writer.Finish();
+    } else {
+      *out_arr = data;
+    }
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    using T = typename GetViewType<Type>::T;
+    const auto& state = checked_cast<const FillNullState<Type>&>(*ctx->state());
+    T value = UnboxScalar<Type>::Unbox(*state.fill_value);
+    const T* in_data = data.GetValues<T>(1);
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<T>(1);
+
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader.IsNotSet()) {

Review comment:
       We'll want to rework this to use BitBlockCounter (see `VisitBitBlocksVoid` in arrow/visitor_inline.h)

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}
+
+struct ScalarFillVisitor {
+  KernelContext* ctx;
+  const ArrayData& data;
+  Datum* out;
+
+  ScalarFillVisitor(KernelContext* ctx, const ArrayData& data, Datum* out)
+      : ctx(ctx), data(data), out(out) {}
+
+  Status Visit(const DataType&) {
+    ArrayData* out_arr = out->mutable_array();
+    *out_arr = data;
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType&) {
+    const auto& state = checked_cast<const FillNullState<BooleanType>&>(*ctx->state());
+    bool value = UnboxScalar<BooleanType>::Unbox(*state.fill_value);
+    ArrayData* out_arr = out->mutable_array();
+    FirstTimeBitmapWriter bit_writer(out_arr->buffers[1]->mutable_data(), out_arr->offset,
+                                     out_arr->length);
+    FirstTimeBitmapWriter bit_writer_validity(out_arr->buffers[0]->mutable_data(),
+                                              out_arr->offset, out_arr->length);

Review comment:
       The output is never null (unless the fill value is null), right? So you don't need to touch the validity bitmap. In fact, the default mode of the kernel should not allocate one (this also matters for zero-copy; I will comment on that below).

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}

Review comment:
       Per above, I think this kernel would be better implemented without a KernelInit function

##########
File path: cpp/src/arrow/compute/api_scalar.h
##########
@@ -259,6 +259,27 @@ Result<Datum> IsValid(const Datum& values, ExecContext* ctx = NULLPTR);
 ARROW_EXPORT
 Result<Datum> IsNull(const Datum& values, ExecContext* ctx = NULLPTR);
 
+struct ARROW_EXPORT FillNullOptions : public FunctionOptions {
+  explicit FillNullOptions(Datum fill_value) : fill_value(std::move(fill_value)) {}
+
+  Datum fill_value;
+};

Review comment:
       See comments above. I think this should be implemented as a binary kernel, since the "fill values" could be provided by an array. This will also allow the execution layer (in the near future) to insert implicit casts where needed.

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}
+
+struct ScalarFillVisitor {
+  KernelContext* ctx;
+  const ArrayData& data;
+  Datum* out;
+
+  ScalarFillVisitor(KernelContext* ctx, const ArrayData& data, Datum* out)
+      : ctx(ctx), data(data), out(out) {}
+
+  Status Visit(const DataType&) {
+    ArrayData* out_arr = out->mutable_array();
+    *out_arr = data;
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType&) {
+    const auto& state = checked_cast<const FillNullState<BooleanType>&>(*ctx->state());
+    bool value = UnboxScalar<BooleanType>::Unbox(*state.fill_value);
+    ArrayData* out_arr = out->mutable_array();
+    FirstTimeBitmapWriter bit_writer(out_arr->buffers[1]->mutable_data(), out_arr->offset,
+                                     out_arr->length);
+    FirstTimeBitmapWriter bit_writer_validity(out_arr->buffers[0]->mutable_data(),
+                                              out_arr->offset, out_arr->length);
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[1]->data(), data.offset, data.length);
+      BitmapReader bit_reader_validity(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader_validity.IsNotSet()) {
+          if (value == true) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        } else {
+          if (bit_reader.IsSet()) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        }
+        bit_reader.Next();
+        bit_writer.Next();
+        bit_reader_validity.Next();
+        bit_writer_validity.Next();
+      }
+      bit_writer_validity.Finish();
+      bit_writer.Finish();
+    } else {
+      *out_arr = data;
+    }
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    using T = typename GetViewType<Type>::T;
+    const auto& state = checked_cast<const FillNullState<Type>&>(*ctx->state());
+    T value = UnboxScalar<Type>::Unbox(*state.fill_value);
+    const T* in_data = data.GetValues<T>(1);
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<T>(1);
+
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader.IsNotSet()) {
+          out_data[i] = value;
+        } else {
+          out_data[i] = static_cast<T>(in_data[i]);
+        }
+        bit_reader.Next();
+      }
+      BitUtil::SetBitsTo(out_arr->buffers[0]->mutable_data(), out_arr->offset,
+                         out_arr->length, true);

Review comment:
       When an array has zero null count, the validity bitmap is immediately discarded, so this action is unneeded

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+

Review comment:
       Remove this blank line at the top of the file, so that the license header starts on the first line.

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}
+
+struct ScalarFillVisitor {
+  KernelContext* ctx;
+  const ArrayData& data;
+  Datum* out;
+
+  ScalarFillVisitor(KernelContext* ctx, const ArrayData& data, Datum* out)
+      : ctx(ctx), data(data), out(out) {}
+
+  Status Visit(const DataType&) {
+    ArrayData* out_arr = out->mutable_array();
+    *out_arr = data;
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType&) {
+    const auto& state = checked_cast<const FillNullState<BooleanType>&>(*ctx->state());
+    bool value = UnboxScalar<BooleanType>::Unbox(*state.fill_value);
+    ArrayData* out_arr = out->mutable_array();
+    FirstTimeBitmapWriter bit_writer(out_arr->buffers[1]->mutable_data(), out_arr->offset,
+                                     out_arr->length);
+    FirstTimeBitmapWriter bit_writer_validity(out_arr->buffers[0]->mutable_data(),
+                                              out_arr->offset, out_arr->length);
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[1]->data(), data.offset, data.length);
+      BitmapReader bit_reader_validity(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader_validity.IsNotSet()) {

Review comment:
       We should rework this to use a combination of BitBlockCounter and probably GenerateBitsUnrolled

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}
+
+struct ScalarFillVisitor {
+  KernelContext* ctx;
+  const ArrayData& data;
+  Datum* out;
+
+  ScalarFillVisitor(KernelContext* ctx, const ArrayData& data, Datum* out)
+      : ctx(ctx), data(data), out(out) {}
+
+  Status Visit(const DataType&) {
+    ArrayData* out_arr = out->mutable_array();
+    *out_arr = data;
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType&) {
+    const auto& state = checked_cast<const FillNullState<BooleanType>&>(*ctx->state());
+    bool value = UnboxScalar<BooleanType>::Unbox(*state.fill_value);
+    ArrayData* out_arr = out->mutable_array();
+    FirstTimeBitmapWriter bit_writer(out_arr->buffers[1]->mutable_data(), out_arr->offset,
+                                     out_arr->length);
+    FirstTimeBitmapWriter bit_writer_validity(out_arr->buffers[0]->mutable_data(),
+                                              out_arr->offset, out_arr->length);
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[1]->data(), data.offset, data.length);
+      BitmapReader bit_reader_validity(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader_validity.IsNotSet()) {
+          if (value == true) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        } else {
+          if (bit_reader.IsSet()) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        }
+        bit_reader.Next();
+        bit_writer.Next();
+        bit_reader_validity.Next();
+        bit_writer_validity.Next();
+      }
+      bit_writer_validity.Finish();
+      bit_writer.Finish();
+    } else {
+      *out_arr = data;
+    }
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    using T = typename GetViewType<Type>::T;
+    const auto& state = checked_cast<const FillNullState<Type>&>(*ctx->state());
+    T value = UnboxScalar<Type>::Unbox(*state.fill_value);
+    const T* in_data = data.GetValues<T>(1);
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<T>(1);
+
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader.IsNotSet()) {
+          out_data[i] = value;
+        } else {
+          out_data[i] = static_cast<T>(in_data[i]);
+        }
+        bit_reader.Next();
+      }
+      BitUtil::SetBitsTo(out_arr->buffers[0]->mutable_data(), out_arr->offset,
+                         out_arr->length, true);
+    } else {
+      *out_arr = data;
+    }
+    return Status::OK();
+  }
+
+  Status Execute() { return VisitTypeInline(*data.type, this); }
+};
+
+void ExecFillNull(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+  ScalarFillVisitor dispatch(ctx, *batch[0].array(), out);
+  ctx->SetStatus(dispatch.Execute());
+}
+
+void AddBasicFillNullKernels(ScalarKernel kernel, ScalarFunction* func) {
+  auto AddKernels = [&](const std::vector<std::shared_ptr<DataType>>& types) {
+    for (const std::shared_ptr<DataType>& ty : types) {
+      kernel.signature = KernelSignature::Make({InputType::Array(ty)}, ty);
+      DCHECK_OK(func->AddKernel(kernel));
+    }
+  };
+
+  AddKernels(NumericTypes());
+  AddKernels(TemporalTypes());
+
+  std::vector<std::shared_ptr<DataType>> other_types = {boolean()};
+
+  for (auto ty : other_types) {
+    kernel.signature = KernelSignature::Make({InputType::Array(ty)}, ty);
+    DCHECK_OK(func->AddKernel(kernel));
+  }
+}
+
+}  // namespace
+
+void RegisterScalarFillNull(FunctionRegistry* registry) {
+  // Fill Null always writes into preallocated memory

Review comment:
       I disagree that Fill Null should always write into preallocated memory:
   
   * If the fill value is not null, then the output is non-null, so there is no need to allocate a validity bitmap in most cases. So the default behavior should be to use `NullHandling::COMPUTED_NO_PREALLOCATE` for null handling.
   * Since the kernel can do zero-copy when the input has no nulls, it needs to use `MemAllocation::NO_PREALLOCATE` and leave memory allocation to the kernel implementation.

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null.cc
##########
@@ -0,0 +1,223 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+namespace compute {
+namespace internal {
+namespace {
+
+template <typename T, typename R = void>
+using enable_if_supports_fill_null = enable_if_t<has_c_type<T>::value, R>;
+
+template <typename Type>
+struct FillNullState : public KernelState {
+  explicit FillNullState(MemoryPool* pool) {}
+
+  Status Init(const FillNullOptions& options) {
+    fill_value = options.fill_value.scalar();
+    return Status::OK();
+  }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+template <>
+struct FillNullState<NullType> : public KernelState {
+  explicit FillNullState(MemoryPool*) {}
+
+  Status Init(const FillNullOptions& options) { return Status::OK(); }
+
+  std::shared_ptr<Scalar> fill_value;
+};
+
+struct InitFillNullStateVisitor {
+  KernelContext* ctx;
+  const FillNullOptions* options;
+  std::unique_ptr<KernelState> result;
+
+  InitFillNullStateVisitor(KernelContext* ctx, const FillNullOptions* options)
+      : ctx(ctx), options(options) {}
+
+  template <typename Type>
+  Status Init() {
+    using StateType = FillNullState<Type>;
+    result.reset(new StateType(ctx->exec_context()->memory_pool()));
+    return static_cast<StateType*>(result.get())->Init(*options);
+  }
+
+  Status Visit(const DataType&) { return Init<NullType>(); }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    return Init<Type>();
+  }
+
+  Status GetResult(std::unique_ptr<KernelState>* out) {
+    RETURN_NOT_OK(VisitTypeInline(*options->fill_value.type(), this));
+    *out = std::move(result);
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<KernelState> InitFillNull(KernelContext* ctx,
+                                          const KernelInitArgs& args) {
+  InitFillNullStateVisitor visitor{ctx,
+                                   static_cast<const FillNullOptions*>(args.options)};
+  std::unique_ptr<KernelState> result;
+  ctx->SetStatus(visitor.GetResult(&result));
+  return result;
+}
+
+struct ScalarFillVisitor {
+  KernelContext* ctx;
+  const ArrayData& data;
+  Datum* out;
+
+  ScalarFillVisitor(KernelContext* ctx, const ArrayData& data, Datum* out)
+      : ctx(ctx), data(data), out(out) {}
+
+  Status Visit(const DataType&) {
+    ArrayData* out_arr = out->mutable_array();
+    *out_arr = data;
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanType&) {
+    const auto& state = checked_cast<const FillNullState<BooleanType>&>(*ctx->state());
+    bool value = UnboxScalar<BooleanType>::Unbox(*state.fill_value);
+    ArrayData* out_arr = out->mutable_array();
+    FirstTimeBitmapWriter bit_writer(out_arr->buffers[1]->mutable_data(), out_arr->offset,
+                                     out_arr->length);
+    FirstTimeBitmapWriter bit_writer_validity(out_arr->buffers[0]->mutable_data(),
+                                              out_arr->offset, out_arr->length);
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[1]->data(), data.offset, data.length);
+      BitmapReader bit_reader_validity(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader_validity.IsNotSet()) {
+          if (value == true) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        } else {
+          if (bit_reader.IsSet()) {
+            bit_writer.Set();
+          } else {
+            bit_writer.Clear();
+          }
+          bit_writer_validity.Set();
+        }
+        bit_reader.Next();
+        bit_writer.Next();
+        bit_reader_validity.Next();
+        bit_writer_validity.Next();
+      }
+      bit_writer_validity.Finish();
+      bit_writer.Finish();
+    } else {
+      *out_arr = data;
+    }
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_supports_fill_null<Type, Status> Visit(const Type&) {
+    using T = typename GetViewType<Type>::T;
+    const auto& state = checked_cast<const FillNullState<Type>&>(*ctx->state());
+    T value = UnboxScalar<Type>::Unbox(*state.fill_value);
+    const T* in_data = data.GetValues<T>(1);
+    ArrayData* out_arr = out->mutable_array();
+    auto out_data = out_arr->GetMutableValues<T>(1);
+
+    if (data.null_count != 0) {
+      BitmapReader bit_reader(data.buffers[0]->data(), data.offset, data.length);
+      for (int64_t i = 0; i < data.length; i++) {
+        if (bit_reader.IsNotSet()) {
+          out_data[i] = value;
+        } else {
+          out_data[i] = static_cast<T>(in_data[i]);
+        }
+        bit_reader.Next();
+      }
+      BitUtil::SetBitsTo(out_arr->buffers[0]->mutable_data(), out_arr->offset,
+                         out_arr->length, true);
+    } else {
+      *out_arr = data;
+    }
+    return Status::OK();
+  }
+
+  Status Execute() { return VisitTypeInline(*data.type, this); }

Review comment:
       This will need to be reworked to use the `PhysicalType` attributes so that we don't generate functionally identical binary code e.g. for Int64Type / UInt64Type / TimestampType / Date64Type / DurationType (which all use an 8-byte fixed width C type value)

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <iosfwd>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_compat.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+
+namespace arrow {
+namespace compute {
+
+template <typename Type, typename T = typename TypeTraits<Type>::c_type>
+void CheckFillNull(const std::shared_ptr<DataType>& type, const std::vector<T>& in_values,
+                   const std::vector<bool>& in_is_valid, const Datum fill_value,
+                   const std::vector<T>& out_values,
+                   const std::vector<bool>& out_is_valid) {
+  std::shared_ptr<Array> input = _MakeArray<Type, T>(type, in_values, in_is_valid);
+  std::shared_ptr<Array> expected = _MakeArray<Type, T>(type, out_values, out_is_valid);
+
+  ASSERT_OK_AND_ASSIGN(Datum datum_out, FillNull(input, fill_value));
+  std::shared_ptr<Array> result = datum_out.make_array();
+  ASSERT_OK(result->ValidateFull());
+  AssertArraysEqual(*expected, *result, /*verbose=*/true);
+}
+
+class TestFillNullKernel : public ::testing::Test {};
+
+template <typename Type>
+class TestFillNullPrimitive : public ::testing::Test {};
+
+typedef ::testing::Types<Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type,
+                         UInt32Type, Int64Type, UInt64Type, Date32Type, Date64Type>
+    PrimitiveTypes;

Review comment:
       These are declared elsewhere I think?

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <iosfwd>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_compat.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+
+namespace arrow {
+namespace compute {
+
+template <typename Type, typename T = typename TypeTraits<Type>::c_type>
+void CheckFillNull(const std::shared_ptr<DataType>& type, const std::vector<T>& in_values,
+                   const std::vector<bool>& in_is_valid, const Datum fill_value,
+                   const std::vector<T>& out_values,
+                   const std::vector<bool>& out_is_valid) {
+  std::shared_ptr<Array> input = _MakeArray<Type, T>(type, in_values, in_is_valid);
+  std::shared_ptr<Array> expected = _MakeArray<Type, T>(type, out_values, out_is_valid);
+
+  ASSERT_OK_AND_ASSIGN(Datum datum_out, FillNull(input, fill_value));
+  std::shared_ptr<Array> result = datum_out.make_array();
+  ASSERT_OK(result->ValidateFull());
+  AssertArraysEqual(*expected, *result, /*verbose=*/true);
+}

Review comment:
       Can we use the ArrayFromJSON functions instead for specifying the test cases?

##########
File path: cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <iosfwd>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_compat.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+
+namespace arrow {
+namespace compute {
+
+template <typename Type, typename T = typename TypeTraits<Type>::c_type>
+void CheckFillNull(const std::shared_ptr<DataType>& type, const std::vector<T>& in_values,
+                   const std::vector<bool>& in_is_valid, const Datum fill_value,
+                   const std::vector<T>& out_values,
+                   const std::vector<bool>& out_is_valid) {
+  std::shared_ptr<Array> input = _MakeArray<Type, T>(type, in_values, in_is_valid);
+  std::shared_ptr<Array> expected = _MakeArray<Type, T>(type, out_values, out_is_valid);
+
+  ASSERT_OK_AND_ASSIGN(Datum datum_out, FillNull(input, fill_value));
+  std::shared_ptr<Array> result = datum_out.make_array();
+  ASSERT_OK(result->ValidateFull());
+  AssertArraysEqual(*expected, *result, /*verbose=*/true);
+}
+
+class TestFillNullKernel : public ::testing::Test {};
+
+template <typename Type>
+class TestFillNullPrimitive : public ::testing::Test {};
+
+typedef ::testing::Types<Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type,
+                         UInt32Type, Int64Type, UInt64Type, Date32Type, Date64Type>
+    PrimitiveTypes;
+
+TYPED_TEST_SUITE(TestFillNullPrimitive, PrimitiveTypes);
+
+TYPED_TEST(TestFillNullPrimitive, FillNull) {
+  using T = typename TypeParam::c_type;
+  using ScalarType = typename TypeTraits<TypeParam>::ScalarType;
+  auto type = TypeTraits<TypeParam>::type_singleton();
+  auto scalar = std::make_shared<ScalarType>(static_cast<T>(5));
+  // No Nulls
+  CheckFillNull<TypeParam, T>(type, {2, 4, 7, 9}, {true, true, true, true}, Datum(scalar),
+                              {2, 4, 7, 9}, {true, true, true, true});
+  // Some Nulls
+  CheckFillNull<TypeParam, T>(type, {2, 4, 7, 8}, {false, true, false, true},
+                              Datum(scalar), {5, 4, 5, 8}, {true, true, true, true});
+  // Empty Array
+  CheckFillNull<TypeParam, T>(type, {}, {}, Datum(scalar), {}, {});
+}
+
+TEST_F(TestFillNullKernel, FillNullNull) {
+  auto datum = Datum(std::make_shared<NullScalar>());
+  CheckFillNull<NullType, std::nullptr_t>(null(), {0, 0, 0, 0},
+                                          {false, false, false, false}, datum,
+                                          {0, 0, 0, 0}, {false, false, false, false});
+  CheckFillNull<NullType, std::nullptr_t>(null(), {NULL, NULL, NULL, NULL}, {}, datum,
+                                          {NULL, NULL, NULL, NULL}, {});
+  CheckFillNull<NullType, std::nullptr_t>(null(), {0, 0, 0, 0},
+                                          {false, false, false, false}, datum,
+                                          {0, 0, 0, 0}, {false, false, false, false});
+  CheckFillNull<NullType, std::nullptr_t>(null(), {NULL, NULL, NULL, NULL}, {}, datum,
+                                          {NULL, NULL, NULL, NULL}, {});
+}
+
+TEST_F(TestFillNullKernel, FillNullBoolean) {
+  auto scalar1 = std::make_shared<BooleanScalar>(false);
+  auto scalar2 = std::make_shared<BooleanScalar>(true);
+  // no nulls
+  CheckFillNull<BooleanType, bool>(boolean(), {true, false, true, false},
+                                   {true, true, true, true}, Datum(scalar1),
+                                   {true, false, true, false}, {true, true, true, true});
+  // some nulls
+  CheckFillNull<BooleanType, bool>(boolean(), {true, false, true, false},
+                                   {false, true, true, false}, Datum(scalar1),
+                                   {false, false, true, false}, {true, true, true, true});
+  CheckFillNull<BooleanType, bool>(boolean(), {true, false, true, false},
+                                   {false, true, false, false}, Datum(scalar2),
+                                   {true, false, true, true}, {true, true, true, true});
+}
+
+TEST_F(TestFillNullKernel, FillNullTimeStamp) {
+  auto time32_type = time32(TimeUnit::SECOND);
+  auto time64_type = time64(TimeUnit::NANO);
+  auto scalar1 = Datum(std::make_shared<Time32Scalar>(5, time32_type));
+  auto scalar2 = Datum(std::make_shared<Time64Scalar>(6, time64_type));
+  // no nulls
+  CheckFillNull<Time32Type, int32_t>(time32_type, {2, 1, 6, 9}, {true, true, true, true},
+                                     Datum(scalar1), {2, 1, 6, 9},
+                                     {true, true, true, true});
+  CheckFillNull<Time32Type, int32_t>(time32_type, {2, 1, 6, 9},
+                                     {true, false, true, false}, Datum(scalar1),
+                                     {2, 5, 6, 5}, {true, true, true, true});
+  // some nulls
+  CheckFillNull<Time64Type, int64_t>(time64_type, {2, 1, 6, 9}, {true, true, true, true},
+                                     scalar2, {2, 1, 6, 9}, {true, true, true, true});
+  CheckFillNull<Time64Type, int64_t>(time64_type, {2, 1, 6, 9},
+                                     {true, false, true, false}, scalar2, {2, 6, 6, 6},
+                                     {true, true, true, true});
+}
+

Review comment:
       The behavior of the kernel when passing a scalar with `is_valid=false` is not validated (and it probably yields incorrect results)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org