You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/06 18:34:12 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #11853: ARROW-1699: [C++] forward, backward fill kernel functions

lidavidm commented on a change in pull request #11853:
URL: https://github.com/apache/arrow/pull/11853#discussion_r763225230



##########
File path: cpp/src/arrow/compute/registry.cc
##########
@@ -175,6 +175,8 @@ static std::unique_ptr<FunctionRegistry> CreateBuiltInRegistry() {
   RegisterVectorHash(registry.get());
   RegisterVectorNested(registry.get());
   RegisterVectorReplace(registry.get());
+  RegisterVectorFillForwardNullValues(registry.get());
+  RegisterVectorFillBackwardNullValues(registry.get());

Review comment:
       nit: just call these from RegisterVectorReplace instead of adding them here and below

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);
+
+  auto array_scalars = arrow::MakeArray(array.Copy());
+
+  int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+  int64_t current_value_offset = 0;
+  bool has_fill_value = false;
+  arrow::internal::OptionalBinaryBitBlockCounter counter(
+      null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+  while (write_offset < array.length && write_offset >= 0) {
+    BitBlockCount block = counter.NextAndBlock();
+    if (block.AllSet()) {
+      current_value_offset = write_offset + incrementer * (block.length - 1);
+    } else {
+      if (block.popcount) {
+        for (int64_t i = 0; i != block.length; i++) {
+          uint64_t write_value_offset = write_offset + incrementer * bitmap_offset;
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                              array, current_value_offset, /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            current_value_offset = write_value_offset;
+          }
+          bitmap_offset += 1;
+        }
+      } else {
+        auto in = *(array_scalars->GetScalar(current_value_offset));
+        ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset, *in,
+                                        current_value_offset, block.length);
+        bitmap_offset += block.length;
+      }
+      write_offset += block.length * incrementer;
+    }
+  }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);

Review comment:
       Null arrays can never contain anything but null, so you can just no-op here.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -326,6 +326,12 @@ struct ReplaceWithMask<Type, enable_if_null<Type>> {
     *output = array;
     return Status::OK();
   }
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const ArrayData& in, const int64_t in_offset,
+                       const int64_t length) {}
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Scalar& in, const int64_t in_offset, const int64_t length) {}

Review comment:
       This should be unnecessary if the NullType case is just a no-op.

##########
File path: cpp/src/arrow/util/bitmap_ops.cc
##########
@@ -84,15 +84,26 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
   return count;
 }
 
-enum class TransferMode : bool { Copy, Invert };
+enum class TransferMode : char { Copy, Invert, Revert };
+
+uint8_t revert_uint8(uint8_t num) {

Review comment:
       Ditto comment about function naming here.

##########
File path: cpp/src/arrow/util/bitmap_ops.cc
##########
@@ -84,15 +84,26 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
   return count;
 }
 
-enum class TransferMode : bool { Copy, Invert };
+enum class TransferMode : char { Copy, Invert, Revert };
+
+uint8_t revert_uint8(uint8_t num) {
+  num = ((num & 0xf0) >> 4) | ((num & 0x0f) << 4);
+  num = ((num & 0xcc) >> 2) | ((num & 0x33) << 2);
+  num = ((num & 0xaa) >> 1) | ((num & 0x55) << 1);
+  return num;
+}
+
+uint8_t get_reverted_block(uint8_t block_left, uint8_t block_right, uint8_t length) {
+  return revert_uint8(block_left >> (length) | block_right << (8 - length));
+}
 
 template <TransferMode mode>
 void TransferBitmap(const uint8_t* data, int64_t offset, int64_t length,

Review comment:
       Given that the bit-reverse mode has basically nothing in common with the other modes, maybe we can just keep it in an entirely separate function, i.e. use RevertBlockOffsets directly?

##########
File path: cpp/src/arrow/util/bitmap_ops.cc
##########
@@ -84,15 +84,26 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
   return count;
 }
 
