You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/06/21 12:06:58 UTC

[arrow] branch master updated: ARROW-13042: [C++] Check that kernel output is fully initialized

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e990d17  ARROW-13042: [C++] Check that kernel output is fully initialized
e990d17 is described below

commit e990d177b1f1dec962315487682f613d46be573c
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Jun 21 14:04:42 2021 +0200

    ARROW-13042: [C++] Check that kernel output is fully initialized
    
    Enhance TestInitialized() so that it really triggers Valgrind for every uninitialized bit in a buffer's data, including child and dictionary data.
    
    Call TestInitialized() automatically from kernel tests.
    
    Fix the BufferBuilder and TypedBufferBuilder API to really resize a Buffer to 0 when intended; introduce a FinishWithLength() method for cases where the caller wants to force a particular size.
    
    Other fixes to get all tests to pass under Valgrind.
    
    Closes #10550 from pitrou/ARROW-13042-check-kernel-output-initialized
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/array/builder_primitive.cc           |   5 +-
 cpp/src/arrow/array/builder_primitive.h            |   7 +-
 cpp/src/arrow/array/util.cc                        |   9 +-
 cpp/src/arrow/buffer_builder.h                     |  45 +++++-
 cpp/src/arrow/buffer_test.cc                       |  78 ++++++++-
 cpp/src/arrow/compute/exec.cc                      |   1 -
 cpp/src/arrow/compute/kernels/aggregate_test.cc    |  17 +-
 cpp/src/arrow/compute/kernels/codegen_internal.h   |   8 +
 cpp/src/arrow/compute/kernels/hash_aggregate.cc    |  45 +++---
 .../arrow/compute/kernels/hash_aggregate_test.cc   |   6 +-
 .../compute/kernels/scalar_arithmetic_test.cc      |   4 +-
 .../arrow/compute/kernels/scalar_cast_internal.cc  |   7 +-
 cpp/src/arrow/compute/kernels/scalar_cast_test.cc  |  12 +-
 .../arrow/compute/kernels/scalar_compare_test.cc   |   5 +-
 .../arrow/compute/kernels/scalar_fill_null_test.cc |   3 +-
 .../arrow/compute/kernels/scalar_if_else_test.cc   |   2 +-
 .../compute/kernels/scalar_set_lookup_test.cc      |  12 +-
 cpp/src/arrow/compute/kernels/scalar_temporal.cc   | 175 ++++++++-------------
 .../arrow/compute/kernels/scalar_temporal_test.cc  |  32 ++--
 cpp/src/arrow/compute/kernels/test_util.cc         |  61 ++++++-
 cpp/src/arrow/compute/kernels/test_util.h          |   2 +
 cpp/src/arrow/compute/kernels/vector_hash_test.cc  |  13 +-
 .../arrow/compute/kernels/vector_selection_test.cc |  52 +++---
 cpp/src/arrow/compute/kernels/vector_sort_test.cc  |  11 +-
 cpp/src/arrow/filesystem/s3fs.cc                   |   3 +-
 cpp/src/arrow/testing/gtest_util.cc                |  25 ++-
 cpp/src/arrow/testing/gtest_util.h                 |   7 +
 cpp/src/arrow/util/hashing.h                       |  12 +-
 cpp/src/arrow/util/windows_fixup.h                 |   7 +
 cpp/src/parquet/arrow/writer.cc                    |   2 -
 cpp/src/parquet/encoding.cc                        |   2 +-
 cpp/src/parquet/encoding_test.cc                   |   2 +-
 32 files changed, 421 insertions(+), 251 deletions(-)

diff --git a/cpp/src/arrow/array/builder_primitive.cc b/cpp/src/arrow/array/builder_primitive.cc
index 037a1ec..e403c42 100644
--- a/cpp/src/arrow/array/builder_primitive.cc
+++ b/cpp/src/arrow/array/builder_primitive.cc
@@ -65,9 +65,8 @@ Status BooleanBuilder::Resize(int64_t capacity) {
 }
 
 Status BooleanBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  std::shared_ptr<Buffer> null_bitmap, data;
-  RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
-  RETURN_NOT_OK(data_builder_.Finish(&data));
+  ARROW_ASSIGN_OR_RAISE(auto null_bitmap, null_bitmap_builder_.FinishWithLength(length_));
+  ARROW_ASSIGN_OR_RAISE(auto data, data_builder_.FinishWithLength(length_));
 
   *out = ArrayData::Make(boolean(), length_, {null_bitmap, data}, null_count_);
 
diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h
index e10f11f..e0f39f9 100644
--- a/cpp/src/arrow/array/builder_primitive.h
+++ b/cpp/src/arrow/array/builder_primitive.h
@@ -23,6 +23,7 @@
 
 #include "arrow/array/builder_base.h"
 #include "arrow/array/data.h"
+#include "arrow/result.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 
@@ -185,9 +186,9 @@ class NumericBuilder : public ArrayBuilder {
   }
 
   Status FinishInternal(std::shared_ptr<ArrayData>* out) override {
-    std::shared_ptr<Buffer> data, null_bitmap;
-    ARROW_RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
-    ARROW_RETURN_NOT_OK(data_builder_.Finish(&data));
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap,
+                          null_bitmap_builder_.FinishWithLength(length_));
+    ARROW_ASSIGN_OR_RAISE(auto data, data_builder_.FinishWithLength(length_));
     *out = ArrayData::Make(type(), length_, {null_bitmap, data}, null_count_);
     capacity_ = length_ = null_count_ = 0;
     return Status::OK();
diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc
index 297745a..d485223 100644
--- a/cpp/src/arrow/array/util.cc
+++ b/cpp/src/arrow/array/util.cc
@@ -286,7 +286,7 @@ std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {
 // ----------------------------------------------------------------------
 // Misc APIs
 
-namespace internal {
+namespace {
 
 // get the maximum buffer length required, then allocate a single zeroed buffer
 // to use anywhere a buffer is required
@@ -650,12 +650,11 @@ class RepeatedArrayFactory {
   std::shared_ptr<Array> out_;
 };
 
-}  // namespace internal
+}  // namespace
 
 Result<std::shared_ptr<Array>> MakeArrayOfNull(const std::shared_ptr<DataType>& type,
                                                int64_t length, MemoryPool* pool) {
-  ARROW_ASSIGN_OR_RAISE(auto data,
-                        internal::NullArrayFactory(pool, type, length).Create());
+  ARROW_ASSIGN_OR_RAISE(auto data, NullArrayFactory(pool, type, length).Create());
   return MakeArray(data);
 }
 
@@ -664,7 +663,7 @@ Result<std::shared_ptr<Array>> MakeArrayFromScalar(const Scalar& scalar, int64_t
   if (!scalar.is_valid) {
     return MakeArrayOfNull(scalar.type, length, pool);
   }
-  return internal::RepeatedArrayFactory(pool, scalar, length).Create();
+  return RepeatedArrayFactory(pool, scalar, length).Create();
 }
 
 namespace internal {
diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h
index f525ec2..c6250ae 100644
--- a/cpp/src/arrow/buffer_builder.h
+++ b/cpp/src/arrow/buffer_builder.h
@@ -64,15 +64,12 @@ class ARROW_EXPORT BufferBuilder {
   /// \brief Resize the buffer to the nearest multiple of 64 bytes
   ///
   /// \param new_capacity the new capacity of the of the builder. Will be
-  /// rounded up to a multiple of 64 bytes for padding \param shrink_to_fit if
-  /// new capacity is smaller than the existing size, reallocate internal
-  /// buffer. Set to false to avoid reallocations when shrinking the builder.
+  /// rounded up to a multiple of 64 bytes for padding
+  /// \param shrink_to_fit if new capacity is smaller than the existing,
+  /// reallocate internal buffer. Set to false to avoid reallocations when
+  /// shrinking the builder.
   /// \return Status
   Status Resize(const int64_t new_capacity, bool shrink_to_fit = true) {
-    // Resize(0) is a no-op
-    if (new_capacity == 0) {
-      return Status::OK();
-    }
     if (buffer_ == NULLPTR) {
       ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(new_capacity, pool_));
     } else {
@@ -168,6 +165,17 @@ class ARROW_EXPORT BufferBuilder {
     return out;
   }
 
+  /// \brief Like Finish, but override the final buffer size
+  ///
+  /// This is useful after writing data directly into the builder memory
+  /// without calling the Append methods (basically, when using BufferBuilder
+  /// mostly for memory allocation).
+  Result<std::shared_ptr<Buffer>> FinishWithLength(int64_t final_length,
+                                                   bool shrink_to_fit = true) {
+    size_ = final_length;
+    return Finish(shrink_to_fit);
+  }
+
   void Reset() {
     buffer_ = NULLPTR;
     capacity_ = size_ = 0;
@@ -273,6 +281,16 @@ class TypedBufferBuilder<
     return out;
   }
 
+  /// \brief Like Finish, but override the final buffer size
+  ///
+  /// This is useful after writing data directly into the builder memory
+  /// without calling the Append methods (basically, when using TypedBufferBuilder
+  /// only for memory allocation).
+  Result<std::shared_ptr<Buffer>> FinishWithLength(int64_t final_length,
+                                                   bool shrink_to_fit = true) {
+    return bytes_builder_.FinishWithLength(final_length * sizeof(T), shrink_to_fit);
+  }
+
   void Reset() { bytes_builder_.Reset(); }
 
   int64_t length() const { return bytes_builder_.length() / sizeof(T); }
@@ -399,6 +417,19 @@ class TypedBufferBuilder<bool> {
     return out;
   }
 
+  /// \brief Like Finish, but override the final buffer size
+  ///
+  /// This is useful after writing data directly into the builder memory
+  /// without calling the Append methods (basically, when using TypedBufferBuilder
+  /// only for memory allocation).
+  Result<std::shared_ptr<Buffer>> FinishWithLength(int64_t final_length,
+                                                   bool shrink_to_fit = true) {
+    const auto final_byte_length = BitUtil::BytesForBits(final_length);
+    bytes_builder_.UnsafeAdvance(final_byte_length - bytes_builder_.length());
+    bit_length_ = false_count_ = 0;
+    return bytes_builder_.FinishWithLength(final_byte_length, shrink_to_fit);
+  }
+
   void Reset() {
     bytes_builder_.Reset();
     bit_length_ = false_count_ = 0;
diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc
index 02b96c3..4295d4c 100644
--- a/cpp/src/arrow/buffer_test.cc
+++ b/cpp/src/arrow/buffer_test.cc
@@ -653,18 +653,77 @@ TEST(TestBufferBuilder, ResizeReserve) {
 
   ASSERT_OK(builder.Resize(128));
   ASSERT_EQ(128, builder.capacity());
+  ASSERT_EQ(9, builder.length());
 
   // Do not shrink to fit
   ASSERT_OK(builder.Resize(64, false));
   ASSERT_EQ(128, builder.capacity());
+  ASSERT_EQ(9, builder.length());
 
   // Shrink to fit
   ASSERT_OK(builder.Resize(64));
   ASSERT_EQ(64, builder.capacity());
+  ASSERT_EQ(9, builder.length());
 
   // Reserve elements
   ASSERT_OK(builder.Reserve(60));
   ASSERT_EQ(128, builder.capacity());
+  ASSERT_EQ(9, builder.length());
+}
+
+TEST(TestBufferBuilder, Finish) {
+  const std::string data = "some data";
+  auto data_ptr = data.c_str();
+
+  for (const bool shrink_to_fit : {true, false}) {
+    ARROW_SCOPED_TRACE("shrink_to_fit = ", shrink_to_fit);
+    BufferBuilder builder;
+    ASSERT_OK(builder.Append(data_ptr, 9));
+    ASSERT_OK(builder.Append(data_ptr, 9));
+    ASSERT_EQ(18, builder.length());
+    ASSERT_EQ(64, builder.capacity());
+
+    ASSERT_OK_AND_ASSIGN(auto buf, builder.Finish(shrink_to_fit));
+    ASSERT_EQ(buf->size(), 18);
+    ASSERT_EQ(buf->capacity(), 64);
+  }
+  for (const bool shrink_to_fit : {true, false}) {
+    ARROW_SCOPED_TRACE("shrink_to_fit = ", shrink_to_fit);
+    BufferBuilder builder;
+    ASSERT_OK(builder.Reserve(1024));
+    builder.UnsafeAppend(data_ptr, 9);
+    builder.UnsafeAppend(data_ptr, 9);
+    ASSERT_EQ(18, builder.length());
+    ASSERT_EQ(builder.capacity(), 1024);
+
+    ASSERT_OK_AND_ASSIGN(auto buf, builder.Finish(shrink_to_fit));
+    ASSERT_EQ(buf->size(), 18);
+    ASSERT_EQ(buf->capacity(), shrink_to_fit ? 64 : 1024);
+  }
+}
+
+TEST(TestBufferBuilder, FinishEmpty) {
+  for (const bool shrink_to_fit : {true, false}) {
+    ARROW_SCOPED_TRACE("shrink_to_fit = ", shrink_to_fit);
+    BufferBuilder builder;
+    ASSERT_EQ(0, builder.length());
+    ASSERT_EQ(0, builder.capacity());
+
+    ASSERT_OK_AND_ASSIGN(auto buf, builder.Finish(shrink_to_fit));
+    ASSERT_EQ(buf->size(), 0);
+    ASSERT_EQ(buf->capacity(), 0);
+  }
+  for (const bool shrink_to_fit : {true, false}) {
+    ARROW_SCOPED_TRACE("shrink_to_fit = ", shrink_to_fit);
+    BufferBuilder builder;
+    ASSERT_OK(builder.Reserve(1024));
+    ASSERT_EQ(0, builder.length());
+    ASSERT_EQ(1024, builder.capacity());
+
+    ASSERT_OK_AND_ASSIGN(auto buf, builder.Finish(shrink_to_fit));
+    ASSERT_EQ(buf->size(), 0);
+    ASSERT_EQ(buf->capacity(), shrink_to_fit ? 0 : 1024);
+  }
 }
 
 template <typename T>
@@ -717,7 +776,7 @@ TYPED_TEST(TypedTestBufferBuilder, AppendCopies) {
   }
 }
 
-TEST(TestBufferBuilder, BasicBoolBufferBuilderUsage) {
+TEST(TestBoolBufferBuilder, Basics) {
   TypedBufferBuilder<bool> builder;
 
   ASSERT_OK(builder.Append(false));
@@ -746,7 +805,7 @@ TEST(TestBufferBuilder, BasicBoolBufferBuilderUsage) {
   ASSERT_EQ(built->size(), BitUtil::BytesForBits(nvalues + 1));
 }
 
-TEST(TestBufferBuilder, BoolBufferBuilderAppendCopies) {
+TEST(TestBoolBufferBuilder, AppendCopies) {
   TypedBufferBuilder<bool> builder;
 
   ASSERT_OK(builder.Append(13, true));
@@ -766,6 +825,21 @@ TEST(TestBufferBuilder, BoolBufferBuilderAppendCopies) {
   ASSERT_EQ(built->size(), BitUtil::BytesForBits(13 + 17));
 }
 
+TEST(TestBoolBufferBuilder, Reserve) {
+  TypedBufferBuilder<bool> builder;
+
+  ASSERT_OK(builder.Reserve(13 + 17));
+  builder.UnsafeAppend(13, true);
+  builder.UnsafeAppend(17, false);
+  ASSERT_EQ(builder.length(), 13 + 17);
+  ASSERT_EQ(builder.capacity(), 64 * 8);
+  ASSERT_EQ(builder.false_count(), 17);
+
+  ASSERT_OK_AND_ASSIGN(auto built, builder.Finish());
+  AssertIsCPUBuffer(*built);
+  ASSERT_EQ(built->size(), BitUtil::BytesForBits(13 + 17));
+}
+
 template <typename T>
 class TypedTestBuffer : public ::testing::Test {};
 
diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 0b1f6b5..73cb82e 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -106,7 +106,6 @@ Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t l
     int64_t buffer_size = BitUtil::BytesForBits(length * bit_width);
     return ctx->Allocate(buffer_size);
   }
-  return Status::OK();
 }
 
 struct BufferPreallocation {
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index 476caab..4bce02a 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -1215,7 +1215,7 @@ class TestPrimitiveModeKernel : public ::testing::Test {
                       const std::vector<CType>& expected_modes,
                       const std::vector<int64_t>& expected_counts) {
     ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, ModeOptions{n}));
-    ASSERT_OK(out.make_array()->ValidateFull());
+    ValidateOutput(out);
     const StructArray out_array(out.array());
     ASSERT_EQ(out_array.length(), expected_modes.size());
     ASSERT_EQ(out_array.num_fields(), 2);
@@ -1256,7 +1256,8 @@ class TestPrimitiveModeKernel : public ::testing::Test {
 
   void AssertModesEmpty(const Datum& array, int n) {
     ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, ModeOptions{n}));
-    ASSERT_OK(out.make_array()->ValidateFull());
+    auto out_array = out.make_array();
+    ValidateOutput(*out_array);
     ASSERT_EQ(out.array()->length, 0);
   }
 
@@ -1397,8 +1398,8 @@ template <typename ArrowType, typename CTYPE = typename ArrowType::c_type>
 void VerifyMode(const std::shared_ptr<Array>& array) {
   auto expected = NaiveMode<ArrowType>(*array);
   ASSERT_OK_AND_ASSIGN(Datum out, Mode(array));
-  ASSERT_OK(out.make_array()->ValidateFull());
   const StructArray out_array(out.array());
+  ValidateOutput(out_array);
   ASSERT_EQ(out_array.length(), 1);
   ASSERT_EQ(out_array.num_fields(), 2);
 
@@ -1756,7 +1757,7 @@ class TestPrimitiveQuantileKernel : public ::testing::Test {
 
       ASSERT_OK_AND_ASSIGN(Datum out, Quantile(array, options));
       const auto& out_array = out.make_array();
-      ASSERT_OK(out_array->ValidateFull());
+      ValidateOutput(*out_array);
       ASSERT_EQ(out_array->length(), options.q.size());
       ASSERT_EQ(out_array->null_count(), 0);
       AssertTypeEqual(out_array->type(), expected[0][i].type());
@@ -1816,7 +1817,8 @@ class TestPrimitiveQuantileKernel : public ::testing::Test {
     for (auto interpolation : this->interpolations_) {
       options.interpolation = interpolation;
       ASSERT_OK_AND_ASSIGN(Datum out, Quantile(array, options));
-      ASSERT_OK(out.make_array()->ValidateFull());
+      auto out_array = out.make_array();
+      ValidateOutput(*out_array);
       ASSERT_EQ(out.array()->length, 0);
     }
   }
@@ -2044,7 +2046,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
     TDigestOptions options(quantiles);
     ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, options));
     const auto& out_array = out.make_array();
-    ASSERT_OK(out_array->ValidateFull());
+    ValidateOutput(*out_array);
     ASSERT_EQ(out_array->length(), quantiles.size());
     ASSERT_EQ(out_array->null_count(), 0);
     AssertTypeEqual(out_array->type(), float64());
@@ -2186,7 +2188,8 @@ TEST_F(TestTDigestKernel, AllNullsOrNaNs) {
   for (const auto& json : tests) {
     auto chunked = ChunkedArrayFromJSON(float64(), json);
     ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, TDigestOptions()));
-    ASSERT_OK(out.make_array()->ValidateFull());
+    auto out_array = out.make_array();
+    ValidateOutput(*out_array);
     ASSERT_EQ(out.array()->length, 0);
   }
 }
diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h
index 6a5cee1..140f9fd 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.h
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -276,6 +276,8 @@ struct OutputArrayWriter<Type, enable_if_has_c_type_not_boolean<Type>> {
   // Note that this doesn't write the null bitmap, which should be consistent
   // with Write / WriteNull calls
   void WriteNull() { *values++ = T{}; }
+
+  void WriteAllNull(int64_t length) { std::memset(values, 0, sizeof(T) * length); }
 };
 
 template <typename Type>
@@ -290,6 +292,8 @@ struct OutputArrayWriter<Type, enable_if_decimal<Type>> {
   void Write(T value) { value.ToBytes(values++->data()); }
 
   void WriteNull() { T{}.ToBytes(values++->data()); }
+
+  void WriteAllNull(int64_t length) { std::memset(values, 0, sizeof(T) * length); }
 };
 
 // (Un)box Scalar to / from C++ value
@@ -918,6 +922,8 @@ struct ScalarBinaryNotNullStateful {
                 op.template Call<OutValue, Arg0Value, Arg1Value>(ctx, u, arg1_val, &st));
           },
           [&]() { writer.WriteNull(); });
+    } else {
+      writer.WriteAllNull(out->mutable_array()->length);
     }
     return st;
   }
