You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/05/31 15:38:23 UTC

[arrow] branch master updated: ARROW-2641: [C++] Avoid spurious memset() calls, improve bitmap write performance

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d19089e  ARROW-2641: [C++] Avoid spurious memset() calls, improve bitmap write performance
d19089e is described below

commit d19089e5a1179c654cb30df62433698c93e3cd60
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu May 31 11:38:04 2018 -0400

    ARROW-2641: [C++] Avoid spurious memset() calls, improve bitmap write performance
    
    Also:
    * Fix ARROW-2622 and add test
    * Fix various Valgrind-detected uninitialized read issues
    * Add Valgrind suppressions file to silence false positives
    * Add a benchmark for BooleanBuilder
    * Improve speed of BooleanBuilder by 3x (from 270 to 800 MB/s here)
    * Remove most implementation macros in DictionaryBuilder (replaced with a template class helper)
    * Fix bug/oddity in DictionaryBuilder lookup with a delta dictionary (the non-delta dictionary could be looked up out of bounds)
    * Improve implementation of boolean unique kernel
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2091 from pitrou/ARROW-2641-spurious-memsets and squashes the following commits:
    
    89abac26 <Antoine Pitrou> Use existing predicates instead of inventing new ones
    a6f0ae87 <Antoine Pitrou> Remove most macro ugliness in DictionaryBuilder with template class helper
    d4ee1436 <Antoine Pitrou> ARROW-2641:  Avoid spurious memset() calls
---
 cpp/cmake_modules/BuildUtils.cmake            |   4 +-
 cpp/src/arrow/array-test.cc                   |  49 ++-
 cpp/src/arrow/array.h                         |   2 +-
 cpp/src/arrow/builder-benchmark.cc            |  24 ++
 cpp/src/arrow/builder.cc                      | 419 ++++++++++++++------------
 cpp/src/arrow/builder.h                       |  16 +-
 cpp/src/arrow/compare.cc                      |   5 +-
 cpp/src/arrow/compute/compute-test.cc         |   2 +
 cpp/src/arrow/compute/kernels/cast.cc         |   4 +-
 cpp/src/arrow/compute/kernels/hash.cc         |  11 +-
 cpp/src/arrow/compute/kernels/util-internal.h |  57 ----
 cpp/src/arrow/ipc/feather-test.cc             |  35 ++-
 cpp/src/arrow/ipc/ipc-read-write-test.cc      |   4 +
 cpp/src/arrow/ipc/json-internal.cc            |  44 ++-
 cpp/src/arrow/type_traits.h                   |  65 ++++
 cpp/src/arrow/util/bit-util-benchmark.cc      |   9 +
 cpp/src/arrow/util/bit-util-test.cc           |  56 +++-
 cpp/src/arrow/util/bit-util.h                 |  72 ++++-
 cpp/src/arrow/util/hash.h                     |   3 +
 cpp/valgrind.supp                             |  30 ++
 20 files changed, 606 insertions(+), 305 deletions(-)

diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake
index f10a7ca..ca8d285 100644
--- a/cpp/cmake_modules/BuildUtils.cmake
+++ b/cpp/cmake_modules/BuildUtils.cmake
@@ -318,7 +318,9 @@ function(ADD_ARROW_TEST REL_TEST_NAME)
       APPEND_STRING PROPERTY
       COMPILE_FLAGS " -DARROW_VALGRIND")
     add_test(${TEST_NAME}
-      bash -c "cd ${EXECUTABLE_OUTPUT_PATH}; valgrind --tool=memcheck --leak-check=full --leak-check-heuristics=stdstring --error-exitcode=1 ${TEST_PATH}")
+      bash -c "cd '${CMAKE_SOURCE_DIR}'; \
+               valgrind --suppressions=valgrind.supp --tool=memcheck --gen-suppressions=all \
+                 --leak-check=full --leak-check-heuristics=stdstring --error-exitcode=1 ${TEST_PATH}")
   elseif(MSVC)
     add_test(${TEST_NAME} ${TEST_PATH})
   else()
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 39847e0..cd32976 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -192,6 +192,18 @@ TEST_F(TestArray, TestIsNullIsValid) {
   }
 }
 