-enum class TransferMode : bool { Copy, Invert };
+enum class TransferMode : char { Copy, Invert, Revert };
+
+uint8_t revert_uint8(uint8_t num) {

Review comment:
       (This extends to e.g. variable names, parameter names, etc. above.)

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);
+
+  auto array_scalars = arrow::MakeArray(array.Copy());
+
+  int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+  int64_t current_value_offset = 0;
+  bool has_fill_value = false;
+  arrow::internal::OptionalBinaryBitBlockCounter counter(
+      null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+  while (write_offset < array.length && write_offset >= 0) {
+    BitBlockCount block = counter.NextAndBlock();
+    if (block.AllSet()) {
+      current_value_offset = write_offset + incrementer * (block.length - 1);
+    } else {
+      if (block.popcount) {
+        for (int64_t i = 0; i != block.length; i++) {
+          uint64_t write_value_offset = write_offset + incrementer * bitmap_offset;
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                              array, current_value_offset, /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            current_value_offset = write_value_offset;
+          }
+          bitmap_offset += 1;
+        }
+      } else {
+        auto in = *(array_scalars->GetScalar(current_value_offset));

Review comment:
       We can use the ArrayData overload of CopyData to avoid allocating a scalar in the first place.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);
+
+  auto array_scalars = arrow::MakeArray(array.Copy());
+
+  int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+  int64_t current_value_offset = 0;
+  bool has_fill_value = false;
+  arrow::internal::OptionalBinaryBitBlockCounter counter(
+      null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+  while (write_offset < array.length && write_offset >= 0) {
+    BitBlockCount block = counter.NextAndBlock();
+    if (block.AllSet()) {
+      current_value_offset = write_offset + incrementer * (block.length - 1);
+    } else {
+      if (block.popcount) {
+        for (int64_t i = 0; i != block.length; i++) {
+          uint64_t write_value_offset = write_offset + incrementer * bitmap_offset;
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                              array, current_value_offset, /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            current_value_offset = write_value_offset;
+          }
+          bitmap_offset += 1;
+        }
+      } else {
+        auto in = *(array_scalars->GetScalar(current_value_offset));
+        ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset, *in,
+                                        current_value_offset, block.length);
+        bitmap_offset += block.length;
+      }
+      write_offset += block.length * incrementer;
+    }
+  }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    auto array_scalars = arrow::MakeArray(array.Copy());
+
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t current_value_offset = 0;
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    bool has_fill_value = false;
+    const uint8_t* data = array.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+
+    std::vector<std::pair<uint64_t, uint64_t>> offsets_reverted;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reverted_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reverted.push_back(std::make_pair(offset0, offset1 - offset0));
+          current_value_offset = array_value_index;
+          has_fill_value = true;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value) {
+            const offset_type offset0 = offsets[current_value_offset];
+            const offset_type offset1 = offsets[current_value_offset + 1];
+            offsets_reverted.push_back(std::make_pair(offset0, offset1 - offset0));
+          } else {
+            offsets_reverted.push_back(std::make_pair(-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reverted.begin(); it != offsets_reverted.end(); ++it) {
+        if (it->first == -1U && it->second == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+        }
+      }
+    } else {
+      for (auto it = offsets_reverted.rbegin(); it != offsets_reverted.rend(); ++it) {
+        if (it->first == -1U && it->second == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNull {
+  static Status ExecForwardFillNull(KernelContext* ctx, const ArrayData& array,
+                                    ArrayData* output) {
+    if (array.MayHaveNulls()) {
+      int8_t direction = 1;
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, array.buffers[0]->data(),
+                                                  output, direction);
+    } else {
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status ExecBackwardFillNull(KernelContext* ctx, const ArrayData& array,
+                                     ArrayData* output) {
+    if (array.MayHaveNulls()) {
+      int8_t direction = -1;
+      auto memory_pool = ctx->memory_pool();
+      auto reverted_bitmap = arrow::internal::CopyBitmap(
+          memory_pool, array.buffers[0]->data(), array.offset, array.length);
+      arrow::internal::RevertBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                    reverted_bitmap.ValueOrDie()->mutable_data(),
+                                    output->offset);

Review comment:
       Why not use the overload of RevertBitmap that returns a buffer?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);
+
+  auto array_scalars = arrow::MakeArray(array.Copy());
+
+  int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+  int64_t current_value_offset = 0;
+  bool has_fill_value = false;
+  arrow::internal::OptionalBinaryBitBlockCounter counter(
+      null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+  while (write_offset < array.length && write_offset >= 0) {
+    BitBlockCount block = counter.NextAndBlock();
+    if (block.AllSet()) {
+      current_value_offset = write_offset + incrementer * (block.length - 1);
+    } else {
+      if (block.popcount) {
+        for (int64_t i = 0; i != block.length; i++) {
+          uint64_t write_value_offset = write_offset + incrementer * bitmap_offset;
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                              array, current_value_offset, /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            current_value_offset = write_value_offset;
+          }
+          bitmap_offset += 1;
+        }
+      } else {
+        auto in = *(array_scalars->GetScalar(current_value_offset));
+        ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset, *in,
+                                        current_value_offset, block.length);
+        bitmap_offset += block.length;
+      }
+      write_offset += block.length * incrementer;
+    }
+  }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    auto array_scalars = arrow::MakeArray(array.Copy());
+
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t current_value_offset = 0;
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    bool has_fill_value = false;
+    const uint8_t* data = array.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+
+    std::vector<std::pair<uint64_t, uint64_t>> offsets_reverted;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(
+        reverted_bitmap, array.offset, array.length, array.GetNullCount(),
+        [&]() {
+          const offset_type offset0 = offsets[array_value_index];
+          const offset_type offset1 = offsets[array_value_index + 1];
+          offsets_reverted.push_back(std::make_pair(offset0, offset1 - offset0));
+          current_value_offset = array_value_index;
+          has_fill_value = true;
+          array_value_index += direction;
+          return Status::OK();
+        },
+        [&]() {
+          if (has_fill_value) {
+            const offset_type offset0 = offsets[current_value_offset];
+            const offset_type offset1 = offsets[current_value_offset + 1];
+            offsets_reverted.push_back(std::make_pair(offset0, offset1 - offset0));
+          } else {
+            offsets_reverted.push_back(std::make_pair(-1U, -1U));
+          }
+          array_value_index += direction;
+          return Status::OK();
+        }));
+
+    if (direction == 1) {
+      for (auto it = offsets_reverted.begin(); it != offsets_reverted.end(); ++it) {
+        if (it->first == -1U && it->second == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+        }
+      }
+    } else {
+      for (auto it = offsets_reverted.rbegin(); it != offsets_reverted.rend(); ++it) {
+        if (it->first == -1U && it->second == -1U) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(builder.Append(data + it->first, it->second));
+        }
+      }
+    }
+
+    std::shared_ptr<Array> temp_output;
+    RETURN_NOT_OK(builder.Finish(&temp_output));
+    *output = *temp_output->data();
+    // Builder type != logical type due to GenerateTypeAgnosticVarBinaryBase
+    output->type = array.type;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_null<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    *output = array;
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNull {
+  static Status ExecForwardFillNull(KernelContext* ctx, const ArrayData& array,
+                                    ArrayData* output) {
+    if (array.MayHaveNulls()) {
+      int8_t direction = 1;
+      return FillNullExecutor<Type>::ExecFillNull(ctx, array, array.buffers[0]->data(),
+                                                  output, direction);
+    } else {
+      *output = array;
+    }
+    return Status::OK();
+  }
+
+  static Status ExecBackwardFillNull(KernelContext* ctx, const ArrayData& array,
+                                     ArrayData* output) {
+    if (array.MayHaveNulls()) {
+      int8_t direction = -1;
+      auto memory_pool = ctx->memory_pool();
+      auto reverted_bitmap = arrow::internal::CopyBitmap(
+          memory_pool, array.buffers[0]->data(), array.offset, array.length);
+      arrow::internal::RevertBitmap(array.buffers[0]->data(), array.offset, array.length,
+                                    reverted_bitmap.ValueOrDie()->mutable_data(),
+                                    output->offset);
+      return FillNullExecutor<Type>::ExecFillNull(
+          ctx, array, reverted_bitmap.ValueOrDie()->data(), output, direction);
+    } else {
+      *output = array;
+    }
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillForwardFunctor {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ArrayData& array = *batch[0].array();
+    ArrayData* output = out->array().get();
+    output->length = array.length;
+
+    return FillNull<Type>::ExecForwardFillNull(ctx, array, output);

Review comment:
       Since everything is templated anyway, why not just inline FillNull into FillForwardFunctor/FillBackwardFunctor?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -500,6 +731,104 @@ void RegisterVectorReplace(FunctionRegistry* registry) {
 
   // TODO(ARROW-9431): "replace_with_indices"
 }
+
+const FunctionDoc fill_forward_null_doc(
+    "Replace null values using forward mode",
+    ("Given an array, propagate last valid observation forward to next valid\n"
+     "or nothing if all previous values are null. "),
+    {"values"});
+
+void RegisterVectorFillForwardNullValues(FunctionRegistry* registry) {

Review comment:
       We could consolidate these two registration functions:
   
   ```cpp
   {
     auto func = std::make_shared(...);
     RegisterVectorFill<FillForwardFunctor>(func.get());
     registry->AddFunction(std::move(func));
   }
   {
     auto func = std::make_shared(...);
     RegisterVectorFill<FillBackwardFunctor>(func.get());
     registry->AddFunction(std::move(func));
   }
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -410,6 +416,12 @@ struct ReplaceWithMask<Type, enable_if_base_binary<Type>> {
     output->type = array.type;
     return Status::OK();
   }
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const ArrayData& in, const int64_t in_offset,
+                       const int64_t length) {}
+
+  static void CopyData(const DataType&, uint8_t* out, const int64_t out_offset,
+                       const Scalar& in, const int64_t in_offset, const int64_t length) {}

Review comment:
       Why are these needed? It seems this template shouldn't get instantiated below.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);
+
+  auto array_scalars = arrow::MakeArray(array.Copy());
+
+  int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+  int64_t current_value_offset = 0;
+  bool has_fill_value = false;
+  arrow::internal::OptionalBinaryBitBlockCounter counter(
+      null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+  while (write_offset < array.length && write_offset >= 0) {
+    BitBlockCount block = counter.NextAndBlock();
+    if (block.AllSet()) {
+      current_value_offset = write_offset + incrementer * (block.length - 1);
+    } else {
+      if (block.popcount) {
+        for (int64_t i = 0; i != block.length; i++) {
+          uint64_t write_value_offset = write_offset + incrementer * bitmap_offset;
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                              array, current_value_offset, /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            current_value_offset = write_value_offset;
+          }
+          bitmap_offset += 1;
+        }
+      } else {
+        auto in = *(array_scalars->GetScalar(current_value_offset));
+        ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset, *in,
+                                        current_value_offset, block.length);
+        bitmap_offset += block.length;
+      }
+      write_offset += block.length * incrementer;
+    }
+  }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    auto array_scalars = arrow::MakeArray(array.Copy());
+
+    BuilderType builder(array.type, ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(array.length));
+    RETURN_NOT_OK(builder.ReserveData(array.buffers[2]->size()));
+    int64_t current_value_offset = 0;
+    int64_t array_value_index = direction == 1 ? 0 : array.length - 1;
+    bool has_fill_value = false;
+    const uint8_t* data = array.buffers[2]->data();
+    const offset_type* offsets = array.GetValues<offset_type>(1);
+
+    std::vector<std::pair<uint64_t, uint64_t>> offsets_reverted;
+    RETURN_NOT_OK(VisitNullBitmapInline<>(

Review comment:
       Instead of passing through the array twice/allocating an intermediate, why not append to the builder directly? We can also use VisitArrayValuesInline to get access to string_views instead of having to manipulate offsets manually.

##########
File path: cpp/src/arrow/util/bitmap_ops.cc
##########
@@ -84,15 +84,26 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
   return count;
 }
 
-enum class TransferMode : bool { Copy, Invert };
+enum class TransferMode : char { Copy, Invert, Revert };
+
+uint8_t revert_uint8(uint8_t num) {
+  num = ((num & 0xf0) >> 4) | ((num & 0x0f) << 4);
+  num = ((num & 0xcc) >> 2) | ((num & 0x33) << 2);
+  num = ((num & 0xaa) >> 1) | ((num & 0x55) << 1);
+  return num;
+}
+
+uint8_t get_reverted_block(uint8_t block_left, uint8_t block_right, uint8_t length) {

Review comment:
       Could a comment be added here to describe what this does? Alternatively, could it be renamed so that its purpose is clearer?

##########
File path: cpp/src/arrow/util/bitmap_ops.h
##########
@@ -81,6 +81,40 @@ ARROW_EXPORT
 Result<std::shared_ptr<Buffer>> InvertBitmap(MemoryPool* pool, const uint8_t* bitmap,
                                              int64_t offset, int64_t length);
 
+/// Revert a bit range of an existing bitmap into an existing bitmap
+///
+/// \param[in] bitmap source data
+/// \param[in] offset bit offset into the source data
+/// \param[in] length number of bits to revert
+/// \param[in] dest_offset bit offset into the destination
+/// \param[out] dest the destination buffer, must have at least space for
+/// (offset + length) bits
+ARROW_EXPORT
+void RevertBitmap(const uint8_t* bitmap, int64_t offset, int64_t length, uint8_t* dest,
+                  int64_t dest_offset);
+
+/// Revert a bit range of an existing bitmap
+///
+/// \param[in] pool memory pool to allocate memory from
+/// \param[in] bitmap source data
+/// \param[in] offset bit offset into the source data
+/// \param[in] length number of bits to revert
+///
+/// \return Status message
+ARROW_EXPORT
+Result<std::shared_ptr<Buffer>> RevertBitmap(MemoryPool* pool, const uint8_t* bitmap,
+                                             int64_t offset, int64_t length);
+
+/// Revert a bit range of an existing bitmap into an existing bitmap
+///
+/// \param[in] bitmap source data
+/// \param[in] offset bit offset into the source data
+/// \param[in] length number of bits to revert
+/// \param[in] dest_offset bit offset into the destination
+/// \param[out] dest the destination buffer, must have at least space for
+/// (offset + length) bits
+void RevertBlockOffsets(const uint8_t* data, int64_t offset, int64_t length,

Review comment:
       Does this need to be exposed publicly? It seems redundant with ReverseBitmap.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);
+
+  auto array_scalars = arrow::MakeArray(array.Copy());
+
+  int64_t write_offset = incrementer == 1 ? 0 : array.length - 1;
+  int64_t bitmap_offset = 0;
+  int64_t current_value_offset = 0;
+  bool has_fill_value = false;
+  arrow::internal::OptionalBinaryBitBlockCounter counter(
+      null_bitmap, array.offset, null_bitmap, array.offset, array.length);
+
+  while (write_offset < array.length && write_offset >= 0) {
+    BitBlockCount block = counter.NextAndBlock();
+    if (block.AllSet()) {
+      current_value_offset = write_offset + incrementer * (block.length - 1);
+    } else {
+      if (block.popcount) {
+        for (int64_t i = 0; i != block.length; i++) {
+          uint64_t write_value_offset = write_offset + incrementer * bitmap_offset;
+          auto current_bit = bit_util::GetBit(null_bitmap, bitmap_offset);
+          if (!current_bit) {
+            if (has_fill_value) {
+              ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_value_offset,
+                                              array, current_value_offset, /*length=*/1);
+              bit_util::SetBitTo(out_bitmap, write_value_offset, true);
+            }
+          } else {
+            has_fill_value = true;
+            current_value_offset = write_value_offset;
+          }
+          bitmap_offset += 1;
+        }
+      } else {
+        auto in = *(array_scalars->GetScalar(current_value_offset));
+        ReplaceWithMask<Type>::CopyData(*array.type, out_values, write_offset, *in,
+                                        current_value_offset, block.length);
+        bitmap_offset += block.length;
+      }
+      write_offset += block.length * incrementer;
+    }
+  }
+}
+
+template <typename Type, typename Enable = void>
+struct FillNullExecutor {};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_boolean<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             uint8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<
+    Type, enable_if_t<is_number_type<Type>::value ||
+                      std::is_same<Type, MonthDayNanoIntervalType>::value>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_fixed_size_binary<Type>> {
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    fillNullInDirectionImpl<Type>(array, reverted_bitmap, output, direction);
+    return Status::OK();
+  }
+};
+
+template <typename Type>
+struct FillNullExecutor<Type, enable_if_base_binary<Type>> {
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static Status ExecFillNull(KernelContext* ctx, const ArrayData& array,
+                             const uint8_t* reverted_bitmap, ArrayData* output,
+                             int8_t direction) {
+    auto array_scalars = arrow::MakeArray(array.Copy());

Review comment:
       `array_scalars` is unused.

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {
+  uint8_t* out_bitmap = output->buffers[0]->mutable_data();
+  uint8_t* out_values = output->buffers[1]->mutable_data();
+  arrow::internal::CopyBitmap(array.buffers[0]->data(), array.offset, array.length,
+                              out_bitmap, output->offset);
+  ReplaceWithMask<Type>::CopyData(*array.type, out_values, 0, array, 0, array.length);

Review comment:
       When we pass literal constants as arguments, try to prefix them with a comment indicating the parameter name, e.g. `/*out_offset=*/0`, `/*in_offset=*/0`

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,
+                             ArrayData* output, int8_t incrementer) {

Review comment:
       nit: incrementer -> direction?

##########
File path: cpp/src/arrow/util/bitmap_ops.cc
##########
@@ -84,15 +84,26 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
   return count;
 }
 
-enum class TransferMode : bool { Copy, Invert };
+enum class TransferMode : char { Copy, Invert, Revert };
+
+uint8_t revert_uint8(uint8_t num) {

Review comment:
       Also, in general, instead of "revert", can we call it "reverse" or possibly "flip"?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace_test.cc
##########
@@ -798,5 +812,284 @@ TYPED_TEST(TestReplaceBinary, ReplaceWithMaskRandom) {
   }
 }
 
+template <typename T>
+class TestFillNullForwardNumeric : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardDecimal : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullForwardBinary : public TestReplaceKernel<T> {
+ protected:
+  std::shared_ptr<DataType> type() override { return default_type_instance<T>(); }
+};
+template <typename T>
+class TestFillNullBackwardNumeric : public TestReplaceKernel<T> {

Review comment:
       Why have separate classes for forward/backward?

##########
File path: cpp/src/arrow/compute/kernels/vector_replace.cc
##########
@@ -444,6 +456,225 @@ struct ReplaceWithMaskFunctor {
   }
 };
 
+template <typename Type>
+void fillNullInDirectionImpl(const ArrayData& array, const uint8_t* null_bitmap,

Review comment:
       Note: in general, function/method/class names should use UpperCamelCase naming according to our [style guide](https://arrow.apache.org/docs/developers/cpp/development.html#code-style-linting-and-ci).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org