@@ -935,6 +941,8 @@ struct ScalarBinaryNotNullStateful {
                 op.template Call<OutValue, Arg0Value, Arg1Value>(ctx, arg0_val, v, &st));
           },
           [&]() { writer.WriteNull(); });
+    } else {
+      writer.WriteAllNull(out->mutable_array()->length);
     }
     return st;
   }
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 5f6503f..e282035 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -442,6 +442,9 @@ struct GrouperImpl : Grouper {
 };
 
 struct GrouperFastImpl : Grouper {
+  static constexpr int kBitmapPaddingForSIMD = 64;  // bits
+  static constexpr int kPaddingForSIMD = 32;        // bytes
+
   static bool CanUse(const std::vector<ValueDescr>& keys) {
 #if ARROW_LITTLE_ENDIAN
     for (size_t i = 0; i < keys.size(); ++i) {
@@ -517,9 +520,8 @@ struct GrouperFastImpl : Grouper {
                                   impl->encode_ctx_.stack, impl->log_minibatch_max_,
                                   equal_func, append_func));
     impl->cols_.resize(num_columns);
-    constexpr int padding_for_SIMD = 32;
     impl->minibatch_hashes_.resize(impl->minibatch_size_max_ +
-                                   padding_for_SIMD / sizeof(uint32_t));
+                                   kPaddingForSIMD / sizeof(uint32_t));
 
     return std::move(impl);
   }
@@ -608,6 +610,22 @@ struct GrouperFastImpl : Grouper {
 
   uint32_t num_groups() const override { return static_cast<uint32_t>(rows_.length()); }
 
+  // Make sure padded buffers end up with the right logical size
+
+  Result<std::shared_ptr<Buffer>> AllocatePaddedBitmap(int64_t length) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> buf,
+        AllocateBitmap(length + kBitmapPaddingForSIMD, ctx_->memory_pool()));
+    return SliceMutableBuffer(buf, 0, BitUtil::BytesForBits(length));
+  }
+
+  Result<std::shared_ptr<Buffer>> AllocatePaddedBuffer(int64_t size) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> buf,
+        AllocateBuffer(size + kBitmapPaddingForSIMD, ctx_->memory_pool()));
+    return SliceMutableBuffer(buf, 0, size);
+  }
+
   Result<ExecBatch> GetUniques() override {
     auto num_columns = static_cast<uint32_t>(col_metadata_.size());
     int64_t num_groups = rows_.length();
@@ -616,28 +634,19 @@ struct GrouperFastImpl : Grouper {
     std::vector<std::shared_ptr<Buffer>> fixedlen_bufs(num_columns);
     std::vector<std::shared_ptr<Buffer>> varlen_bufs(num_columns);
 
-    constexpr int padding_bits = 64;
-    constexpr int padding_for_SIMD = 32;
     for (size_t i = 0; i < num_columns; ++i) {
-      ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocateBitmap(num_groups + padding_bits,
-                                                             ctx_->memory_pool()));
+      ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
       if (col_metadata_[i].is_fixed_length) {
         if (col_metadata_[i].fixed_length == 0) {
-          ARROW_ASSIGN_OR_RAISE(
-              fixedlen_bufs[i],
-              AllocateBitmap(num_groups + padding_bits, ctx_->memory_pool()));
+          ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i], AllocatePaddedBitmap(num_groups));
         } else {
           ARROW_ASSIGN_OR_RAISE(
               fixedlen_bufs[i],
-              AllocateBuffer(
-                  num_groups * col_metadata_[i].fixed_length + padding_for_SIMD,
-                  ctx_->memory_pool()));
+              AllocatePaddedBuffer(num_groups * col_metadata_[i].fixed_length));
         }
       } else {
-        ARROW_ASSIGN_OR_RAISE(
-            fixedlen_bufs[i],
-            AllocateBuffer((num_groups + 1) * sizeof(uint32_t) + padding_for_SIMD,
-                           ctx_->memory_pool()));
+        ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
+                              AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
       }
       cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
           col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