+TEST_F(TestArray, TestIsNullIsValidNoNulls) {
+  const int64_t size = 10;
+
+  std::unique_ptr<Array> arr;
+  arr.reset(new Int32Array(size, nullptr, nullptr, 0));
+
+  for (size_t i = 0; i < size; ++i) {
+    EXPECT_TRUE(arr->IsValid(i));
+    EXPECT_FALSE(arr->IsNull(i));
+  }
+}
+
 TEST_F(TestArray, BuildLargeInMemoryArray) {
   const int64_t length = static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1;
 
@@ -373,46 +385,50 @@ int64_t TestPrimitiveBuilder<PBoolean>::FlipValue(int64_t value) const {
 template <>
 void TestPrimitiveBuilder<PBoolean>::Check(const std::unique_ptr<BooleanBuilder>& builder,
                                            bool nullable) {
-  int64_t size = builder->length();
+  const int64_t size = builder->length();
 
+  // Build expected result array
   std::shared_ptr<Buffer> ex_data;
-  ASSERT_OK(BitUtil::BytesToBits(draws_, default_memory_pool(), &ex_data));
-
   std::shared_ptr<Buffer> ex_null_bitmap;
   int64_t ex_null_count = 0;
 
+  ASSERT_OK(BitUtil::BytesToBits(draws_, default_memory_pool(), &ex_data));
   if (nullable) {
     ASSERT_OK(BitUtil::BytesToBits(valid_bytes_, default_memory_pool(), &ex_null_bitmap));
     ex_null_count = test::null_count(valid_bytes_);
   } else {
     ex_null_bitmap = nullptr;
   }
-
   auto expected =
       std::make_shared<BooleanArray>(size, ex_data, ex_null_bitmap, ex_null_count);
+  ASSERT_EQ(size, expected->length());
 
+  // Finish builder and check result array
   std::shared_ptr<Array> out;
   ASSERT_OK(builder->Finish(&out));
   std::shared_ptr<BooleanArray> result = std::dynamic_pointer_cast<BooleanArray>(out);
 
-  // Builder is now reset
-  ASSERT_EQ(0, builder->length());
-  ASSERT_EQ(0, builder->capacity());
-  ASSERT_EQ(0, builder->null_count());
-  ASSERT_EQ(nullptr, builder->data());
-
   ASSERT_EQ(ex_null_count, result->null_count());
+  ASSERT_EQ(size, result->length());
 
-  ASSERT_EQ(expected->length(), result->length());
-
-  for (int64_t i = 0; i < result->length(); ++i) {
+  for (int64_t i = 0; i < size; ++i) {
     if (nullable) {
       ASSERT_EQ(valid_bytes_[i] == 0, result->IsNull(i)) << i;
+    } else {
+      ASSERT_FALSE(result->IsNull(i));
+    }
+    if (!result->IsNull(i)) {
+      bool actual = BitUtil::GetBit(result->values()->data(), i);
+      ASSERT_EQ(draws_[i] != 0, actual) << i;
     }
-    bool actual = BitUtil::GetBit(result->values()->data(), i);
-    ASSERT_EQ(draws_[i] != 0, actual) << i;
   }
   ASSERT_TRUE(result->Equals(*expected));
+
+  // Builder is now reset
+  ASSERT_EQ(0, builder->length());
+  ASSERT_EQ(0, builder->capacity());
+  ASSERT_EQ(0, builder->null_count());
+  ASSERT_EQ(nullptr, builder->data());
 }
 
 typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
@@ -633,6 +649,9 @@ TYPED_TEST(TestPrimitiveBuilder, TestAppendValues) {
   ASSERT_EQ(size, this->builder_->length());
   ASSERT_EQ(BitUtil::NextPower2(size), this->builder_->capacity());
 
+  ASSERT_EQ(size, this->builder_nn_->length());
+  ASSERT_EQ(BitUtil::NextPower2(size), this->builder_nn_->capacity());
+
   this->Check(this->builder_, true);
   this->Check(this->builder_nn_, false);
 }
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 6f61511..7d26793 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -210,7 +210,7 @@ class ARROW_EXPORT Array {
   /// \brief Return true if value at index is valid (not null). Does not
   /// boundscheck
   bool IsValid(int64_t i) const {
-    return null_bitmap_data_ != NULLPTR &&
+    return null_bitmap_data_ == NULLPTR ||
            BitUtil::GetBit(null_bitmap_data_, i + data_->offset);
   }
 
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index 12b3eac..809e6ff 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -115,6 +115,27 @@ static void BM_BuildAdaptiveUIntNoNulls(
   state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
 }
 
+static void BM_BuildBooleanArrayNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  // 2 MiB block
+  std::vector<uint8_t> data(2 * 1024 * 1024);
+  constexpr uint8_t bit_pattern = 0xcc;  // 0b11001100
+  uint64_t index = 0;
+  std::generate(data.begin(), data.end(),
+                [&index]() -> uint8_t { return (bit_pattern >> ((index++) % 8)) & 1; });
+
+  while (state.KeepRunning()) {
+    BooleanBuilder builder;
+    for (int i = 0; i < kFinalSize; i++) {
+      // Build up an array of 512 MiB in size
+      ABORT_NOT_OK(builder.AppendValues(data.data(), data.size()));
+    }
+    std::shared_ptr<Array> out;
+    ABORT_NOT_OK(builder.Finish(&out));
+  }
+  state.SetBytesProcessed(state.iterations() * data.size() * kFinalSize);
+}
+
 static void BM_BuildBinaryArray(benchmark::State& state) {  // NOLINT non-const reference
   const int64_t iterations = 1 << 20;
 
@@ -152,6 +173,9 @@ static void BM_BuildFixedSizeBinaryArray(
 
 BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMicrosecond);
 BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMicrosecond);
+
+BENCHMARK(BM_BuildBooleanArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMicrosecond);
+
 BENCHMARK(BM_BuildAdaptiveIntNoNulls)->Repetitions(3)->Unit(benchmark::kMicrosecond);
 BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend)
     ->Repetitions(3)
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 6be35fe..65018de 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -63,7 +63,7 @@ Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int64_t length)
 }
 
 Status ArrayBuilder::Init(int64_t capacity) {
-  int64_t to_alloc = BitUtil::CeilByte(capacity) / 8;
+  int64_t to_alloc = BitUtil::BytesForBits(capacity);
   null_bitmap_ = std::make_shared<PoolBuffer>(pool_);
   RETURN_NOT_OK(null_bitmap_->Resize(to_alloc));
   // Buffers might allocate more then necessary to satisfy padding requirements
@@ -78,7 +78,7 @@ Status ArrayBuilder::Resize(int64_t new_bits) {
   if (!null_bitmap_) {
     return Init(new_bits);
   }
-  int64_t new_bytes = BitUtil::CeilByte(new_bits) / 8;
+  int64_t new_bytes = BitUtil::BytesForBits(new_bits);
   int64_t old_bytes = null_bitmap_->size();
   RETURN_NOT_OK(null_bitmap_->Resize(new_bytes));
   null_bitmap_data_ = null_bitmap_->mutable_data();
@@ -236,8 +236,6 @@ Status PrimitiveBuilder<T>::Init(int64_t capacity) {
 
   int64_t nbytes = TypeTraits<T>::bytes_required(capacity);
   RETURN_NOT_OK(data_->Resize(nbytes));
-  // TODO(emkornfield) valgrind complains without this
-  memset(data_->mutable_data(), 0, static_cast<size_t>(nbytes));
 
   raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
   return Status::OK();
@@ -254,13 +252,9 @@ Status PrimitiveBuilder<T>::Resize(int64_t capacity) {
     RETURN_NOT_OK(Init(capacity));
   } else {
     RETURN_NOT_OK(ArrayBuilder::Resize(capacity));
-    const int64_t old_bytes = data_->size();
     const int64_t new_bytes = TypeTraits<T>::bytes_required(capacity);
     RETURN_NOT_OK(data_->Resize(new_bytes));
     raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
-    // TODO(emkornfield) valgrind complains without this
-    memset(data_->mutable_data() + old_bytes, 0,
-           static_cast<size_t>(new_bytes - old_bytes));
   }
   return Status::OK();
 }
@@ -372,8 +366,6 @@ Status AdaptiveIntBuilderBase::Init(int64_t capacity) {
 
   int64_t nbytes = capacity * int_size_;
   RETURN_NOT_OK(data_->Resize(nbytes));
-  // TODO(emkornfield) valgrind complains without this
-  memset(data_->mutable_data(), 0, static_cast<size_t>(nbytes));
 
   raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
   return Status::OK();
@@ -389,13 +381,9 @@ Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
     RETURN_NOT_OK(Init(capacity));
   } else {
     RETURN_NOT_OK(ArrayBuilder::Resize(capacity));
-    const int64_t old_bytes = data_->size();
     const int64_t new_bytes = capacity * int_size_;
     RETURN_NOT_OK(data_->Resize(new_bytes));
     raw_data_ = data_->mutable_data();
-    // TODO(emkornfield) valgrind complains without this
-    memset(data_->mutable_data() + old_bytes, 0,
-           static_cast<size_t>(new_bytes - old_bytes));
   }
   return Status::OK();
 }
@@ -736,8 +724,6 @@ Status BooleanBuilder::Init(int64_t capacity) {
 
   int64_t nbytes = BitUtil::BytesForBits(capacity);
   RETURN_NOT_OK(data_->Resize(nbytes));
-  // TODO(emkornfield) valgrind complains without this
-  memset(data_->mutable_data(), 0, static_cast<size_t>(nbytes));
 
   raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
   return Status::OK();
@@ -753,24 +739,31 @@ Status BooleanBuilder::Resize(int64_t capacity) {
     RETURN_NOT_OK(Init(capacity));
   } else {
     RETURN_NOT_OK(ArrayBuilder::Resize(capacity));
+
     const int64_t old_bytes = data_->size();
     const int64_t new_bytes = BitUtil::BytesForBits(capacity);
 
-    RETURN_NOT_OK(data_->Resize(new_bytes));
-    raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
-    memset(data_->mutable_data() + old_bytes, 0,
-           static_cast<size_t>(new_bytes - old_bytes));
+    if (new_bytes != old_bytes) {
+      RETURN_NOT_OK(data_->Resize(new_bytes));
+      raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
+    }
   }
   return Status::OK();
 }
 
 Status BooleanBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
   const int64_t bytes_required = BitUtil::BytesForBits(length_);
-
   if (bytes_required > 0 && bytes_required < data_->size()) {
     // Trim buffers
     RETURN_NOT_OK(data_->Resize(bytes_required));
   }
+
+  int64_t bit_offset = length_ % 8;
+  if (bit_offset > 0) {
+    // Adjust last byte
+    data_->mutable_data()[length_ / 8] &= BitUtil::kPrecedingBitmask[bit_offset];
+  }
+
   *out = ArrayData::Make(boolean(), length_, {null_bitmap_, data_}, null_count_);
 
   data_ = null_bitmap_ = nullptr;
@@ -782,9 +775,16 @@ Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
                                     const uint8_t* valid_bytes) {
   RETURN_NOT_OK(Reserve(length));
 
+  internal::FirstTimeBitmapWriter bit_writer(raw_data_, length_, length);
   for (int64_t i = 0; i < length; ++i) {
-    BitUtil::SetBitTo(raw_data_, length_ + i, values[i] != 0);
+    if (values[i] != 0) {
+      bit_writer.Set();
+    } else {
+      bit_writer.Clear();
+    }
+    bit_writer.Next();
   }
+  bit_writer.Finish();
 
   // this updates length_
   ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
@@ -801,9 +801,16 @@ Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
   RETURN_NOT_OK(Reserve(length));
   DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
 
+  internal::FirstTimeBitmapWriter bit_writer(raw_data_, length_, length);
   for (int64_t i = 0; i < length; ++i) {
-    BitUtil::SetBitTo(raw_data_, length_ + i, values[i] != 0);
+    if (values[i]) {
+      bit_writer.Set();
+    } else {
+      bit_writer.Clear();
+    }
+    bit_writer.Next();
   }
+  bit_writer.Finish();
 
   // this updates length_
   ArrayBuilder::UnsafeAppendToBitmap(is_valid);
@@ -839,9 +846,16 @@ Status BooleanBuilder::AppendValues(const std::vector<bool>& values,
   RETURN_NOT_OK(Reserve(length));
   DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
 
+  internal::FirstTimeBitmapWriter bit_writer(raw_data_, length_, length);
   for (int64_t i = 0; i < length; ++i) {
-    BitUtil::SetBitTo(raw_data_, length_ + i, values[i]);
+    if (values[i]) {
+      bit_writer.Set();
+    } else {
+      bit_writer.Clear();
+    }
+    bit_writer.Next();
   }
+  bit_writer.Finish();
 
   // this updates length_
   ArrayBuilder::UnsafeAppendToBitmap(is_valid);
@@ -857,10 +871,18 @@ Status BooleanBuilder::AppendValues(const std::vector<bool>& values) {
   const int64_t length = static_cast<int64_t>(values.size());
   RETURN_NOT_OK(Reserve(length));
 
+  internal::FirstTimeBitmapWriter bit_writer(raw_data_, length_, length);
   for (int64_t i = 0; i < length; ++i) {
-    BitUtil::SetBitTo(raw_data_, length_ + i, values[i]);
+    if (values[i]) {
+      bit_writer.Set();
+    } else {
+      bit_writer.Clear();
+    }
+    bit_writer.Next();
   }
+  bit_writer.Finish();
 
+  // this updates length_
   ArrayBuilder::UnsafeSetNotNull(length);
   return Status::OK();
 }
@@ -872,8 +894,122 @@ Status BooleanBuilder::Append(const std::vector<bool>& values) {
 // ----------------------------------------------------------------------
 // DictionaryBuilder
 
+using internal::DictionaryScalar;
 using internal::WrappedBinary;
 
+namespace {
+
+// A helper class to manage a hash table embedded in a typed Builder.
+template <typename T, typename Enable = void>
+struct DictionaryHashHelper {};
+
+// DictionaryHashHelper implementation for primitive types
+template <typename T>
+struct DictionaryHashHelper<T, enable_if_has_c_type<T>> {
+  using Builder = typename TypeTraits<T>::BuilderType;
+  using Scalar = typename DictionaryScalar<T>::type;
+
+  // Get the dictionary value at the given builder index
+  static Scalar GetDictionaryValue(const Builder& builder, int64_t index) {
+    const Scalar* data = reinterpret_cast<const Scalar*>(builder.data()->data());
+    return data[index];
+  }
+
+  // Compute the hash of a scalar value
+  static int64_t HashValue(const Scalar& value, int byte_width) {
+    return HashUtil::Hash(&value, sizeof(Scalar), 0);
+  }
+
+  // Return whether the dictionary value at the given builder index is unequal to value
+  static bool SlotDifferent(const Builder& builder, int64_t index, const Scalar& value) {
+    return GetDictionaryValue(builder, index) != value;
+  }
+
+  // Append a value to the builder
+  static Status AppendValue(Builder& builder, const Scalar& value) {
+    return builder.Append(value);
+  }
+
+  // Append another builder's contents to the builder
+  static Status AppendBuilder(Builder& builder, const Builder& source_builder) {
+    return builder.AppendValues(
+        reinterpret_cast<const Scalar*>(source_builder.data()->data()),
+        source_builder.length(), nullptr);
+  }
+};
+
+// DictionaryHashHelper implementation for StringType / BinaryType
+template <typename T>
+struct DictionaryHashHelper<T, enable_if_binary<T>> {
+  using Builder = typename TypeTraits<T>::BuilderType;
+  using Scalar = typename DictionaryScalar<T>::type;
+
+  static Scalar GetDictionaryValue(const Builder& builder, int64_t index) {
+    int32_t v_length;
+    const uint8_t* v_ptr = builder.GetValue(index, &v_length);
+    return WrappedBinary(v_ptr, v_length);
+  }
+
+  static int64_t HashValue(const Scalar& value, int byte_width) {
+    return HashUtil::Hash(value.ptr_, value.length_, 0);
+  }
+
+  static bool SlotDifferent(const Builder& builder, int64_t index, const Scalar& value) {
+    int32_t other_length;
+    const uint8_t* other_ptr = builder.GetValue(index, &other_length);
+    return value.length_ != other_length ||
+           memcmp(value.ptr_, other_ptr, other_length) != 0;
+  }
+
+  static Status AppendValue(Builder& builder, const Scalar& value) {
+    return builder.Append(value.ptr_, value.length_);
+  }
+
+  static Status AppendBuilder(Builder& builder, const Builder& source_builder) {
+    for (uint64_t index = 0, limit = source_builder.length(); index < limit; ++index) {
+      int32_t length;
+      const uint8_t* ptr = source_builder.GetValue(index, &length);
+      RETURN_NOT_OK(builder.Append(ptr, length));
+    }
+    return Status::OK();
+  }
+};
+
+// DictionaryHashHelper implementation for FixedSizeBinaryType
+template <typename T>
+struct DictionaryHashHelper<T, enable_if_fixed_size_binary<T>> {
+  using Builder = typename TypeTraits<FixedSizeBinaryType>::BuilderType;
+  using Scalar = typename DictionaryScalar<FixedSizeBinaryType>::type;
+
+  static Scalar GetDictionaryValue(const Builder& builder, int64_t index) {
+    return builder.GetValue(index);
+  }
+
+  static int64_t HashValue(const Scalar& value, int byte_width) {
+    return HashUtil::Hash(value, byte_width, 0);
+  }
+
+  static bool SlotDifferent(const Builder& builder, int64_t index, const uint8_t* value) {
+    const int32_t width = builder.byte_width();
+    const uint8_t* other_value = builder.GetValue(index);
+    return memcmp(value, other_value, width) != 0;
+  }
+
+  static Status AppendValue(Builder& builder, const Scalar& value) {
+    return builder.Append(value);
+  }
+
+  static Status AppendBuilder(Builder& builder, const Builder& source_builder) {
+    for (uint64_t index = 0, limit = source_builder.length(); index < limit; ++index) {
+      const Scalar value = GetDictionaryValue(source_builder, index);
+      RETURN_NOT_OK(builder.Append(value));
+    }
+    return Status::OK();
+  }
+};
+
+}  // namespace
+
 template <typename T>
 DictionaryBuilder<T>::DictionaryBuilder(const std::shared_ptr<DataType>& type,
                                         MemoryPool* pool)
@@ -959,6 +1095,37 @@ Status DictionaryBuilder<NullType>::Resize(int64_t capacity) {
 }
 
 template <typename T>
+int64_t DictionaryBuilder<T>::HashValue(const Scalar& value) {
+  return DictionaryHashHelper<T>::HashValue(value, byte_width_);
+}
+
+template <typename T>
+typename DictionaryBuilder<T>::Scalar DictionaryBuilder<T>::GetDictionaryValue(
+    typename TypeTraits<T>::BuilderType& dictionary_builder, int64_t index) {
+  return DictionaryHashHelper<T>::GetDictionaryValue(dictionary_builder, index);
+}
+
+template <typename T>
+bool DictionaryBuilder<T>::SlotDifferent(hash_slot_t index, const Scalar& value) {
+  DCHECK_GE(index, 0);
+  if (index >= entry_id_offset_) {
+    // Lookup delta dictionary
+    DCHECK_LT(index - entry_id_offset_, dict_builder_.length());
+    return DictionaryHashHelper<T>::SlotDifferent(
+        dict_builder_, static_cast<int64_t>(index - entry_id_offset_), value);
+  } else {
+    DCHECK_LT(index, overflow_dict_builder_.length());
+    return DictionaryHashHelper<T>::SlotDifferent(overflow_dict_builder_,
+                                                  static_cast<int64_t>(index), value);
+  }
+}
+
+template <typename T>
+Status DictionaryBuilder<T>::AppendDictionary(const Scalar& value) {
+  return DictionaryHashHelper<T>::AppendValue(dict_builder_, value);
+}
+
+template <typename T>
 Status DictionaryBuilder<T>::Append(const Scalar& value) {
   RETURN_NOT_OK(Reserve(1));
   // Based on DictEncoder<DType>::Put
@@ -993,6 +1160,13 @@ Status DictionaryBuilder<T>::Append(const Scalar& value) {
 }
 
 template <typename T>
+Status DictionaryBuilder<T>::AppendNull() {
+  return values_builder_.AppendNull();
+}
+
+Status DictionaryBuilder<NullType>::AppendNull() { return values_builder_.AppendNull(); }
+
+template <typename T>
 Status DictionaryBuilder<T>::AppendArray(const Array& array) {
   const auto& numeric_array = checked_cast<const NumericArray<T>&>(array);
   for (int64_t i = 0; i < array.length(); i++) {
@@ -1030,17 +1204,9 @@ Status DictionaryBuilder<FixedSizeBinaryType>::AppendArray(const Array& array) {
 }
 
 template <typename T>
-Status DictionaryBuilder<T>::AppendNull() {
-  return values_builder_.AppendNull();
-}
-
-Status DictionaryBuilder<NullType>::AppendNull() { return values_builder_.AppendNull(); }
-
-template <typename T>
 Status DictionaryBuilder<T>::DoubleTableSize() {
-#define INNER_LOOP                                                               \
-  Scalar value = GetDictionaryValue(dict_builder_, static_cast<int64_t>(index)); \
-  int64_t j = HashValue(value) & new_mod_bitmask;
+#define INNER_LOOP \
+  int64_t j = HashValue(GetDictionaryValue(dict_builder_, index)) & new_mod_bitmask
 
   DOUBLE_TABLE_SIZE(, INNER_LOOP);
 
@@ -1048,18 +1214,12 @@ Status DictionaryBuilder<T>::DoubleTableSize() {
 }
 
 template <typename T>
-typename DictionaryBuilder<T>::Scalar DictionaryBuilder<T>::GetDictionaryValue(
-    typename TypeTraits<T>::BuilderType& dictionary_builder, int64_t index) {
-  const Scalar* data = reinterpret_cast<const Scalar*>(dictionary_builder.data()->data());
-  return data[index];
-}
-
-template <typename T>
 Status DictionaryBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) {
   entry_id_offset_ += dict_builder_.length();
-  RETURN_NOT_OK(overflow_dict_builder_.AppendValues(
-      reinterpret_cast<const DictionaryBuilder<T>::Scalar*>(dict_builder_.data()->data()),
-      dict_builder_.length(), nullptr));
+  // Store current dict entries for further uses of this DictionaryBuilder
+  RETURN_NOT_OK(
+      DictionaryHashHelper<T>::AppendBuilder(overflow_dict_builder_, dict_builder_));
+  DCHECK_EQ(entry_id_offset_, overflow_dict_builder_.length());
 
   std::shared_ptr<Array> dictionary;
   RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
@@ -1080,158 +1240,25 @@ Status DictionaryBuilder<NullType>::FinishInternal(std::shared_ptr<ArrayData>* o
   return Status::OK();
 }
 
-template <>
-const uint8_t* DictionaryBuilder<FixedSizeBinaryType>::GetDictionaryValue(
-    typename TypeTraits<FixedSizeBinaryType>::BuilderType& dictionary_builder,
-    int64_t index) {
-  return dictionary_builder.GetValue(index);
-}
-
-template <>
-Status DictionaryBuilder<FixedSizeBinaryType>::FinishInternal(
-    std::shared_ptr<ArrayData>* out) {
-  entry_id_offset_ += dict_builder_.length();
-
-  for (uint64_t index = 0, limit = dict_builder_.length(); index < limit; ++index) {
-    const Scalar value = GetDictionaryValue(dict_builder_, index);
-    RETURN_NOT_OK(overflow_dict_builder_.Append(value));
-  }
-
-  std::shared_ptr<Array> dictionary;
-  RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
-
-  RETURN_NOT_OK(values_builder_.FinishInternal(out));
-  (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);
-
-  RETURN_NOT_OK(dict_builder_.Init(capacity_));
-  RETURN_NOT_OK(values_builder_.Init(capacity_));
-
-  return Status::OK();
-}
-
-template <typename T>
-int64_t DictionaryBuilder<T>::HashValue(const Scalar& value) {
-  return HashUtil::Hash(&value, sizeof(Scalar), 0);
-}
-
-template <>
-int64_t DictionaryBuilder<FixedSizeBinaryType>::HashValue(const Scalar& value) {
-  return HashUtil::Hash(value, byte_width_, 0);
-}
-
-template <typename T>
-bool DictionaryBuilder<T>::SlotDifferent(hash_slot_t index, const Scalar& value) {
-  const bool value_found =
-      index >= entry_id_offset_ &&
-      GetDictionaryValue(dict_builder_, static_cast<int64_t>(index - entry_id_offset_)) ==
-          value;
-  const bool value_found_overflow =
-      entry_id_offset_ > 0 &&
-      GetDictionaryValue(overflow_dict_builder_, static_cast<int64_t>(index)) == value;
-  return !(value_found || value_found_overflow);
-}
-
-template <>
-bool DictionaryBuilder<FixedSizeBinaryType>::SlotDifferent(hash_slot_t index,
-                                                           const Scalar& value) {
-  int32_t width = checked_cast<const FixedSizeBinaryType&>(*type_).byte_width();
-  bool value_found = false;
-  if (index >= entry_id_offset_) {
-    const Scalar other =
-        GetDictionaryValue(dict_builder_, static_cast<int64_t>(index - entry_id_offset_));
-    value_found = memcmp(other, value, width) == 0;
-  }
-
-  bool value_found_overflow = false;
-  if (entry_id_offset_ > 0) {
-    const Scalar other_overflow =
-        GetDictionaryValue(overflow_dict_builder_, static_cast<int64_t>(index));
-    value_found_overflow = memcmp(other_overflow, value, width) == 0;
-  }
-  return !(value_found || value_found_overflow);
-}
+//
+// StringType and BinaryType specializations
+//
 
-template <typename T>
-Status DictionaryBuilder<T>::AppendDictionary(const Scalar& value) {
-  return dict_builder_.Append(value);
-}
-
-#define BINARY_DICTIONARY_SPECIALIZATIONS(Type)                                        \
-  template <>                                                                          \
-  WrappedBinary DictionaryBuilder<Type>::GetDictionaryValue(                           \
-      typename TypeTraits<Type>::BuilderType& dictionary_builder, int64_t index) {     \
-    int32_t v_len;                                                                     \
-    const uint8_t* v = dictionary_builder.GetValue(                                    \
-        static_cast<int64_t>(index - entry_id_offset_), &v_len);                       \
-    return WrappedBinary(v, v_len);                                                    \
-  }                                                                                    \
-                                                                                       \
-  template <>                                                                          \
-  Status DictionaryBuilder<Type>::AppendDictionary(const WrappedBinary& value) {       \
-    return dict_builder_.Append(value.ptr_, value.length_);                            \
-  }                                                                                    \
-                                                                                       \
-  template <>                                                                          \
-  Status DictionaryBuilder<Type>::AppendArray(const Array& array) {                    \
-    const BinaryArray& binary_array = checked_cast<const BinaryArray&>(array);         \
-    WrappedBinary value(nullptr, 0);                                                   \
-    for (int64_t i = 0; i < array.length(); i++) {                                     \
-      if (array.IsNull(i)) {                                                           \
-        RETURN_NOT_OK(AppendNull());                                                   \
-      } else {                                                                         \
-        value.ptr_ = binary_array.GetValue(i, &value.length_);                         \
-        RETURN_NOT_OK(Append(value));                                                  \
-      }                                                                                \
-    }                                                                                  \
-    return Status::OK();                                                               \
-  }                                                                                    \
-                                                                                       \
-  template <>                                                                          \
-  int64_t DictionaryBuilder<Type>::HashValue(const WrappedBinary& value) {             \
-    return HashUtil::Hash(value.ptr_, value.length_, 0);                               \
-  }                                                                                    \
-                                                                                       \
-  template <>                                                                          \
-  bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index,                       \
-                                              const WrappedBinary& value) {            \
-    int32_t other_length;                                                              \
-    bool value_found = false;                                                          \
-    if (index >= entry_id_offset_) {                                                   \
-      const uint8_t* other_value = dict_builder_.GetValue(                             \
-          static_cast<int64_t>(index - entry_id_offset_), &other_length);              \
-      value_found = other_length == value.length_ &&                                   \
-                    memcmp(other_value, value.ptr_, value.length_) == 0;               \
-    }                                                                                  \
-                                                                                       \
-    bool value_found_overflow = false;                                                 \
-    if (entry_id_offset_ > 0) {                                                        \
-      const uint8_t* other_value_overflow =                                            \
-          overflow_dict_builder_.GetValue(static_cast<int64_t>(index), &other_length); \
-      value_found_overflow =                                                           \
-          other_length == value.length_ &&                                             \
-          memcmp(other_value_overflow, value.ptr_, value.length_) == 0;                \
-    }                                                                                  \
-    return !(value_found || value_found_overflow);                                     \
-  }                                                                                    \
-                                                                                       \
-  template <>                                                                          \
-  Status DictionaryBuilder<Type>::FinishInternal(std::shared_ptr<ArrayData>* out) {    \
-    entry_id_offset_ += dict_builder_.length();                                        \
-    for (uint64_t index = 0, limit = dict_builder_.length(); index < limit; ++index) { \
-      int32_t out_length;                                                              \
-      const uint8_t* value = dict_builder_.GetValue(index, &out_length);               \
-      RETURN_NOT_OK(overflow_dict_builder_.Append(value, out_length));                 \
-    }                                                                                  \
-                                                                                       \
-    std::shared_ptr<Array> dictionary;                                                 \
-    RETURN_NOT_OK(dict_builder_.Finish(&dictionary));                                  \
-                                                                                       \
-    RETURN_NOT_OK(values_builder_.FinishInternal(out));                                \
-    (*out)->type = std::make_shared<DictionaryType>((*out)->type, dictionary);         \
-                                                                                       \
-    RETURN_NOT_OK(dict_builder_.Init(capacity_));                                      \
-    RETURN_NOT_OK(values_builder_.Init(capacity_));                                    \
-    return Status::OK();                                                               \
+#define BINARY_DICTIONARY_SPECIALIZATIONS(Type)                                \
+                                                                               \
+  template <>                                                                  \
+  Status DictionaryBuilder<Type>::AppendArray(const Array& array) {            \
+    const BinaryArray& binary_array = checked_cast<const BinaryArray&>(array); \
+    WrappedBinary value(nullptr, 0);                                           \
+    for (int64_t i = 0; i < array.length(); i++) {                             \
+      if (array.IsNull(i)) {                                                   \
+        RETURN_NOT_OK(AppendNull());                                           \
+      } else {                                                                 \
+        value.ptr_ = binary_array.GetValue(i, &value.length_);                 \
+        RETURN_NOT_OK(Append(value));                                          \
+      }                                                                        \
+    }                                                                          \
+    return Status::OK();                                                       \
   }
 
 BINARY_DICTIONARY_SPECIALIZATIONS(StringType);
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index e6de390..bab5764 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -837,6 +837,8 @@ class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
   /// \return size of values buffer so far
   int64_t value_data_length() const { return byte_builder_.length(); }
 
+  int32_t byte_width() const { return byte_width_; }
+
   /// Temporary access to a value.
   ///
   /// This pointer becomes invalid on the next modifying operation.
@@ -940,7 +942,7 @@ struct DictionaryScalar<StringType> {
 
 template <>
 struct DictionaryScalar<FixedSizeBinaryType> {
-  using type = uint8_t const*;
+  using type = const uint8_t*;
 };
 
 }  // namespace internal
@@ -984,10 +986,12 @@ class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
   bool is_building_delta() { return entry_id_offset_ > 0; }
 
  protected:
+  // Hash table implementation helpers
   Status DoubleTableSize();
   Scalar GetDictionaryValue(typename TypeTraits<T>::BuilderType& dictionary_builder,
                             int64_t index);
   int64_t HashValue(const Scalar& value);
+  // Check whether the dictionary entry in *slot* is equal to the given *value*
   bool SlotDifferent(hash_slot_t slot, const Scalar& value);
   Status AppendDictionary(const Scalar& value);
 
@@ -997,16 +1001,20 @@ class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
   /// Size of the table. Must be a power of 2.
   int64_t hash_table_size_;
 
-  // offset for the entry ids. Used to build delta dictionaries,
-  // increased on every InternalFinish by the number of current entries
-  // in the dictionary
+  // Offset for the dictionary entries in dict_builder_.
+  // Increased on every Finish call by the number of current entries
+  // in the dictionary.
   int64_t entry_id_offset_;
 
   // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
   // hash_table_size_, but uses far fewer CPU cycles
   int64_t mod_bitmask_;
 
+  // This builder accumulates new dictionary entries since the last Finish call
+  // (or since the beginning if Finish hasn't been called).
+  // In other words, it contains the current delta dictionary.
   typename TypeTraits<T>::BuilderType dict_builder_;
+  // This builder stores dictionary entries encountered before the last Finish call.
   typename TypeTraits<T>::BuilderType overflow_dict_builder_;
 
   AdaptiveIntBuilder values_builder_;
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index 7a84b14..87016ba 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -319,7 +319,10 @@ static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& r
     for (int64_t i = 0; i < left.length(); ++i) {
       const bool left_null = left.IsNull(i);
       const bool right_null = right.IsNull(i);
-      if (!left_null && (memcmp(left_data, right_data, byte_width) != 0 || right_null)) {
+      if (left_null != right_null) {
+        return false;
+      }
+      if (!left_null && memcmp(left_data, right_data, byte_width) != 0) {
         return false;
       }
       left_data += byte_width;
diff --git a/cpp/src/arrow/compute/compute-test.cc b/cpp/src/arrow/compute/compute-test.cc
index e8dc2bc..6a92844 100644
--- a/cpp/src/arrow/compute/compute-test.cc
+++ b/cpp/src/arrow/compute/compute-test.cc
@@ -927,6 +927,8 @@ TYPED_TEST(TestHashKernelPrimitive, Unique) {
   auto type = TypeTraits<TypeParam>::type_singleton();
   CheckUnique<TypeParam, T>(&this->ctx_, type, {2, 1, 2, 1}, {true, false, true, true},
                             {2, 1}, {});
+  CheckUnique<TypeParam, T>(&this->ctx_, type, {2, 1, 3, 1}, {false, false, true, true},
+                            {3, 1}, {});
 }
 
 TYPED_TEST(TestHashKernelPrimitive, DictEncode) {
diff --git a/cpp/src/arrow/compute/kernels/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc
index e5b9eaf..7987a1c 100644
--- a/cpp/src/arrow/compute/kernels/cast.cc
+++ b/cpp/src/arrow/compute/kernels/cast.cc
@@ -200,8 +200,8 @@ struct CastFunctor<O, I,
   void operator()(FunctionContext* ctx, const CastOptions& options,
                   const ArrayData& input, ArrayData* output) {
     auto in_data = GetValues<typename I::c_type>(input, 1);
-    internal::BitmapWriter writer(output->buffers[1]->mutable_data(), output->offset,
-                                  input.length);
+    internal::FirstTimeBitmapWriter writer(output->buffers[1]->mutable_data(),
+                                           output->offset, input.length);
 
     for (int64_t i = 0; i < input.length; ++i) {
       if (*in_data++ != 0) {
diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc
index 1c849d0..5ef8ee5 100644
--- a/cpp/src/arrow/compute/kernels/hash.cc
+++ b/cpp/src/arrow/compute/kernels/hash.cc
@@ -349,23 +349,24 @@ class HashTableKernel<Type, Action, enable_if_boolean<Type>> : public HashTable
       internal::BitmapReader valid_reader(arr.buffers[0]->data(), arr.offset, arr.length);
       for (int64_t i = 0; i < arr.length; ++i) {
         const bool is_null = valid_reader.IsNotSet();
-        const bool value = value_reader.IsSet();
-        const int j = value ? 1 : 0;
-        hash_slot_t slot = table_[j];
         valid_reader.Next();
-        value_reader.Next();
         if (is_null) {
+          value_reader.Next();
           action->ObserveNull();
           continue;
         }
+        const bool value = value_reader.IsSet();
+        value_reader.Next();
+        const int j = value ? 1 : 0;
+        hash_slot_t slot = table_[j];
         HASH_INNER_LOOP();
       }
     } else {
       for (int64_t i = 0; i < arr.length; ++i) {
         const bool value = value_reader.IsSet();
+        value_reader.Next();
         const int j = value ? 1 : 0;
         hash_slot_t slot = table_[j];
-        value_reader.Next();
         HASH_INNER_LOOP();
       }
     }
diff --git a/cpp/src/arrow/compute/kernels/util-internal.h b/cpp/src/arrow/compute/kernels/util-internal.h
index bde2676..acfcb4b 100644
--- a/cpp/src/arrow/compute/kernels/util-internal.h
+++ b/cpp/src/arrow/compute/kernels/util-internal.h
@@ -30,63 +30,6 @@ namespace compute {
 class FunctionContext;
 
 template <typename T>
-using is_number = std::is_base_of<Number, T>;
-
-template <typename T>
-struct has_c_type {
-  static constexpr bool value =
-      (std::is_base_of<PrimitiveCType, T>::value || std::is_base_of<DateType, T>::value ||
-       std::is_base_of<TimeType, T>::value || std::is_base_of<TimestampType, T>::value);
-};
-
-template <typename T>
-struct is_8bit_int {
-  static constexpr bool value =
-      (std::is_same<UInt8Type, T>::value || std::is_same<Int8Type, T>::value);
-};
-
-template <typename T>
-using enable_if_8bit_int = typename std::enable_if<is_8bit_int<T>::value>::type;
-
-template <typename T>
-using enable_if_primitive_ctype =
-    typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value>::type;
-
-template <typename T>
-using enable_if_date = typename std::enable_if<std::is_base_of<DateType, T>::value>::type;
-
-template <typename T>
-using enable_if_time = typename std::enable_if<std::is_base_of<TimeType, T>::value>::type;
-
-template <typename T>
-using enable_if_timestamp =
-    typename std::enable_if<std::is_base_of<TimestampType, T>::value>::type;
-
-template <typename T>
-using enable_if_has_c_type = typename std::enable_if<has_c_type<T>::value>::type;
-
-template <typename T>
-using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value>::type;
-
-template <typename T>
-using enable_if_binary =
-    typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type;
-
-template <typename T>
-using enable_if_boolean =
-    typename std::enable_if<std::is_same<BooleanType, T>::value>::type;
-
-template <typename T>
-using enable_if_fixed_size_binary =
-    typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value>::type;
-
-template <typename T>
-using enable_if_list = typename std::enable_if<std::is_base_of<ListType, T>::value>::type;
-
-template <typename T>
-using enable_if_number = typename std::enable_if<is_number<T>::value>::type;
-
-template <typename T>
 inline const T* GetValues(const ArrayData& data, int i) {
   return reinterpret_cast<const T*>(data.buffers[i]->data()) + data.offset;
 }
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index cea5176..c5badc3 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -358,26 +358,41 @@ TEST_F(TestTableWriter, TimeTypes) {
   auto f3 = field("f3", timestamp(TimeUnit::SECOND, "US/Los_Angeles"));
   auto schema = ::arrow::schema({f0, f1, f2, f3});
 
-  std::vector<int64_t> values_vec = {0, 1, 2, 3, 4, 5, 6};
-  std::shared_ptr<Array> values;
-  ArrayFromVector<Int64Type, int64_t>(is_valid, values_vec, &values);
+  std::vector<int64_t> values64_vec = {0, 1, 2, 3, 4, 5, 6};
+  std::shared_ptr<Array> values64;
+  ArrayFromVector<Int64Type, int64_t>(is_valid, values64_vec, &values64);
 
-  std::vector<int32_t> date_values_vec = {0, 1, 2, 3, 4, 5, 6};
+  std::vector<int32_t> values32_vec = {10, 11, 12, 13, 14, 15, 16};
+  std::shared_ptr<Array> values32;
+  ArrayFromVector<Int32Type, int32_t>(is_valid, values32_vec, &values32);
+
+  std::vector<int32_t> date_values_vec = {20, 21, 22, 23, 24, 25, 26};
   std::shared_ptr<Array> date_array;
   ArrayFromVector<Date32Type, int32_t>(is_valid, date_values_vec, &date_array);
 
-  const auto& prim_values = checked_cast<const PrimitiveArray&>(*values);
-  BufferVector buffers = {prim_values.null_bitmap(), prim_values.values()};
+  const auto& prim_values64 = checked_cast<const PrimitiveArray&>(*values64);
+  BufferVector buffers64 = {prim_values64.null_bitmap(), prim_values64.values()};
+
+  const auto& prim_values32 = checked_cast<const PrimitiveArray&>(*values32);
+  BufferVector buffers32 = {prim_values32.null_bitmap(), prim_values32.values()};
 
+  // Push date32 ArrayData
   std::vector<std::shared_ptr<ArrayData>> arrays;
   arrays.push_back(date_array->data());
 
-  for (int i = 1; i < schema->num_fields(); ++i) {
-    arrays.emplace_back(ArrayData::Make(schema->field(i)->type(), values->length(),
-                                        BufferVector(buffers), values->null_count(), 0));
+  // Create time32 ArrayData
+  arrays.emplace_back(ArrayData::Make(schema->field(1)->type(), values32->length(),
+                                      BufferVector(buffers32), values32->null_count(),
+                                      0));
+
+  // Create timestamp ArrayData
+  for (int i = 2; i < schema->num_fields(); ++i) {
+    arrays.emplace_back(ArrayData::Make(schema->field(i)->type(), values64->length(),
+                                        BufferVector(buffers64), values64->null_count(),
+                                        0));
   }
 
-  auto batch = RecordBatch::Make(schema, values->length(), std::move(arrays));
+  auto batch = RecordBatch::Make(schema, 7, std::move(arrays));
   CheckBatch(*batch);
 }
 
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index bbc279f..4d959a9 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -621,6 +621,9 @@ INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES(
 INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
 INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
 
+// This test uses uninitialized memory
+
+#if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER))
 TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   const int64_t length = static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1;
 
@@ -649,6 +652,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
 
   ASSERT_EQ(length, result->num_rows());
 }
+#endif
 
 void CheckBatchDictionaries(const RecordBatch& batch) {
   // Check that dictionaries that should be the same are the same
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index cb9cf51..829c895 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -372,27 +372,42 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues(
       const T& arr) {
+    static const char null_string[] = "0";
     const auto data = arr.raw_values();
-    for (int i = 0; i < arr.length(); ++i) {
-      writer_->Int64(data[i]);
+    for (int64_t i = 0; i < arr.length(); ++i) {
+      if (arr.IsValid(i)) {
+        writer_->Int64(data[i]);
+      } else {
+        writer_->RawNumber(null_string, sizeof(null_string));
+      }
     }
   }
 
   template <typename T>
   typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues(
       const T& arr) {
+    static const char null_string[] = "0";
     const auto data = arr.raw_values();
-    for (int i = 0; i < arr.length(); ++i) {
-      writer_->Uint64(data[i]);
+    for (int64_t i = 0; i < arr.length(); ++i) {
+      if (arr.IsValid(i)) {
+        writer_->Uint64(data[i]);
+      } else {
+        writer_->RawNumber(null_string, sizeof(null_string));
+      }
     }
   }
 
   template <typename T>
   typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues(
       const T& arr) {
+    static const char null_string[] = "0.";
     const auto data = arr.raw_values();
-    for (int i = 0; i < arr.length(); ++i) {
-      writer_->Double(data[i]);
+    for (int64_t i = 0; i < arr.length(); ++i) {
+      if (arr.IsValid(i)) {
+        writer_->Double(data[i]);
+      } else {
+        writer_->RawNumber(null_string, sizeof(null_string));
+      }
     }
   }
 
@@ -424,15 +439,24 @@ class ArrayWriter {
   }
 
   void WriteDataValues(const Decimal128Array& arr) {
+    static const char null_string[] = "0";
     for (int64_t i = 0; i < arr.length(); ++i) {
-      const Decimal128 value(arr.GetValue(i));
-      writer_->String(value.ToIntegerString());
+      if (arr.IsValid(i)) {
+        const Decimal128 value(arr.GetValue(i));
+        writer_->String(value.ToIntegerString());
+      } else {
+        writer_->String(null_string, sizeof(null_string));
+      }
     }
   }
 
   void WriteDataValues(const BooleanArray& arr) {
-    for (int i = 0; i < arr.length(); ++i) {
-      writer_->Bool(arr.Value(i));
+    for (int64_t i = 0; i < arr.length(); ++i) {
+      if (arr.IsValid(i)) {
+        writer_->Bool(arr.Value(i));
+      } else {
+        writer_->Bool(false);
+      }
     }
   }
 
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index ede52e9..331c349 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -26,6 +26,10 @@
 
 namespace arrow {
 
+//
+// Per-type type traits
+//
+
 template <typename T>
 struct TypeTraits {};
 
@@ -298,6 +302,67 @@ struct TypeTraits<DictionaryType> {
   constexpr static bool is_parameter_free = false;
 };
 
+//
+// Useful type predicates
+//
+
+template <typename T>
+using is_number = std::is_base_of<Number, T>;
+
+template <typename T>
+struct has_c_type {
+  static constexpr bool value =
+      (std::is_base_of<PrimitiveCType, T>::value || std::is_base_of<DateType, T>::value ||
+       std::is_base_of<TimeType, T>::value || std::is_base_of<TimestampType, T>::value);
+};
+
+template <typename T>
+struct is_8bit_int {
+  static constexpr bool value =
+      (std::is_same<UInt8Type, T>::value || std::is_same<Int8Type, T>::value);
+};
+
+template <typename T>
+using enable_if_8bit_int = typename std::enable_if<is_8bit_int<T>::value>::type;
+
+template <typename T>
+using enable_if_primitive_ctype =
+    typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value>::type;
+
+template <typename T>
+using enable_if_date = typename std::enable_if<std::is_base_of<DateType, T>::value>::type;
+
+template <typename T>
+using enable_if_time = typename std::enable_if<std::is_base_of<TimeType, T>::value>::type;
+
+template <typename T>
+using enable_if_timestamp =
+    typename std::enable_if<std::is_base_of<TimestampType, T>::value>::type;
+
+template <typename T>
+using enable_if_has_c_type = typename std::enable_if<has_c_type<T>::value>::type;
+
+template <typename T>
+using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value>::type;
+
+template <typename T>
+using enable_if_binary =
+    typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type;
+
+template <typename T>
+using enable_if_boolean =
+    typename std::enable_if<std::is_same<BooleanType, T>::value>::type;
+
+template <typename T>
+using enable_if_fixed_size_binary =
+    typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value>::type;
+
+template <typename T>
+using enable_if_list = typename std::enable_if<std::is_base_of<ListType, T>::value>::type;
+
+template <typename T>
+using enable_if_number = typename std::enable_if<is_number<T>::value>::type;
+
 namespace detail {
 
 // Not all type classes have a c_type
diff --git a/cpp/src/arrow/util/bit-util-benchmark.cc b/cpp/src/arrow/util/bit-util-benchmark.cc
index 06aa2ed..43f461c 100644
--- a/cpp/src/arrow/util/bit-util-benchmark.cc
+++ b/cpp/src/arrow/util/bit-util-benchmark.cc
@@ -166,6 +166,10 @@ static void BM_BitmapWriter(benchmark::State& state) {
   BenchmarkBitmapWriter<internal::BitmapWriter>(state, state.range(0));
 }
 
+static void BM_FirstTimeBitmapWriter(benchmark::State& state) {
+  BenchmarkBitmapWriter<internal::FirstTimeBitmapWriter>(state, state.range(0));
+}
+
 static void BM_CopyBitmap(benchmark::State& state) {  // NOLINT non-const reference
   const int kBufferSize = state.range(0);
   std::shared_ptr<Buffer> buffer = CreateRandomBuffer(kBufferSize);
@@ -202,5 +206,10 @@ BENCHMARK(BM_NaiveBitmapWriter)
 
 BENCHMARK(BM_BitmapWriter)->Args({100000})->MinTime(1.0)->Unit(benchmark::kMicrosecond);
 
+BENCHMARK(BM_FirstTimeBitmapWriter)
+    ->Args({100000})
+    ->MinTime(1.0)
+    ->Unit(benchmark::kMicrosecond);
+
 }  // namespace BitUtil
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc
index c527244..5e0b696 100644
--- a/cpp/src/arrow/util/bit-util-test.cc
+++ b/cpp/src/arrow/util/bit-util-test.cc
@@ -44,7 +44,8 @@ static void EnsureCpuInfoInitialized() {
   }
 }
 
-void WriteVectorToWriter(internal::BitmapWriter& writer, const std::vector<int> values) {
+template <class BitmapWriter>
+void WriteVectorToWriter(BitmapWriter& writer, const std::vector<int> values) {
   for (const auto& value : values) {
     if (value) {
       writer.Set();
@@ -228,6 +229,59 @@ TEST(BitmapWriter, DoesNotWriteOutOfBounds) {
   ASSERT_EQ((length - 5), num_values);
 }
 
+TEST(FirstTimeBitmapWriter, NormalOperation) {
+  for (const auto fill_byte_int : {0x00, 0xff}) {
+    const uint8_t fill_byte = static_cast<uint8_t>(fill_byte_int);
+    {
+      uint8_t bitmap[] = {fill_byte, fill_byte, fill_byte, fill_byte};
+      auto writer = internal::FirstTimeBitmapWriter(bitmap, 0, 12);
+      WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1});
+      //                      {0b00110110, 0b1010, 0, 0}
+      ASSERT_BYTES_EQ(bitmap, {0x36, 0x0a});
+    }
+    {
+      uint8_t bitmap[] = {fill_byte, fill_byte, fill_byte, fill_byte};
+      auto writer = internal::FirstTimeBitmapWriter(bitmap, 4, 12);
+      WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1});
+      //                      {0b00110110, 0b1010, 0, 0}
+      ASSERT_BYTES_EQ(bitmap, {static_cast<uint8_t>(0x60 | (fill_byte & 0x0f)), 0xa3});
+    }
+    // Consecutive write chunks
+    {
+      uint8_t bitmap[] = {fill_byte, fill_byte, fill_byte, fill_byte};
+      {
+        auto writer = internal::FirstTimeBitmapWriter(bitmap, 0, 6);
+        WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1});
+      }
+      {
+        auto writer = internal::FirstTimeBitmapWriter(bitmap, 6, 3);
+        WriteVectorToWriter(writer, {0, 0, 0});
+      }
+      {
+        auto writer = internal::FirstTimeBitmapWriter(bitmap, 9, 3);
+        WriteVectorToWriter(writer, {1, 0, 1});
+      }
+      ASSERT_BYTES_EQ(bitmap, {0x36, 0x0a});
+    }
+    {
+      uint8_t bitmap[] = {fill_byte, fill_byte, fill_byte, fill_byte};
+      {
+        auto writer = internal::FirstTimeBitmapWriter(bitmap, 4, 6);
+        WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1});
+      }
+      {
+        auto writer = internal::FirstTimeBitmapWriter(bitmap, 10, 3);
+        WriteVectorToWriter(writer, {0, 0, 0});
+      }
+      {
+        auto writer = internal::FirstTimeBitmapWriter(bitmap, 13, 3);
+        WriteVectorToWriter(writer, {1, 0, 1});
+      }
+      ASSERT_BYTES_EQ(bitmap, {static_cast<uint8_t>(0x60 | (fill_byte & 0x0f)), 0xa3});
+    }
+  }
+}
+
 TEST(BitmapAnd, Aligned) {
   std::shared_ptr<Buffer> left, right, out;
   int64_t length;
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 3255ead..c012f7c 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -85,11 +85,20 @@ class Status;
 
 namespace BitUtil {
 
+//
+// Utilities for reading and writing individual bits by their index
+// in a memory area.
+//
+
+// Bitmask selecting the k-th bit in a byte
 static constexpr uint8_t kBitmask[] = {1, 2, 4, 8, 16, 32, 64, 128};
 
-// the ~i byte version of kBitmaks
+// the bitwise complement version of kBitmask
 static constexpr uint8_t kFlippedBitmask[] = {254, 253, 251, 247, 239, 223, 191, 127};
 
+// Bitmask selecting the (k - 1) preceding bits in a byte
+static constexpr uint8_t kPrecedingBitmask[] = {0, 1, 3, 7, 15, 31, 63, 127};
+
 static inline int64_t CeilByte(int64_t size) { return (size + 7) & ~7; }
 
 static inline int64_t BytesForBits(int64_t size) { return CeilByte(size) / 8; }
@@ -120,6 +129,8 @@ static inline void SetArrayBit(uint8_t* bits, int i, bool is_set) {
 static inline void SetBitTo(uint8_t* bits, int64_t i, bool bit_is_set) {
   // https://graphics.stanford.edu/~seander/bithacks.html
   // "Conditionally set or clear bits without branching"
+  // NOTE: this seems to confuse Valgrind as it reads from potentially
+  // uninitialized memory
   bits[i / 8] ^= static_cast<uint8_t>(-static_cast<uint8_t>(bit_is_set) ^ bits[i / 8]) &
                  kBitmask[i % 8];
 }
@@ -448,14 +459,17 @@ class BitmapReader {
 };
 
 class BitmapWriter {
+  // A sequential bitwise writer that preserves surrounding bit values.
+
  public:
   BitmapWriter(uint8_t* bitmap, int64_t start_offset, int64_t length)
       : bitmap_(bitmap), position_(0), length_(length) {
-    current_byte_ = 0;
     byte_offset_ = start_offset / 8;
     bit_mask_ = static_cast<uint8_t>(1 << (start_offset % 8));
     if (length > 0) {
       current_byte_ = bitmap[byte_offset_];
+    } else {
+      current_byte_ = 0;
     }
   }
 
@@ -495,6 +509,60 @@ class BitmapWriter {
   int64_t byte_offset_;
 };
 
+class FirstTimeBitmapWriter {
+  // Like BitmapWriter, but any bit values *following* the bits written
+  // might be clobbered.  It is hence faster than BitmapWriter, and can
+  // also avoid false positives with Valgrind.
+
+ public:
+  FirstTimeBitmapWriter(uint8_t* bitmap, int64_t start_offset, int64_t length)
+      : bitmap_(bitmap), position_(0), length_(length) {
+    current_byte_ = 0;
+    byte_offset_ = start_offset / 8;
+    bit_mask_ = static_cast<uint8_t>(1 << (start_offset % 8));
+    if (length > 0) {
+      current_byte_ = bitmap[byte_offset_] & BitUtil::kPrecedingBitmask[start_offset % 8];
+    } else {
+      current_byte_ = 0;
+    }
+  }
+
+  void Set() { current_byte_ |= bit_mask_; }
+
+  void Clear() {}
+
+  void Next() {
+    bit_mask_ = static_cast<uint8_t>(bit_mask_ << 1);
+    ++position_;
+    if (bit_mask_ == 0) {
+      // Finished this byte, need advancing
+      bit_mask_ = 0x01;
+      bitmap_[byte_offset_++] = current_byte_;
+      current_byte_ = 0;
+    }
+  }
+
+  void Finish() {
+    // Store current byte if we didn't went past bitmap storage
+    if (bit_mask_ != 0x01 || position_ < length_) {
+      bitmap_[byte_offset_] = current_byte_;
+    }
+  }
+
+  int64_t position() const { return position_; }
+
+ private:
+  uint8_t* bitmap_;
+  int64_t position_;
+  int64_t length_;
+
+  uint8_t current_byte_;
+  uint8_t bit_mask_;
+  int64_t byte_offset_;
+};
+
+// TODO: add a std::generate-like function for writing bitmaps?
+
 }  // namespace internal
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/hash.h b/cpp/src/arrow/util/hash.h
index 3597342..3a44478 100644
--- a/cpp/src/arrow/util/hash.h
+++ b/cpp/src/arrow/util/hash.h
@@ -39,6 +39,9 @@ static constexpr double kMaxHashTableLoad = 0.5;
 
 namespace internal {
 
+// TODO this ugliness should be rewritten as an inline function with
+// a callable argument.
+
 #define DOUBLE_TABLE_SIZE(SETUP_CODE, COMPUTE_HASH)                              \
   do {                                                                           \
     int64_t new_size = hash_table_size_ * 2;                                     \
diff --git a/cpp/valgrind.supp b/cpp/valgrind.supp
new file mode 100644
index 0000000..dc78eea
--- /dev/null
+++ b/cpp/valgrind.supp
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+{
+    # Casting to/from boolean might read uninitialized data as the null bitmap isn't considered
+    <boolean_cast>
+    Memcheck:Cond
+    fun:*CastFunctor*BooleanType*
+}
+{
+    # Data can be uninitialized when its null bit is set, but we write raw buffers
+    <write_unitialized_data>
+    Memcheck:Param
+    write(buf)
+    fun:__write_nocancel
+}

-- 
To stop receiving notification emails like this one, please contact
wesm@apache.org.