@@ -657,9 +666,7 @@ struct GrouperFastImpl : Grouper {
         if (!col_metadata_[i].is_fixed_length) {
           auto varlen_size =
               reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
-          ARROW_ASSIGN_OR_RAISE(
-              varlen_bufs[i],
-              AllocateBuffer(varlen_size + padding_for_SIMD, ctx_->memory_pool()));
+          ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
           cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
               col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
               fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 5e4f8c5..a8f8c64 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -116,7 +116,7 @@ void ValidateGroupBy(const std::vector<internal::Aggregate>& aggregates,
   ASSERT_OK_AND_ASSIGN(Datum actual, GroupBy(arguments, keys, aggregates));
 
   ASSERT_OK(expected.make_array()->ValidateFull());
-  ASSERT_OK(actual.make_array()->ValidateFull());
+  ValidateOutput(actual);
 
   AssertDatumsEqual(expected, actual, /*verbose=*/true);
 }
@@ -250,7 +250,7 @@ struct TestGrouper {
       // check that uniques_ are prefixes of new_uniques
       for (int i = 0; i < uniques_.num_values(); ++i) {
         auto new_unique = new_uniques[i].make_array();
-        ASSERT_OK(new_unique->ValidateFull());
+        ValidateOutput(*new_unique);
 
         AssertDatumsEqual(uniques_[i], new_unique->Slice(0, uniques_.length),
                           /*verbose=*/true);
@@ -261,7 +261,7 @@ struct TestGrouper {
 
     // check that the ids encode an equivalent key sequence
     auto ids = id_batch.make_array();
-    ASSERT_OK(ids->ValidateFull());
+    ValidateOutput(*ids);
 
     for (int i = 0; i < key_batch.num_values(); ++i) {
       SCOPED_TRACE(std::to_string(i) + "th key array");
diff --git a/cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc b/cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc
index 3ee862c..ae2f55c 100644
--- a/cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc
@@ -125,7 +125,7 @@ class TestUnaryArithmetic : public TestBase {
 
   void ValidateAndAssertApproxEqual(const std::shared_ptr<Array>& actual,
                                     const std::shared_ptr<Array>& expected) {
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(*actual);
     AssertArraysApproxEqual(*expected, *actual, /*verbose=*/true, equal_options_);
   }
 
@@ -262,7 +262,7 @@ class TestBinaryArithmetic : public TestBase {
 
   void ValidateAndAssertApproxEqual(const std::shared_ptr<Array>& actual,
                                     const std::shared_ptr<Array>& expected) {
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(*actual);
     AssertArraysApproxEqual(*expected, *actual, /*verbose=*/true, equal_options_);
   }
 
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc b/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
index f42635c..198c82b 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
@@ -255,7 +255,12 @@ static bool CanCastFromDictionary(Type::type type_id) {
 
 void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* func) {
   // From null to this type
-  DCHECK_OK(func->AddKernel(Type::NA, {null()}, out_ty, CastFromNull));
+  ScalarKernel kernel;
+  kernel.exec = CastFromNull;
+  kernel.signature = KernelSignature::Make({null()}, out_ty);
+  kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
+  kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
+  DCHECK_OK(func->AddKernel(Type::NA, std::move(kernel)));
 
   // From dictionary to this type
   if (CanCastFromDictionary(out_type_id)) {
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_test.cc b/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
index ef22fa8..494b15d 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
@@ -113,7 +113,7 @@ static void CheckCastZeroCopy(std::shared_ptr<Array> input,
                               std::shared_ptr<DataType> to_type,
                               CastOptions options = CastOptions::Safe()) {
   ASSERT_OK_AND_ASSIGN(auto converted, Cast(*input, to_type, options));
-  ASSERT_OK(converted->ValidateFull());
+  ValidateOutput(*converted);
 
   ASSERT_EQ(input->data()->buffers.size(), converted->data()->buffers.size());
   for (size_t i = 0; i < input->data()->buffers.size(); ++i) {
@@ -1583,7 +1583,7 @@ TEST(Cast, BinaryOrStringToBinary) {
 
       // invalid utf-8 is not an error for binary
       ASSERT_OK_AND_ASSIGN(auto strings, Cast(*invalid_utf8, to_type));
-      ASSERT_OK(strings->ValidateFull());
+      ValidateOutput(*strings);
       AssertBinaryZeroCopy(invalid_utf8, strings);
 
       // invalid utf-8 masked by a null bit is not an error
@@ -1687,12 +1687,12 @@ TEST(Cast, ListToList) {
     auto list_int64 = list_int32->Copy();
     list_int64->type = make_list(int64());
     list_int64->child_data[0] = Cast(list_int32->child_data[0], int64())->array();
-    ASSERT_OK(MakeArray(list_int64)->ValidateFull());
+    ValidateOutput(*list_int64);
 
     auto list_float32 = list_int32->Copy();
     list_float32->type = make_list(float32());
     list_float32->child_data[0] = Cast(list_int32->child_data[0], float32())->array();
-    ASSERT_OK(MakeArray(list_float32)->ValidateFull());
+    ValidateOutput(*list_float32);
 
     CheckCast(MakeArray(list_int32), MakeArray(list_float32));
     CheckCast(MakeArray(list_float32), MakeArray(list_int64));
@@ -1711,7 +1711,7 @@ TEST(Cast, ListToList) {
     auto list_int64 = list_int32->Copy();
     list_int64->type = make_list(int64());
     list_int64->child_data[0] = Cast(list_int32->child_data[0], int64())->array();
-    ASSERT_OK(MakeArray(list_int64)->ValidateFull());
+    ValidateOutput(*list_int64);
 
     CheckCast(MakeArray(list_int32), MakeArray(list_int64));
     CheckCast(MakeArray(list_int64), MakeArray(list_int32));
@@ -1861,7 +1861,7 @@ TEST(Cast, FromDictionary) {
     data->buffers[0] = nullptr;
     data->null_count = 0;
     std::shared_ptr<Array> dict_array = std::make_shared<DictionaryArray>(data);
-    ASSERT_OK(dict_array->ValidateFull());
+    ValidateOutput(*dict_array);
 
     CheckCast(dict_array, no_nulls);
   }
diff --git a/cpp/src/arrow/compute/kernels/scalar_compare_test.cc b/cpp/src/arrow/compute/kernels/scalar_compare_test.cc
index 50327e8..87f3bd3 100644
--- a/cpp/src/arrow/compute/kernels/scalar_compare_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_compare_test.cc
@@ -673,10 +673,7 @@ class TestVarArgsCompare : public TestBase {
   Datum Eval(VarArgsFunction func, const std::vector<Datum>& args) {
     EXPECT_OK_AND_ASSIGN(auto actual,
                          func(args, element_wise_aggregate_options_, nullptr));
-    if (actual.is_array()) {
-      auto arr = actual.make_array();
-      ARROW_EXPECT_OK(arr->ValidateFull());
-    }
+    ValidateOutput(actual);
     return actual;
   }
 
diff --git a/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc b/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc
index a0b6fdc..70ce4d5 100644
--- a/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_fill_null_test.cc
@@ -22,6 +22,7 @@
 
 #include "arrow/array/array_base.h"
 #include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
 #include "arrow/result.h"
 #include "arrow/scalar.h"
 #include "arrow/testing/gtest_compat.h"
@@ -38,7 +39,7 @@ void CheckFillNull(const Array& input, const Datum& fill_value, const Array& exp
   auto Check = [&](const Array& input, const Array& expected) {
     ASSERT_OK_AND_ASSIGN(Datum datum_out, FillNull(input, fill_value));
     std::shared_ptr<Array> result = datum_out.make_array();
-    ASSERT_OK(result->ValidateFull());
+    ValidateOutput(*result);
     AssertArraysEqual(expected, *result, /*verbose=*/true);
     if (all_valid) {
       // Check null count of ArrayData is set, not the computed Array.null_count
diff --git a/cpp/src/arrow/compute/kernels/scalar_if_else_test.cc b/cpp/src/arrow/compute/kernels/scalar_if_else_test.cc
index 0fb0a1f..2b63af2 100644
--- a/cpp/src/arrow/compute/kernels/scalar_if_else_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_if_else_test.cc
@@ -29,7 +29,7 @@ void CheckIfElseOutput(const Datum& cond, const Datum& left, const Datum& right,
   ASSERT_OK_AND_ASSIGN(Datum datum_out, IfElse(cond, left, right));
   if (datum_out.is_array()) {
     std::shared_ptr<Array> result = datum_out.make_array();
-    ASSERT_OK(result->ValidateFull());
+    ValidateOutput(*result);
     std::shared_ptr<Array> expected_ = expected.make_array();
     AssertArraysEqual(*expected_, *result, /*verbose=*/true);
   } else {  // expecting scalar
diff --git a/cpp/src/arrow/compute/kernels/scalar_set_lookup_test.cc b/cpp/src/arrow/compute/kernels/scalar_set_lookup_test.cc
index 5c8bf98..9b6ded0 100644
--- a/cpp/src/arrow/compute/kernels/scalar_set_lookup_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_set_lookup_test.cc
@@ -57,7 +57,7 @@ void CheckIsIn(const std::shared_ptr<DataType>& type, const std::string& input_j
   ASSERT_OK_AND_ASSIGN(Datum actual_datum,
                        IsIn(input, SetLookupOptions(value_set, skip_nulls)));
   std::shared_ptr<Array> actual = actual_datum.make_array();
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(actual_datum);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -68,7 +68,7 @@ void CheckIsInChunked(const std::shared_ptr<ChunkedArray>& input,
   ASSERT_OK_AND_ASSIGN(Datum actual_datum,
                        IsIn(input, SetLookupOptions(value_set, skip_nulls)));
   auto actual = actual_datum.chunked_array();
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(actual_datum);
   AssertChunkedEqual(*expected, *actual);
 }
 
@@ -89,7 +89,7 @@ void CheckIsInDictionary(const std::shared_ptr<DataType>& type,
   ASSERT_OK_AND_ASSIGN(Datum actual_datum,
                        IsIn(input, SetLookupOptions(value_set, skip_nulls)));
   std::shared_ptr<Array> actual = actual_datum.make_array();
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(actual_datum);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -436,7 +436,7 @@ class TestIndexInKernel : public ::testing::Test {
     SetLookupOptions options(value_set, skip_nulls);
     ASSERT_OK_AND_ASSIGN(Datum actual_datum, IndexIn(input, options));
     std::shared_ptr<Array> actual = actual_datum.make_array();
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual_datum);
     AssertArraysEqual(*expected, *actual, /*verbose=*/true);
   }
 
@@ -447,7 +447,7 @@ class TestIndexInKernel : public ::testing::Test {
     ASSERT_OK_AND_ASSIGN(Datum actual,
                          IndexIn(input, SetLookupOptions(value_set, skip_nulls)));
     ASSERT_EQ(Datum::CHUNKED_ARRAY, actual.kind());
-    ASSERT_OK(actual.chunked_array()->ValidateFull());
+    ValidateOutput(actual);
     AssertChunkedEqual(*expected, *actual.chunked_array());
   }
 
@@ -469,7 +469,7 @@ class TestIndexInKernel : public ::testing::Test {
     SetLookupOptions options(value_set, skip_nulls);
     ASSERT_OK_AND_ASSIGN(Datum actual_datum, IndexIn(input, options));
     std::shared_ptr<Array> actual = actual_datum.make_array();
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual_datum);
     AssertArraysEqual(*expected, *actual, /*verbose=*/true);
   }
 };
diff --git a/cpp/src/arrow/compute/kernels/scalar_temporal.cc b/cpp/src/arrow/compute/kernels/scalar_temporal.cc
index cc22ccf..1694d22 100644
--- a/cpp/src/arrow/compute/kernels/scalar_temporal.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_temporal.cc
@@ -17,13 +17,16 @@
 
 #include "arrow/builder.h"
 #include "arrow/compute/kernels/common.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/time.h"
 #include "arrow/vendored/datetime.h"
 
 namespace arrow {
 
-namespace compute {
+using internal::checked_cast;
+using internal::checked_pointer_cast;
 
+namespace compute {
 namespace internal {
 
 namespace {
@@ -45,77 +48,35 @@ using arrow_vendored::date::literals::thu;
 using internal::applicator::ScalarUnaryNotNull;
 using internal::applicator::SimpleUnary;
 
-// Based on ScalarUnaryNotNullStateful. Adds timezone awareness.
-template <typename Op, typename OutType>
-struct ScalarUnaryStatefulTemporal {
-  using ThisType = ScalarUnaryStatefulTemporal<Op, OutType>;
-  using OutValue = typename internal::GetOutputType<OutType>::T;
+const std::string& GetInputTimezone(const Datum& datum) {
+  return checked_cast<const TimestampType&>(*datum.type()).timezone();
+}
 
-  Op op;
-  explicit ScalarUnaryStatefulTemporal(Op op) : op(std::move(op)) {}
-
-  template <typename Type>
-  struct ArrayExec {
-    static Status Exec(const ThisType& functor, KernelContext* ctx, const ArrayData& arg0,
-                       Datum* out) {
-      const std::string timezone =
-          checked_pointer_cast<const TimestampType>(arg0.type)->timezone();
-      Status st = Status::OK();
-      ArrayData* out_arr = out->mutable_array();
-      auto out_data = out_arr->GetMutableValues<OutValue>(1);
-
-      if (timezone.empty()) {
-        internal::VisitArrayValuesInline<Int64Type>(
-            arg0,
-            [&](int64_t v) {
-              *out_data++ = functor.op.template Call<OutValue>(ctx, v, &st);
-            },
-            [&]() {
-              // null
-              ++out_data;
-            });
-      } else {
-        st = Status::Invalid("Timezone aware timestamps not supported. Timezone found: ",
-                             timezone);
-      }
-      return st;
-    }
-  };
-
-  Status Scalar(KernelContext* ctx, const Scalar& arg0, Datum* out) {
-    const std::string timezone =
-        checked_pointer_cast<const TimestampType>(arg0.type)->timezone();
-    Status st = Status::OK();
-    if (timezone.empty()) {
-      if (arg0.is_valid) {
-        int64_t arg0_val = internal::UnboxScalar<Int64Type>::Unbox(arg0);
-        internal::BoxScalar<OutType>::Box(
-            this->op.template Call<OutValue>(ctx, arg0_val, &st), out->scalar().get());
-      }
-    } else {
-      st = Status::Invalid("Timezone aware timestamps not supported. Timezone found: ",
-                           timezone);
-    }
-    return st;
-  }
+const std::string& GetInputTimezone(const Scalar& scalar) {
+  return checked_cast<const TimestampType&>(*scalar.type).timezone();
+}
 
-  Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-    if (batch[0].kind() == Datum::ARRAY) {
-      return ArrayExec<OutType>::Exec(*this, ctx, *batch[0].array(), out);
-    } else {
-      return Scalar(ctx, *batch[0].scalar(), out);
-    }
+const std::string& GetInputTimezone(const ArrayData& array) {
+  return checked_cast<const TimestampType&>(*array.type).timezone();
+}
+
+template <typename T>
+Status TemporalComponentExtractCheckTimezone(const T& input) {
+  const auto& timezone = GetInputTimezone(input);
+  if (!timezone.empty()) {
+    return Status::NotImplemented(
+        "Cannot extract components from timestamp with specific timezone: ", timezone);
   }
-};
+  return Status::OK();
+}
 
 template <typename Op, typename OutType>
-struct ScalarUnaryTemporal {
+struct TemporalComponentExtract {
   using OutValue = typename internal::GetOutputType<OutType>::T;
 
   static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-    // Seed kernel with dummy state
-    ScalarUnaryStatefulTemporal<Op, OutType> kernel({});
-    return kernel.Exec(ctx, batch, out);
+    RETURN_NOT_OK(TemporalComponentExtractCheckTimezone(batch.values[0]));
+    return ScalarUnaryNotNull<OutType, TimestampType, Op>::Exec(ctx, batch, out);
   }
 };
 
@@ -124,8 +85,8 @@ struct ScalarUnaryTemporal {
 
 template <typename Duration>
 struct Year {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     return static_cast<T>(static_cast<const int32_t>(
         year_month_day(floor<days>(sys_time<Duration>(Duration{arg}))).year()));
   }
@@ -136,8 +97,8 @@ struct Year {
 
 template <typename Duration>
 struct Month {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     return static_cast<T>(static_cast<const uint32_t>(
         year_month_day(floor<days>(sys_time<Duration>(Duration{arg}))).month()));
   }
@@ -148,8 +109,8 @@ struct Month {
 
 template <typename Duration>
 struct Day {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     return static_cast<T>(static_cast<const uint32_t>(
         year_month_day(floor<days>(sys_time<Duration>(Duration{arg}))).day()));
   }
@@ -160,8 +121,8 @@ struct Day {
 
 template <typename Duration>
 struct DayOfWeek {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     return static_cast<T>(
         weekday(year_month_day(floor<days>(sys_time<Duration>(Duration{arg}))))
             .iso_encoding() -
@@ -174,8 +135,8 @@ struct DayOfWeek {
 
 template <typename Duration>
 struct DayOfYear {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     const auto t = floor<days>(sys_time<Duration>(Duration{arg}));
     return static_cast<T>(
         (t - sys_time<days>(year_month_day(t).year() / jan / 0)).count());
@@ -190,8 +151,8 @@ struct DayOfYear {
 
 template <typename Duration>
 struct ISOYear {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     const auto t = floor<days>(sys_time<Duration>(Duration{arg}));
     auto y = year_month_day{t + days{3}}.year();
     auto start = sys_time<days>((y - years{1}) / dec / thu[last]) + (mon - thu);
@@ -211,8 +172,8 @@ struct ISOYear {
 // https://github.com/HowardHinnant/date/blob/6e921e1b1d21e84a5c82416ba7ecd98e33a436d0/include/date/iso_week.h#L1503
 template <typename Duration>
 struct ISOWeek {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     const auto t = floor<days>(sys_time<Duration>(Duration{arg}));
     auto y = year_month_day{t + days{3}}.year();
     auto start = sys_time<days>((y - years{1}) / dec / thu[last]) + (mon - thu);
@@ -229,8 +190,8 @@ struct ISOWeek {
 
 template <typename Duration>
 struct Quarter {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     const auto ymd = year_month_day(floor<days>(sys_time<Duration>(Duration{arg})));
     return static_cast<T>((static_cast<const uint32_t>(ymd.month()) - 1) / 3 + 1);
   }
@@ -241,8 +202,8 @@ struct Quarter {
 
 template <typename Duration>
 struct Hour {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>((t - floor<days>(t)) / std::chrono::hours(1));
   }
@@ -253,8 +214,8 @@ struct Hour {
 
 template <typename Duration>
 struct Minute {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>((t - floor<std::chrono::hours>(t)) / std::chrono::minutes(1));
   }
@@ -265,8 +226,8 @@ struct Minute {
 
 template <typename Duration>
 struct Second {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>((t - floor<std::chrono::minutes>(t)) / std::chrono::seconds(1));
   }
@@ -277,8 +238,8 @@ struct Second {
 
 template <typename Duration>
 struct Subsecond {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>(
         (std::chrono::duration<double>(t - floor<std::chrono::seconds>(t)).count()));
@@ -290,8 +251,8 @@ struct Subsecond {
 
 template <typename Duration>
 struct Millisecond {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>(
         ((t - floor<std::chrono::seconds>(t)) / std::chrono::milliseconds(1)) % 1000);
@@ -303,8 +264,8 @@ struct Millisecond {
 
 template <typename Duration>
 struct Microsecond {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>(
         ((t - floor<std::chrono::seconds>(t)) / std::chrono::microseconds(1)) % 1000);
@@ -316,8 +277,8 @@ struct Microsecond {
 
 template <typename Duration>
 struct Nanosecond {
-  template <typename T>
-  static T Call(KernelContext*, int64_t arg, Status*) {
+  template <typename T, typename Arg0>
+  static T Call(KernelContext*, Arg0 arg, Status*) {
     Duration t = Duration{arg};
     return static_cast<T>(
         ((t - floor<std::chrono::seconds>(t)) / std::chrono::nanoseconds(1)) % 1000);
@@ -345,13 +306,7 @@ inline std::vector<int64_t> get_iso_calendar(int64_t arg) {
 template <typename Duration>
 struct ISOCalendar {
   static Status Call(KernelContext* ctx, const Scalar& in, Scalar* out) {
-    const std::string timezone =
-        checked_pointer_cast<const TimestampType>(in.type)->timezone();
-    if (!timezone.empty()) {
-      return Status::Invalid("Timezone aware timestamps not supported. Timezone found: ",
-                             timezone);
-    }
-
+    RETURN_NOT_OK(TemporalComponentExtractCheckTimezone(in));
     if (in.is_valid) {
       const std::shared_ptr<DataType> iso_calendar_type =
           struct_({field("iso_year", int64()), field("iso_week", int64()),
@@ -372,12 +327,8 @@ struct ISOCalendar {
 
   static Status Call(KernelContext* ctx, const ArrayData& in, ArrayData* out) {
     using BuilderType = typename TypeTraits<Int64Type>::BuilderType;
-    const std::string timezone =
-        checked_pointer_cast<const TimestampType>(in.type)->timezone();
-    if (!timezone.empty()) {
-      return Status::Invalid("Timezone aware timestamps not supported. Timezone found: ",
-                             timezone);
-    }
+
+    RETURN_NOT_OK(TemporalComponentExtractCheckTimezone(in));
     const std::shared_ptr<DataType> iso_calendar_type =
         struct_({field("iso_year", int64()), field("iso_week", int64()),
                  field("iso_day_of_week", int64())});
@@ -421,22 +372,24 @@ std::shared_ptr<ScalarFunction> MakeTemporal(std::string name, const FunctionDoc
     InputType in_type{match::TimestampTypeUnit(unit)};
     switch (unit) {
       case TimeUnit::SECOND: {
-        auto exec = ScalarUnaryTemporal<Op<std::chrono::seconds>, OutType>::Exec;
+        auto exec = TemporalComponentExtract<Op<std::chrono::seconds>, OutType>::Exec;
         DCHECK_OK(func->AddKernel({in_type}, out_type, std::move(exec)));
         break;
       }
       case TimeUnit::MILLI: {
-        auto exec = ScalarUnaryTemporal<Op<std::chrono::milliseconds>, OutType>::Exec;
+        auto exec =
+            TemporalComponentExtract<Op<std::chrono::milliseconds>, OutType>::Exec;
         DCHECK_OK(func->AddKernel({in_type}, out_type, std::move(exec)));
         break;
       }
       case TimeUnit::MICRO: {
-        auto exec = ScalarUnaryTemporal<Op<std::chrono::microseconds>, OutType>::Exec;
+        auto exec =
+            TemporalComponentExtract<Op<std::chrono::microseconds>, OutType>::Exec;
         DCHECK_OK(func->AddKernel({in_type}, out_type, std::move(exec)));
         break;
       }
       case TimeUnit::NANO: {
-        auto exec = ScalarUnaryTemporal<Op<std::chrono::nanoseconds>, OutType>::Exec;
+        auto exec = TemporalComponentExtract<Op<std::chrono::nanoseconds>, OutType>::Exec;
         DCHECK_OK(func->AddKernel({in_type}, out_type, std::move(exec)));
         break;
       }
diff --git a/cpp/src/arrow/compute/kernels/scalar_temporal_test.cc b/cpp/src/arrow/compute/kernels/scalar_temporal_test.cc
index be1054b..cc01d25 100644
--- a/cpp/src/arrow/compute/kernels/scalar_temporal_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_temporal_test.cc
@@ -159,22 +159,22 @@ TEST(ScalarTemporalTest, TestZonedTemporalComponentExtraction) {
     auto unit = timestamp(u, timezone);
     auto timestamps = ArrayFromJSON(unit, times);
 
-    ASSERT_RAISES(Invalid, Year(timestamps));
-    ASSERT_RAISES(Invalid, Month(timestamps));
-    ASSERT_RAISES(Invalid, Day(timestamps));
-    ASSERT_RAISES(Invalid, DayOfWeek(timestamps));
-    ASSERT_RAISES(Invalid, DayOfYear(timestamps));
-    ASSERT_RAISES(Invalid, ISOYear(timestamps));
-    ASSERT_RAISES(Invalid, ISOWeek(timestamps));
-    ASSERT_RAISES(Invalid, ISOCalendar(timestamps));
-    ASSERT_RAISES(Invalid, Quarter(timestamps));
-    ASSERT_RAISES(Invalid, Hour(timestamps));
-    ASSERT_RAISES(Invalid, Minute(timestamps));
-    ASSERT_RAISES(Invalid, Second(timestamps));
-    ASSERT_RAISES(Invalid, Millisecond(timestamps));
-    ASSERT_RAISES(Invalid, Microsecond(timestamps));
-    ASSERT_RAISES(Invalid, Nanosecond(timestamps));
-    ASSERT_RAISES(Invalid, Subsecond(timestamps));
+    ASSERT_RAISES(NotImplemented, Year(timestamps));
+    ASSERT_RAISES(NotImplemented, Month(timestamps));
+    ASSERT_RAISES(NotImplemented, Day(timestamps));
+    ASSERT_RAISES(NotImplemented, DayOfWeek(timestamps));
+    ASSERT_RAISES(NotImplemented, DayOfYear(timestamps));
+    ASSERT_RAISES(NotImplemented, ISOYear(timestamps));
+    ASSERT_RAISES(NotImplemented, ISOWeek(timestamps));
+    ASSERT_RAISES(NotImplemented, ISOCalendar(timestamps));
+    ASSERT_RAISES(NotImplemented, Quarter(timestamps));
+    ASSERT_RAISES(NotImplemented, Hour(timestamps));
+    ASSERT_RAISES(NotImplemented, Minute(timestamps));
+    ASSERT_RAISES(NotImplemented, Second(timestamps));
+    ASSERT_RAISES(NotImplemented, Millisecond(timestamps));
+    ASSERT_RAISES(NotImplemented, Microsecond(timestamps));
+    ASSERT_RAISES(NotImplemented, Nanosecond(timestamps));
+    ASSERT_RAISES(NotImplemented, Subsecond(timestamps));
   }
 }
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/kernels/test_util.cc b/cpp/src/arrow/compute/kernels/test_util.cc
index 1825797..a115171 100644
--- a/cpp/src/arrow/compute/kernels/test_util.cc
+++ b/cpp/src/arrow/compute/kernels/test_util.cc
@@ -22,12 +22,14 @@
 #include <string>
 
 #include "arrow/array.h"
+#include "arrow/array/validate.h"
 #include "arrow/chunked_array.h"
 #include "arrow/compute/exec.h"
 #include "arrow/compute/function.h"
 #include "arrow/compute/registry.h"
 #include "arrow/datum.h"
 #include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 
 namespace arrow {
@@ -49,7 +51,7 @@ void CheckScalarNonRecursive(const std::string& func_name, const DatumVector& in
                              const FunctionOptions* options) {
   ASSERT_OK_AND_ASSIGN(Datum out, CallFunction(func_name, inputs, options));
   std::shared_ptr<Array> actual = std::move(out).make_array();
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(*actual);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -164,7 +166,9 @@ void CheckScalar(std::string func_name, const DatumVector& inputs,
 
     ASSERT_OK_AND_ASSIGN(Datum out,
                          CallFunction(func_name, GetDatums(chunked_inputs), options));
-    ASSERT_OK(out.chunked_array()->ValidateFull());
+    ValidateOutput(out);
+    auto chunked = out.chunked_array();
+    (void)chunked;
     AssertDatumsEqual(std::make_shared<ChunkedArray>(expected_chunks), out);
   }
 }
@@ -191,7 +195,7 @@ void CheckVectorUnary(std::string func_name, Datum input, std::shared_ptr<Array>
                       const FunctionOptions* options) {
   ASSERT_OK_AND_ASSIGN(Datum out, CallFunction(func_name, {input}, options));
   std::shared_ptr<Array> actual = std::move(out).make_array();
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(*actual);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -219,6 +223,57 @@ void CheckScalarBinary(std::string func_name, std::shared_ptr<Scalar> left_input
   CheckScalar(std::move(func_name), {left_input, right_input}, expected, options);
 }
 
+namespace {
+
+void ValidateOutput(const ArrayData& output) {
+  ASSERT_OK(::arrow::internal::ValidateArrayFull(output));
+  TestInitialized(output);
+}
+
+void ValidateOutput(const ChunkedArray& output) {
+  ASSERT_OK(output.ValidateFull());
+  for (const auto& chunk : output.chunks()) {
+    TestInitialized(*chunk);
+  }
+}
+
+void ValidateOutput(const RecordBatch& output) {
+  ASSERT_OK(output.ValidateFull());
+  for (const auto& column : output.column_data()) {
+    TestInitialized(*column);
+  }
+}
+
+void ValidateOutput(const Table& output) {
+  ASSERT_OK(output.ValidateFull());
+  for (const auto& column : output.columns()) {
+    for (const auto& chunk : column->chunks()) {
+      TestInitialized(*chunk);
+    }
+  }
+}
+
+}  // namespace
+
+void ValidateOutput(const Datum& output) {
+  switch (output.kind()) {
+    case Datum::ARRAY:
+      ValidateOutput(*output.array());
+      break;
+    case Datum::CHUNKED_ARRAY:
+      ValidateOutput(*output.chunked_array());
+      break;
+    case Datum::RECORD_BATCH:
+      ValidateOutput(*output.record_batch());
+      break;
+    case Datum::TABLE:
+      ValidateOutput(*output.table());
+      break;
+    default:
+      break;
+  }
+}
+
 void CheckDispatchBest(std::string func_name, std::vector<ValueDescr> original_values,
                        std::vector<ValueDescr> expected_equivalent_values) {
   ASSERT_OK_AND_ASSIGN(auto function, GetFunctionRegistry()->GetFunction(func_name));
diff --git a/cpp/src/arrow/compute/kernels/test_util.h b/cpp/src/arrow/compute/kernels/test_util.h
index 85ed04c..f485408 100644
--- a/cpp/src/arrow/compute/kernels/test_util.h
+++ b/cpp/src/arrow/compute/kernels/test_util.h
@@ -135,6 +135,8 @@ void CheckScalarBinary(std::string func_name, std::shared_ptr<Scalar> left_input
 void CheckVectorUnary(std::string func_name, Datum input, std::shared_ptr<Array> expected,
                       const FunctionOptions* options = nullptr);
 
+void ValidateOutput(const Datum& output);
+
 using BinaryTypes =
     ::testing::Types<BinaryType, LargeBinaryType, StringType, LargeStringType>;
 using StringTypes = ::testing::Types<StringType, LargeStringType>;
diff --git a/cpp/src/arrow/compute/kernels/vector_hash_test.cc b/cpp/src/arrow/compute/kernels/vector_hash_test.cc
index a3fa931..c09b042 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash_test.cc
@@ -59,7 +59,7 @@ template <typename T>
 void CheckUnique(const std::shared_ptr<T>& input,
                  const std::shared_ptr<Array>& expected) {
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> result, Unique(input));
-  ASSERT_OK(result->ValidateFull());
+  ValidateOutput(*result);
   // TODO: We probably shouldn't rely on array ordering.
   ASSERT_ARRAYS_EQUAL(*expected, *result);
 }
@@ -84,7 +84,7 @@ void CheckValueCountsNull(const std::shared_ptr<DataType>& type) {
   std::shared_ptr<Array> ex_counts = ArrayFromJSON(int64(), "[]");
 
   ASSERT_OK_AND_ASSIGN(auto result_struct, ValueCounts(input));
-  ASSERT_OK(result_struct->ValidateFull());
+  ValidateOutput(*result_struct);
   ASSERT_NE(result_struct->GetFieldByName(kValuesFieldName), nullptr);
   // TODO: We probably shouldn't rely on value ordering.
   ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName(kValuesFieldName));
@@ -96,7 +96,7 @@ void CheckValueCounts(const std::shared_ptr<T>& input,
                       const std::shared_ptr<Array>& expected_values,
                       const std::shared_ptr<Array>& expected_counts) {
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> result, ValueCounts(input));
-  ASSERT_OK(result->ValidateFull());
+  ValidateOutput(*result);
   auto result_struct = std::dynamic_pointer_cast<StructArray>(result);
   ASSERT_EQ(result_struct->num_fields(), 2);
   // TODO: We probably shouldn't rely on value ordering.
@@ -128,7 +128,7 @@ void CheckDictEncode(const std::shared_ptr<Array>& input,
 
   ASSERT_OK_AND_ASSIGN(Datum datum_out, DictionaryEncode(input));
   std::shared_ptr<Array> result = MakeArray(datum_out.array());
-  ASSERT_OK(result->ValidateFull());
+  ValidateOutput(*result);
 
   ASSERT_ARRAYS_EQUAL(expected, *result);
 }
@@ -691,10 +691,7 @@ TEST_F(TestHashKernel, ZeroLengthDictionaryEncode) {
   // ARROW-7008
   auto values = ArrayFromJSON(utf8(), "[]");
   ASSERT_OK_AND_ASSIGN(Datum datum_result, DictionaryEncode(values));
-
-  std::shared_ptr<Array> result = datum_result.make_array();
-  const auto& dict_result = checked_cast<const DictionaryArray&>(*result);
-  ASSERT_OK(dict_result.ValidateFull());
+  ValidateOutput(datum_result);
 }
 
 TEST_F(TestHashKernel, NullEncodingSchemes) {
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
index cf52870..f428da0 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
@@ -51,7 +51,7 @@ TEST(GetTakeIndices, Basics) {
     ASSERT_OK_AND_ASSIGN(auto indices,
                          internal::GetTakeIndices(*filter->data(), null_selection));
     auto indices_array = MakeArray(indices);
-    ASSERT_OK(indices_array->ValidateFull());
+    ValidateOutput(indices);
     AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
   };
 
@@ -73,13 +73,13 @@ TEST(GetTakeIndices, NullValidityBuffer) {
   ASSERT_OK_AND_ASSIGN(auto indices,
                        internal::GetTakeIndices(*filter.data(), FilterOptions::DROP));
   auto indices_array = MakeArray(indices);
-  ASSERT_OK(indices_array->ValidateFull());
+  ValidateOutput(indices);
   AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
 
   ASSERT_OK_AND_ASSIGN(
       indices, internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
   indices_array = MakeArray(indices);
-  ASSERT_OK(indices_array->ValidateFull());
+  ValidateOutput(indices);
   AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
 }
 
@@ -93,7 +93,7 @@ void CheckGetTakeIndicesCase(const Array& untyped_filter) {
   // Verify DROP indices
   {
     IndexArrayType indices(drop_indices);
-    ASSERT_OK(indices.ValidateFull());
+    ValidateOutput(indices);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
@@ -116,7 +116,7 @@ void CheckGetTakeIndicesCase(const Array& untyped_filter) {
   // Verify EMIT_NULL indices
   {
     IndexArrayType indices(emit_indices);
-    ASSERT_OK(indices.ValidateFull());
+    ValidateOutput(indices);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
@@ -183,7 +183,7 @@ class TestFilterKernel : public ::testing::Test {
     // test with EMIT_NULL
     ASSERT_OK_AND_ASSIGN(Datum out_datum, Filter(values, filter, emit_null_));
     auto actual = out_datum.make_array();
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(*actual);
     AssertArraysEqual(*expected, *actual);
 
     // test with DROP using EMIT_NULL and a coalesced filter
@@ -192,7 +192,7 @@ class TestFilterKernel : public ::testing::Test {
     expected = out_datum.make_array();
     ASSERT_OK_AND_ASSIGN(out_datum, Filter(values, filter, drop_));
     actual = out_datum.make_array();
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(*actual);
     AssertArraysEqual(*expected, *actual);
   }
 
@@ -212,11 +212,11 @@ void ValidateFilter(const std::shared_ptr<Array>& values,
 
   ASSERT_OK_AND_ASSIGN(Datum out_datum, Filter(values, filter_boxed, emit_null));
   auto filtered_emit_null = out_datum.make_array();
-  ASSERT_OK(filtered_emit_null->ValidateFull());
+  ValidateOutput(*filtered_emit_null);
 
   ASSERT_OK_AND_ASSIGN(out_datum, Filter(values, filter_boxed, drop));
   auto filtered_drop = out_datum.make_array();
-  ASSERT_OK(filtered_drop->ValidateFull());
+  ValidateOutput(*filtered_drop);
 
   // Create the expected arrays using Take
   ASSERT_OK_AND_ASSIGN(
@@ -384,7 +384,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, CompareScalarAndFilterRandomNumeric) {
                            Compare(array, Datum(fifty), CompareOptions(op)));
       ASSERT_OK_AND_ASSIGN(Datum filtered, Filter(array, selection));
       auto filtered_array = filtered.make_array();
-      ASSERT_OK(filtered_array->ValidateFull());
+      ValidateOutput(*filtered_array);
       auto expected =
           CompareAndFilter<TypeParam>(array->raw_values(), array->length(), c_fifty, op);
       ASSERT_ARRAYS_EQUAL(*filtered_array, *expected);
@@ -406,7 +406,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, CompareArrayAndFilterRandomNumeric) {
       ASSERT_OK_AND_ASSIGN(Datum selection, Compare(lhs, rhs, CompareOptions(op)));
       ASSERT_OK_AND_ASSIGN(Datum filtered, Filter(lhs, selection));
       auto filtered_array = filtered.make_array();
-      ASSERT_OK(filtered_array->ValidateFull());
+      ValidateOutput(*filtered_array);
       auto expected = CompareAndFilter<TypeParam>(lhs->raw_values(), lhs->length(),
                                                   rhs->raw_values(), op);
       ASSERT_ARRAYS_EQUAL(*filtered_array, *expected);
@@ -434,7 +434,7 @@ TYPED_TEST(TestFilterKernelWithNumeric, ScalarInRangeAndFilterRandomNumeric) {
     ASSERT_OK_AND_ASSIGN(Datum selection, And(greater_than_fifty, less_than_hundred));
     ASSERT_OK_AND_ASSIGN(Datum filtered, Filter(array, selection));
     auto filtered_array = filtered.make_array();
-    ASSERT_OK(filtered_array->ValidateFull());
+    ValidateOutput(*filtered_array);
     auto expected = CompareAndFilter<TypeParam>(
         array->raw_values(), array->length(),
         [&](CType e) { return (e > c_fifty) && (e < c_hundred); });
@@ -642,7 +642,7 @@ class TestFilterKernelWithRecordBatch : public TestFilterKernel<RecordBatch> {
     std::shared_ptr<RecordBatch> actual;
 
     ASSERT_OK(this->DoFilter(schm, batch_json, selection, options, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(schm, expected_batch), *actual);
   }
 
@@ -695,7 +695,7 @@ class TestFilterKernelWithChunkedArray : public TestFilterKernel<ChunkedArray> {
                     const std::vector<std::string>& expected) {
     std::shared_ptr<ChunkedArray> actual;
     ASSERT_OK(this->FilterWithArray(type, values, filter, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual);
   }
 
@@ -705,7 +705,7 @@ class TestFilterKernelWithChunkedArray : public TestFilterKernel<ChunkedArray> {
                            const std::vector<std::string>& expected) {
     std::shared_ptr<ChunkedArray> actual;
     ASSERT_OK(this->FilterWithChunkedArray(type, values, filter, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual);
   }
 
@@ -754,7 +754,7 @@ class TestFilterKernelWithTable : public TestFilterKernel<Table> {
     std::shared_ptr<Table> actual;
 
     ASSERT_OK(this->FilterWithArray(schm, table_json, filter, options, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
   }
 
@@ -765,7 +765,7 @@ class TestFilterKernelWithTable : public TestFilterKernel<Table> {
     std::shared_ptr<Table> actual;
 
     ASSERT_OK(this->FilterWithChunkedArray(schm, table_json, filter, options, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     AssertTablesEqual(*TableFromJSON(schm, expected_table), *actual,
                       /*same_chunk_layout=*/false);
   }
@@ -843,7 +843,7 @@ void AssertTakeArrays(const std::shared_ptr<Array>& values,
                       const std::shared_ptr<Array>& indices,
                       const std::shared_ptr<Array>& expected) {
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> actual, Take(*values, *indices));
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(actual);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -860,7 +860,7 @@ void CheckTake(const std::shared_ptr<DataType>& type, const std::string& values,
 
   for (auto index_type : {int8(), uint32()}) {
     ASSERT_OK(TakeJSON(type, values, index_type, indices, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     AssertArraysEqual(*ArrayFromJSON(type, expected), *actual, /*verbose=*/true);
   }
 }
@@ -900,7 +900,7 @@ void ValidateTake(const std::shared_ptr<Array>& values,
                   const std::shared_ptr<Array>& indices) {
   ASSERT_OK_AND_ASSIGN(Datum out, Take(values, indices));
   auto taken = out.make_array();
-  ASSERT_OK(taken->ValidateFull());
+  ValidateOutput(taken);
   ASSERT_EQ(indices->length(), taken->length());
   switch (indices->type_id()) {
     case Type::INT8:
@@ -1324,7 +1324,7 @@ class TestPermutationsWithTake : public TestBase {
   void DoTake(const Int16Array& values, const Int16Array& indices,
               std::shared_ptr<Int16Array>* out) {
     ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> boxed_out, Take(values, indices));
-    ASSERT_OK(boxed_out->ValidateFull());
+    ValidateOutput(boxed_out);
     *out = checked_pointer_cast<Int16Array>(std::move(boxed_out));
   }
 
@@ -1441,7 +1441,7 @@ class TestTakeKernelWithRecordBatch : public TestTakeKernelTyped<RecordBatch> {
 
     for (auto index_type : {int8(), uint32()}) {
       ASSERT_OK(TakeJSON(schm, batch_json, index_type, indices, &actual));
-      ASSERT_OK(actual->ValidateFull());
+      ValidateOutput(actual);
       ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(schm, expected_batch), *actual);
     }
   }
@@ -1499,7 +1499,7 @@ class TestTakeKernelWithChunkedArray : public TestTakeKernelTyped<ChunkedArray>
                   const std::vector<std::string>& expected) {
     std::shared_ptr<ChunkedArray> actual;
     ASSERT_OK(this->TakeWithArray(type, values, indices, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual);
   }
 
@@ -1509,7 +1509,7 @@ class TestTakeKernelWithChunkedArray : public TestTakeKernelTyped<ChunkedArray>
                          const std::vector<std::string>& expected) {
     std::shared_ptr<ChunkedArray> actual;
     ASSERT_OK(this->TakeWithChunkedArray(type, values, indices, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual);
   }
 
@@ -1557,7 +1557,7 @@ class TestTakeKernelWithTable : public TestTakeKernelTyped<Table> {
     std::shared_ptr<Table> actual;
 
     ASSERT_OK(this->TakeWithArray(schm, table_json, filter, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
   }
 
@@ -1568,7 +1568,7 @@ class TestTakeKernelWithTable : public TestTakeKernelTyped<Table> {
     std::shared_ptr<Table> actual;
 
     ASSERT_OK(this->TakeWithChunkedArray(schm, table_json, filter, &actual));
-    ASSERT_OK(actual->ValidateFull());
+    ValidateOutput(actual);
     ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
   }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_sort_test.cc b/cpp/src/arrow/compute/kernels/vector_sort_test.cc
index a54890e..2d76f01 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_sort_test.cc
@@ -24,6 +24,7 @@
 #include "arrow/array/array_decimal.h"
 #include "arrow/array/concatenate.h"
 #include "arrow/compute/api_vector.h"
+#include "arrow/compute/kernels/test_util.h"
 #include "arrow/table.h"
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
@@ -153,7 +154,7 @@ class TestNthToIndicesBase : public TestBase {
     ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, NthToIndices(*values, n));
     // null_count field should have been initialized to 0, for convenience
     ASSERT_EQ(offsets->data()->null_count, 0);
-    ASSERT_OK(offsets->ValidateFull());
+    ValidateOutput(*offsets);
     Validate(*checked_pointer_cast<ArrayType>(values), n,
              *checked_pointer_cast<UInt64Array>(offsets));
   }
@@ -352,7 +353,7 @@ template <typename T>
 void AssertSortIndices(const std::shared_ptr<T>& input, SortOrder order,
                        const std::shared_ptr<Array>& expected) {
   ASSERT_OK_AND_ASSIGN(auto actual, SortIndices(*input, order));
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(*actual);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -360,7 +361,7 @@ template <typename T>
 void AssertSortIndices(const std::shared_ptr<T>& input, const SortOptions& options,
                        const std::shared_ptr<Array>& expected) {
   ASSERT_OK_AND_ASSIGN(auto actual, SortIndices(Datum(*input), options));
-  ASSERT_OK(actual->ValidateFull());
+  ValidateOutput(*actual);
   AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 }
 
@@ -549,7 +550,7 @@ using SortIndicesableTypes =
 
 template <typename ArrayType>
 void ValidateSorted(const ArrayType& array, UInt64Array& offsets, SortOrder order) {
-  ASSERT_OK(array.ValidateFull());
+  ValidateOutput(array);
   SortComparator<ArrayType> compare;
   for (int i = 1; i < array.length(); i++) {
     uint64_t lhs = offsets.Value(i - 1);
@@ -1171,7 +1172,7 @@ class TestTableSortIndicesRandom : public testing::TestWithParam<RandomParam> {
  public:
   // Validates the sorted indexes are really sorted.
   void Validate(const Table& table, const SortOptions& options, UInt64Array& offsets) {
-    ASSERT_OK(offsets.ValidateFull());
+    ValidateOutput(offsets);
     Comparator comparator{table, options};
     for (int i = 1; i < table.num_rows(); i++) {
       uint64_t lhs = offsets.Value(i - 1);
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 5e242d4..39ce58e 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -66,6 +66,8 @@
 #include <aws/s3/model/PutObjectRequest.h>
 #include <aws/s3/model/UploadPartRequest.h>
 
+#include "arrow/util/windows_fixup.h"
+
 #include "arrow/buffer.h"
 #include "arrow/filesystem/filesystem.h"
 #include "arrow/filesystem/path_util.h"
@@ -85,7 +87,6 @@
 #include "arrow/util/optional.h"
 #include "arrow/util/task_group.h"
 #include "arrow/util/thread_pool.h"
-#include "arrow/util/windows_fixup.h"
 
 namespace arrow {
 
diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc
index eb0edd5..ea6edb0 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -643,17 +643,34 @@ void AssertZeroPadded(const Array& array) {
   }
 }
 
-void TestInitialized(const Array& array) {
-  for (const auto& buffer : array.data()->buffers) {
+void TestInitialized(const Array& array) { TestInitialized(*array.data()); }
+
+void TestInitialized(const ArrayData& array) {
+  uint8_t total = 0;
+  for (const auto& buffer : array.buffers) {
     if (buffer && buffer->capacity() > 0) {
-      int total = 0;
       auto data = buffer->data();
       for (int64_t i = 0; i < buffer->size(); ++i) {
         total ^= data[i];
       }
-      throw_away = total;
     }
   }
+  uint8_t total_bit = 0;
+  for (uint32_t mask = 1; mask < 256; mask <<= 1) {
+    total_bit ^= (total & mask) != 0;
+  }
+  // This is a dummy condition on all the bits of `total` (which depend on the
+  // entire buffer data).  If not all bits are well-defined, Valgrind will
+  // error with "Conditional jump or move depends on uninitialised value(s)".
+  if (total_bit == 0) {
+    ++throw_away;
+  }
+  for (const auto& child : array.child_data) {
+    TestInitialized(*child);
+  }
+  if (array.dictionary) {
+    TestInitialized(*array.dictionary);
+  }
 }
 
 void SleepFor(double seconds) {
diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h
index 9d01cd4..5917451 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -41,6 +41,7 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/string_builder.h"
 #include "arrow/util/type_fwd.h"
 
 // NOTE: failing must be inline in the macros below, to get correct file / line number
@@ -136,6 +137,11 @@
     ASSERT_EQ(expected, _actual);               \
   } while (0)
 
+// A generalized version of GTest's SCOPED_TRACE that takes arbitrary arguments.
+//   ARROW_SCOPED_TRACE("some variable = ", some_variable, ...)
+
+#define ARROW_SCOPED_TRACE(...) SCOPED_TRACE(::arrow::util::StringBuilder(__VA_ARGS__))
+
 namespace arrow {
 
 // ----------------------------------------------------------------------
@@ -275,6 +281,7 @@ ARROW_TESTING_EXPORT void AssertZeroPadded(const Array& array);
 
 // Check if the valid buffer bytes are initialized
 // and cause valgrind warnings otherwise.
+ARROW_TESTING_EXPORT void TestInitialized(const ArrayData& array);
 ARROW_TESTING_EXPORT void TestInitialized(const Array& array);
 
 template <typename BuilderType>
diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h
index f55ac88..09076c5 100644
--- a/cpp/src/arrow/util/hashing.h
+++ b/cpp/src/arrow/util/hashing.h
@@ -329,8 +329,7 @@ class HashTable {
 
     // Stash old entries and seal builder, effectively resetting the Buffer
     const Entry* old_entries = entries_;
-    std::shared_ptr<Buffer> previous;
-    RETURN_NOT_OK(entries_builder_.Finish(&previous));
+    ARROW_ASSIGN_OR_RAISE(auto previous, entries_builder_.FinishWithLength(capacity_));
     // Allocate new buffer
     RETURN_NOT_OK(UpsizeBuffer(new_capacity));
 
@@ -461,6 +460,13 @@ class ScalarMemoTable : public MemoTable {
         out_data[index] = entry->payload.value;
       }
     });
+    // Zero-initialize the null entry
+    if (null_index_ != kKeyNotFound) {
+      int32_t index = null_index_ - start;
+      if (index >= 0) {
+        out_data[index] = Scalar{};
+      }
+    }
   }
 
   void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); }
@@ -775,6 +781,8 @@ class BinaryMemoTable : public MemoTable {
     if (left_size > 0) {
       memcpy(out_data, in_data + left_offset, left_size);
     }
+    // Zero-initialize the null entry
+    memset(out_data + left_size, 0, width_size);
 
     auto right_size = values_size() - static_cast<size_t>(null_data_offset);
     if (right_size > 0) {
diff --git a/cpp/src/arrow/util/windows_fixup.h b/cpp/src/arrow/util/windows_fixup.h
index 0afa53c..2949ac4 100644
--- a/cpp/src/arrow/util/windows_fixup.h
+++ b/cpp/src/arrow/util/windows_fixup.h
@@ -19,6 +19,13 @@
 
 #ifdef _WIN32
 
+#ifdef max
+#undef max
+#endif
+#ifdef min
+#undef min
+#endif
+
 // The Windows API defines macros from *File resolving to either
 // *FileA or *FileW.  Need to undo them.
 #ifdef CopyFile
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 2a22023..2fbebf2 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -25,7 +25,6 @@
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/buffer_builder.h"
 #include "arrow/extension_type.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/table.h"
@@ -56,7 +55,6 @@ using arrow::ExtensionArray;
 using arrow::ExtensionType;
 using arrow::Field;
 using arrow::FixedSizeBinaryArray;
-using Int16BufferBuilder = arrow::TypedBufferBuilder<int16_t>;
 using arrow::ListArray;
 using arrow::MemoryPool;
 using arrow::NumericArray;
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 89b2b0e..cc1e262 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -341,7 +341,6 @@ class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEnco
       // no nulls, just dump the data
       ::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
                                     data.length(), sink_.mutable_data(), sink_.length());
-      sink_.UnsafeAdvance(data.length());
     } else {
       auto n_valid = BitUtil::BytesForBits(data.length() - data.null_count());
       PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
@@ -360,6 +359,7 @@ class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEnco
       }
       writer.Finish();
     }
+    sink_.UnsafeAdvance(data.length());
   }
 
  private:
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 02e81be..d271d59 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -669,7 +669,7 @@ class EncodingAdHocTyped : public ::testing::Test {
     std::shared_ptr<::arrow::Array> result;
     ASSERT_OK(acc.Finish(&result));
     ASSERT_EQ(50, result->length());
-    ::arrow::AssertArraysEqual(*values, *result);
+    ::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true);
   }
 
   void ByteStreamSplit(int seed) {