Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/04 23:55:50 UTC

[arrow] branch master updated: ARROW-8413: [C++][Parquet] Refactor Generating validity bitmap for values column

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5389008  ARROW-8413: [C++][Parquet] Refactor Generating validity bitmap for values column
5389008 is described below

commit 5389008df0267ba8d57edb7d6bb6ec0bfa10ff9a
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Thu Jun 4 18:55:16 2020 -0500

    ARROW-8413: [C++][Parquet] Refactor Generating validity bitmap for values column
    
    This change does the following:
    * Vectorizes computation for validity bitmaps with no repeated parents for all little-endian architectures
    * Vectorizes computation for validity bitmaps with repeated parents on AVX2+ architectures (it really requires BMI2); the non-repeated and repeated paths are both sketched below
    * Exposes some building blocks for vectorized computation of all validity bitmaps for parquet level data (more work is needed to generate appropriate bitmaps and offset data for lists).  These will be added to level_conversion.h in a future PR.
    * Replaces loops over bitmaps with SetBitsTo
    * Leaves a fallback for machines that are not BMI2-capable or not little-endian.
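    
    A minimal, self-contained sketch of the non-repeated path, assuming this
    commit's arrow/util/bit_util.h is on the include path. GreaterThanBitmap is
    restated here so the example compiles on its own and mirrors the helper
    added in level_conversion.h; the sample levels and main() are illustrative
    only.
    
        // Build a 64-bit "defined" mask from the def levels and append it to the
        // validity bitmap in one call instead of bit-by-bit writes.
        #include <cstdint>
        #include <iostream>
        #include <vector>
        
        #include "arrow/util/bit_util.h"
        
        // Bit i is set when levels[i] > rhs; clang/GCC auto-vectorize this loop.
        static uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
                                          int16_t rhs) {
          uint64_t mask = 0;
          for (int64_t i = 0; i < num_levels; ++i) {
            mask |= static_cast<uint64_t>(levels[i] > rhs ? 1 : 0) << i;
          }
          return mask;
        }
        
        int main() {
          // Nullable, non-repeated column: def level 1 == present, 0 == null.
          const std::vector<int16_t> def_levels = {1, 0, 1, 1, 0, 1, 1, 1};
          const int64_t num_levels = 8;
          const int16_t max_def_level = 1;
        
          std::vector<uint8_t> valid_bits(1, 0);
          ::arrow::internal::FirstTimeBitmapWriter writer(
              valid_bits.data(), /*start_offset=*/0, /*length=*/num_levels);
          const uint64_t defined =
              GreaterThanBitmap(def_levels.data(), num_levels, max_def_level - 1);
          writer.AppendWord(defined, num_levels);
          writer.Finish();
        
          std::cout << std::hex << static_cast<int>(valid_bits[0]) << "\n";  // ed
          return 0;
        }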
    
    With AVX2 enabled this seems to improve benchmarks by 20% for nullable columns (BM_Read..) on my box. I didn't see any impact on other benchmarks.
    
    See the checklist for what is remaining.  If possible I'd like to get early feedback on naming and on my approach to checking for BMI2.
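    
    The BMI2 check is compile-time only: the CMake change below defines
    ARROW_HAVE_BMI2 (and adds -mbmi2 at the AVX512 level), and the conversion
    code keys off that macro, keeping a scalar fallback otherwise. A
    self-contained illustration of that gating for the repeated-parent path;
    the helper name and sample masks here are hypothetical, not the commit's
    API:
    
        #include <cstdint>
        #include <iostream>
        
        #if defined(ARROW_HAVE_BMI2)
        #include <x86intrin.h>
        #endif
        
        // Hypothetical stand-in that mirrors how DefinitionLevelsBatchToBitmap
        // uses _pext_u64: pack the "defined" bits of slots that are actually
        // present, i.e. drop slots that correspond to empty lists.
        static uint64_t PackDefinedBits(uint64_t defined, uint64_t present) {
        #if defined(ARROW_HAVE_BMI2)
          return _pext_u64(defined, present);  // single BMI2 parallel bit extract
        #else
          // Scalar fallback used when BMI2 was not enabled at compile time.
          uint64_t out = 0;
          int pos = 0;
          for (int i = 0; i < 64; ++i) {
            if ((present >> i) & 1) out |= ((defined >> i) & 1) << pos++;
          }
          return out;
        #endif
        }
        
        int main() {
          // For def levels {0, 2, 1, 2, 2, 0, 1, 2} with max def level 2:
          // defined = level > 1 -> 0x9A, present = level > 0 -> 0xDE.
          std::cout << std::hex << PackDefinedBits(0x9A, 0xDE) << "\n";  // 2d
          return 0;
        }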
    
    Still needed:
    - [x] Still adding more focused unit tests for what I've added.
    - [x] Move the changes to Bitmap::ToString to their own PR.
    
    Closes #6985 from emkornfield/ARROW-8413
    
    Lead-authored-by: Micah Kornfield <em...@gmail.com>
    Co-authored-by: François Saint-Jacques <fs...@gmail.com>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/cmake_modules/SetupCxxFlags.cmake         |   7 +-
 cpp/src/arrow/util/bit_util.cc                |  10 +-
 cpp/src/arrow/util/bit_util.h                 |  60 +++++++-
 cpp/src/arrow/util/bit_util_test.cc           | 181 ++++++++++++++++++++++++
 cpp/src/parquet/CMakeLists.txt                |   3 +
 cpp/src/parquet/column_reader.cc              |  15 +-
 cpp/src/parquet/column_reader.h               |  43 ------
 cpp/src/parquet/column_reader_test.cc         |  51 -------
 cpp/src/parquet/level_conversion.cc           | 194 ++++++++++++++++++++++++++
 cpp/src/parquet/level_conversion.h            |  67 +++++++++
 cpp/src/parquet/level_conversion_benchmark.cc |  78 +++++++++++
 cpp/src/parquet/level_conversion_test.cc      | 128 +++++++++++++++++
 dev/archery/archery/cli.py                    |   5 +
 dev/archery/archery/lang/cpp.py               |   3 +
 14 files changed, 731 insertions(+), 114 deletions(-)

diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 726de0a..a7985c6 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -44,7 +44,7 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
     set(ARROW_SSE4_2_FLAG "-msse4.2")
     set(ARROW_AVX2_FLAG "-mavx2")
     # skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ
-    set(ARROW_AVX512_FLAG "-march=skylake-avx512")
+    set(ARROW_AVX512_FLAG "-march=skylake-avx512 -mbmi2")
     check_cxx_compiler_flag(${ARROW_SSE4_2_FLAG} CXX_SUPPORTS_SSE4_2)
   endif()
   check_cxx_compiler_flag(${ARROW_AVX2_FLAG} CXX_SUPPORTS_AVX2)
@@ -313,13 +313,14 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
       message(FATAL_ERROR "AVX512 required but compiler doesn't support it.")
     endif()
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} ${ARROW_AVX512_FLAG}")
-    add_definitions(-DARROW_HAVE_AVX512 -DARROW_HAVE_AVX2 -DARROW_HAVE_SSE4_2)
+    add_definitions(-DARROW_HAVE_AVX512 -DARROW_HAVE_AVX2 -DARROW_HAVE_BMI2
+                    -DARROW_HAVE_SSE4_2)
   elseif(ARROW_SIMD_LEVEL STREQUAL "AVX2")
     if(NOT CXX_SUPPORTS_AVX2)
       message(FATAL_ERROR "AVX2 required but compiler doesn't support it.")
     endif()
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} ${ARROW_AVX2_FLAG}")
-    add_definitions(-DARROW_HAVE_AVX2 -DARROW_HAVE_SSE4_2)
+    add_definitions(-DARROW_HAVE_AVX2 -DARROW_HAVE_BMI2 -DARROW_HAVE_SSE4_2)
   elseif(ARROW_SIMD_LEVEL STREQUAL "SSE4_2")
     if(NOT CXX_SUPPORTS_SSE4_2)
       message(FATAL_ERROR "SSE4.2 required but compiler doesn't support it.")
diff --git a/cpp/src/arrow/util/bit_util.cc b/cpp/src/arrow/util/bit_util.cc
index 07951d0..8b63da6 100644
--- a/cpp/src/arrow/util/bit_util.cc
+++ b/cpp/src/arrow/util/bit_util.cc
@@ -15,14 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Alias MSVC popcount to GCC name
-#ifdef _MSC_VER
-#include <intrin.h>
-#define __builtin_popcount __popcnt
-#include <nmmintrin.h>
-#define __builtin_popcountll _mm_popcnt_u64
-#endif
-
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
@@ -131,7 +123,7 @@ int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
     const uint64_t* end = u64_data + p.aligned_words;
 
     for (auto iter = u64_data; iter < end; ++iter) {
-      count += __builtin_popcountll(*iter);
+      count += BitUtil::PopCount(*iter);
     }
   }
 
diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h
index d1d6a52..e7e9804 100644
--- a/cpp/src/arrow/util/bit_util.h
+++ b/cpp/src/arrow/util/bit_util.h
@@ -43,13 +43,18 @@
 
 #if defined(_MSC_VER)
 #include <intrin.h>  // IWYU pragma: keep
+#include <nmmintrin.h>
 #pragma intrinsic(_BitScanReverse)
 #pragma intrinsic(_BitScanForward)
 #define ARROW_BYTE_SWAP64 _byteswap_uint64
 #define ARROW_BYTE_SWAP32 _byteswap_ulong
+#define ARROW_POPCOUNT64 __popcnt64
+#define ARROW_POPCOUNT32 __popcnt
 #else
 #define ARROW_BYTE_SWAP64 __builtin_bswap64
 #define ARROW_BYTE_SWAP32 __builtin_bswap32
+#define ARROW_POPCOUNT64 __builtin_popcountll
+#define ARROW_POPCOUNT32 __builtin_popcount
 #endif
 
 #include <algorithm>
@@ -107,6 +112,9 @@ static constexpr uint8_t kBytePopcount[] = {
     5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6,
     4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8};
 
+static inline uint64_t PopCount(uint64_t bitmap) { return ARROW_POPCOUNT64(bitmap); }
+static inline uint32_t PopCount(uint32_t bitmap) { return ARROW_POPCOUNT32(bitmap); }
+
 //
 // Bit-related computations on integer values
 //
@@ -578,6 +586,56 @@ class FirstTimeBitmapWriter {
     }
   }
 
+  /// Appends number_of_bits from word to valid_bits and valid_bits_offset.
+  ///
+  /// \param[in] word The LSB bitmap to append. Any bits past number_of_bits are assumed
+  ///            to be unset (i.e. 0).
+  /// \param[in] number_of_bits The number of bits to append from word.
+  void AppendWord(uint64_t word, int64_t number_of_bits) {
+    if (ARROW_PREDICT_FALSE(number_of_bits == 0)) {
+      return;
+    }
+
+    // Location that the first byte needs to be written to.
+    uint8_t* append_position = bitmap_ + byte_offset_;
+
+    // Update state variables except for current_byte_ here.
+    position_ += number_of_bits;
+    int64_t bit_offset = BitUtil::CountTrailingZeros(static_cast<uint32_t>(bit_mask_));
+    bit_mask_ = BitUtil::kBitmask[(bit_offset + number_of_bits) % 8];
+    byte_offset_ += (bit_offset + number_of_bits) / 8;
+
+    if (bit_offset != 0) {
+      // We are in the middle of the byte. This code updates the byte and shifts
+      // bits appropriately within word so it can be memcpy'd below.
+      int64_t bits_to_carry = 8 - bit_offset;
+      // Carry over bits from word to current_byte_. We assume any extra bits in word
+      // are unset, so no additional accounting is needed when number_of_bits <
+      // bits_to_carry.
+      current_byte_ |= (word & BitUtil::kPrecedingBitmask[bits_to_carry]) << bit_offset;
+      // Check if everything is transferred into current_byte_.
+      if (ARROW_PREDICT_FALSE(number_of_bits < bits_to_carry)) {
+        return;
+      }
+      *append_position = current_byte_;
+      append_position++;
+      // Move the carry bits off of word.
+      word = word >> bits_to_carry;
+      number_of_bits -= bits_to_carry;
+    }
+    word = BitUtil::ToLittleEndian(word);
+    int64_t bytes_for_word = ::arrow::BitUtil::BytesForBits(number_of_bits);
+    std::memcpy(append_position, &word, bytes_for_word);
+    // At this point, the previous current_byte_ has been written to bitmap_.
+    // The new current_byte_ is either the last relevant byte in 'word'
+    // or cleared if the new position is byte aligned (i.e. a fresh byte).
+    if (bit_mask_ == 0x1) {
+      current_byte_ = 0;
+    } else {
+      current_byte_ = *(append_position + bytes_for_word - 1);
+    }
+  }
+
   void Set() { current_byte_ |= bit_mask_; }
 
   void Clear() {}
@@ -594,7 +652,7 @@ class FirstTimeBitmapWriter {
   }
 
   void Finish() {
-    // Store current byte if we didn't went past bitmap storage
+    // Store current byte if we didn't go past bitmap storage
     if (length_ > 0 && (bit_mask_ != 0x01 || position_ < length_)) {
       bitmap_[byte_offset_] = current_byte_;
     }
diff --git a/cpp/src/arrow/util/bit_util_test.cc b/cpp/src/arrow/util/bit_util_test.cc
index 45f6daf..a501c7f 100644
--- a/cpp/src/arrow/util/bit_util_test.cc
+++ b/cpp/src/arrow/util/bit_util_test.cc
@@ -26,6 +26,7 @@
 #include <string>
 #include <vector>
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "arrow/buffer.h"
@@ -46,6 +47,8 @@ using internal::CopyBitmap;
 using internal::CountSetBits;
 using internal::InvertBitmap;
 
+using ::testing::ElementsAreArray;
+
 template <class BitmapWriter>
 void WriteVectorToWriter(BitmapWriter& writer, const std::vector<int> values) {
   for (const auto& value : values) {
@@ -315,6 +318,184 @@ TEST(FirstTimeBitmapWriter, NormalOperation) {
   }
 }
 
+std::string BitmapToString(const uint8_t* bitmap, int64_t bit_count) {
+  return arrow::internal::Bitmap(bitmap, /*offset*/ 0, /*length=*/bit_count).ToString();
+}
+
+std::string BitmapToString(const std::vector<uint8_t>& bitmap, int64_t bit_count) {
+  return BitmapToString(bitmap.data(), bit_count);
+}
+
+TEST(FirstTimeBitmapWriter, AppendWordOffsetOverwritesCorrectBitsOnExistingByte) {
+  auto check_append = [](const std::string& expected_bits, int64_t offset) {
+    std::vector<uint8_t> valid_bits = {0x00};
+    constexpr int64_t kBitsAfterAppend = 8;
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), offset,
+                                           /*length=*/(8 * valid_bits.size()) - offset);
+    writer.AppendWord(/*word=*/0xFF, /*number_of_bits=*/kBitsAfterAppend - offset);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, kBitsAfterAppend), expected_bits);
+  };
+  check_append("11111111", 0);
+  check_append("01111111", 1);
+  check_append("00111111", 2);
+  check_append("00011111", 3);
+  check_append("00001111", 4);
+  check_append("00000111", 5);
+  check_append("00000011", 6);
+  check_append("00000001", 7);
+
+  auto check_with_set = [](const std::string& expected_bits, int64_t offset) {
+    std::vector<uint8_t> valid_bits = {0x1};
+    constexpr int64_t kBitsAfterAppend = 8;
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), offset,
+                                           /*length=*/(8 * valid_bits.size()) - offset);
+    writer.AppendWord(/*word=*/0xFF, /*number_of_bits=*/kBitsAfterAppend - offset);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, kBitsAfterAppend), expected_bits);
+  };
+  // Offset zero would not be a valid mask.
+  check_with_set("11111111", 1);
+  check_with_set("10111111", 2);
+  check_with_set("10011111", 3);
+  check_with_set("10001111", 4);
+  check_with_set("10000111", 5);
+  check_with_set("10000011", 6);
+  check_with_set("10000001", 7);
+
+  auto check_with_preceding = [](const std::string& expected_bits, int64_t offset) {
+    std::vector<uint8_t> valid_bits = {0xFF};
+    constexpr int64_t kBitsAfterAppend = 8;
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), offset,
+                                           /*length=*/(8 * valid_bits.size()) - offset);
+    writer.AppendWord(/*word=*/0xFF, /*number_of_bits=*/kBitsAfterAppend - offset);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, kBitsAfterAppend), expected_bits);
+  };
+  check_with_preceding("11111111", 0);
+  check_with_preceding("11111111", 1);
+  check_with_preceding("11111111", 2);
+  check_with_preceding("11111111", 3);
+  check_with_preceding("11111111", 4);
+  check_with_preceding("11111111", 5);
+  check_with_preceding("11111111", 6);
+  check_with_preceding("11111111", 7);
+}
+
+TEST(FirstTimeBitmapWriter, AppendZeroBitsHasNoImpact) {
+  std::vector<uint8_t> valid_bits(/*count=*/1, 0);
+  internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/1,
+                                         /*length=*/valid_bits.size() * 8);
+  writer.AppendWord(/*word=*/0xFF, /*number_of_bits=*/0);
+  writer.AppendWord(/*word=*/0xFF, /*number_of_bits=*/0);
+  writer.AppendWord(/*word=*/0x01, /*number_of_bits=*/1);
+  writer.Finish();
+  EXPECT_EQ(valid_bits[0], 0x2);
+}
+
+TEST(FirstTimeBitmapWriter, AppendLessThanByte) {
+  {
+    std::vector<uint8_t> valid_bits(/*count*/ 8, 0);
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/1,
+                                           /*length=*/8);
+    writer.AppendWord(0xB, 4);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, /*bit_count=*/8), "01101000");
+  }
+  {
+    // Test with all bits initially set.
+    std::vector<uint8_t> valid_bits(/*count*/ 8, 0xFF);
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/1,
+                                           /*length=*/8);
+    writer.AppendWord(0xB, 4);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, /*bit_count=*/8), "11101000");
+  }
+}
+
+TEST(FirstTimeBitmapWriter, AppendByteThenMore) {
+  {
+    std::vector<uint8_t> valid_bits(/*count*/ 8, 0);
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/0,
+                                           /*length=*/9);
+    writer.AppendWord(0xC3, 8);
+    writer.AppendWord(0x01, 1);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, /*bit_count=*/9), "11000011 1");
+  }
+  {
+    std::vector<uint8_t> valid_bits(/*count*/ 8, 0xFF);
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/0,
+                                           /*length=*/9);
+    writer.AppendWord(0xC3, 8);
+    writer.AppendWord(0x01, 1);
+    writer.Finish();
+    EXPECT_EQ(BitmapToString(valid_bits, /*bit_count=*/9), "11000011 1");
+  }
+}
+
+TEST(FirstTimeBitmapWriter, AppendWordShiftsBitsCorrectly) {
+  constexpr uint64_t kPattern = 0x9A9A9A9A9A9A9A9A;
+  auto check_append = [&](const std::string& leading_bits, const std::string& middle_bits,
+                          const std::string& trailing_bits, int64_t offset,
+                          bool preset_buffer_bits = false) {
+    ASSERT_GE(offset, 8);
+    std::vector<uint8_t> valid_bits(/*count=*/10, preset_buffer_bits ? 0xFF : 0);
+    valid_bits[0] = 0x99;
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), offset,
+                                           /*length=*/(9 * sizeof(kPattern)) - offset);
+    writer.AppendWord(/*word=*/kPattern, /*number_of_bits=*/64);
+    writer.Finish();
+    EXPECT_EQ(valid_bits[0], 0x99);  // shouldn't get changed.
+    EXPECT_EQ(BitmapToString(valid_bits.data() + 1, /*num_bits=*/8), leading_bits);
+    for (int x = 2; x < 9; x++) {
+      EXPECT_EQ(BitmapToString(valid_bits.data() + x, /*num_bits=*/8), middle_bits)
+          << "x: " << x << " " << offset << " " << BitmapToString(valid_bits.data(), 80);
+    }
+    EXPECT_EQ(BitmapToString(valid_bits.data() + 9, /*num_bits=*/8), trailing_bits);
+  };
+  // Original Pattern = "01011001"
+  check_append(/*leading_bits= */ "01011001", /*middle_bits=*/"01011001",
+               /*trailing_bits=*/"00000000", /*offset=*/8);
+  check_append("00101100", "10101100", "10000000", 9);
+  check_append("00010110", "01010110", "01000000", 10);
+  check_append("00001011", "00101011", "00100000", 11);
+  check_append("00000101", "10010101", "10010000", 12);
+  check_append("00000010", "11001010", "11001000", 13);
+  check_append("00000001", "01100101", "01100100", 14);
+  check_append("00000000", "10110010", "10110010", 15);
+
+  check_append(/*leading_bits= */ "01011001", /*middle_bits=*/"01011001",
+               /*trailing_bits=*/"11111111", /*offset=*/8, /*preset_buffer_bits=*/true);
+  check_append("10101100", "10101100", "10000000", 9, true);
+  check_append("11010110", "01010110", "01000000", 10, true);
+  check_append("11101011", "00101011", "00100000", 11, true);
+  check_append("11110101", "10010101", "10010000", 12, true);
+  check_append("11111010", "11001010", "11001000", 13, true);
+  check_append("11111101", "01100101", "01100100", 14, true);
+  check_append("11111110", "10110010", "10110010", 15, true);
+}
+
+TEST(TestAppendBitmap, AppendWordOnlyAppropriateBytesWritten) {
+  std::vector<uint8_t> valid_bits = {0x00, 0x00};
+
+  uint64_t bitmap = 0x1FF;
+  {
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/1,
+                                           /*length=*/(8 * valid_bits.size()) - 1);
+    writer.AppendWord(bitmap, /*number_of_bits*/ 7);
+    writer.Finish();
+    EXPECT_THAT(valid_bits, ElementsAreArray(std::vector<uint8_t>{0xFE, 0x00}));
+  }
+  {
+    internal::FirstTimeBitmapWriter writer(valid_bits.data(), /*start_offset=*/1,
+                                           /*length=*/(8 * valid_bits.size()) - 1);
+    writer.AppendWord(bitmap, /*number_of_bits*/ 8);
+    writer.Finish();
+    EXPECT_THAT(valid_bits, ElementsAreArray(std::vector<uint8_t>{0xFE, 0x03}));
+  }
+}
+
 // Tests for GenerateBits and GenerateBitsUnrolled
 
 struct GenerateBitsFunctor {
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index b2b78f3..915ff75 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -187,6 +187,7 @@ set(PARQUET_SRCS
     file_writer.cc
     internal_file_decryptor.cc
     internal_file_encryptor.cc
+    level_conversion.cc
     metadata.cc
     murmur3.cc
     "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp"
@@ -336,6 +337,7 @@ set_source_files_properties(public_api_test.cc
 add_parquet_test(reader_test
                  SOURCES
                  column_reader_test.cc
+                 level_conversion_test.cc
                  column_scanner_test.cc
                  reader_test.cc
                  stream_reader_test.cc
@@ -371,6 +373,7 @@ add_parquet_test(schema_test)
 
 add_parquet_benchmark(column_io_benchmark)
 add_parquet_benchmark(encoding_benchmark)
+add_parquet_benchmark(level_conversion_benchmark)
 add_parquet_benchmark(arrow/reader_writer_benchmark PREFIX "parquet-arrow")
 
 if(ARROW_WITH_BROTLI)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 034defd..e218f84 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -42,6 +42,7 @@
 #include "parquet/encoding.h"
 #include "parquet/encryption_internal.h"
 #include "parquet/internal_file_decryptor.h"
+#include "parquet/level_conversion.h"
 #include "parquet/properties.h"
 #include "parquet/statistics.h"
 #include "parquet/thrift_internal.h"  // IWYU pragma: keep
@@ -882,9 +883,9 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
         }
       }
       total_values = this->ReadValues(values_to_read, values);
-      for (int64_t i = 0; i < total_values; i++) {
-        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
-      }
+      ::arrow::BitUtil::SetBitsTo(valid_bits, valid_bits_offset,
+                                  /*length=*/total_values,
+                                  /*bits_are_set=*/true);
       *values_read = total_values;
     } else {
       internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_,
@@ -900,9 +901,9 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
   } else {
     // Required field, read all values
     total_values = this->ReadValues(batch_size, values);
-    for (int64_t i = 0; i < total_values; i++) {
-      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
-    }
+    ::arrow::BitUtil::SetBitsTo(valid_bits, valid_bits_offset,
+                                /*length=*/total_values,
+                                /*bits_are_set=*/true);
     *null_count_out = 0;
     *levels_read = total_values;
   }
@@ -1323,7 +1324,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
     int64_t null_count = 0;
     if (nullable_values_) {
       int64_t values_with_nulls = 0;
-      internal::DefinitionLevelsToBitmap(
+      DefinitionLevelsToBitmap(
           def_levels() + start_levels_position, levels_position_ - start_levels_position,
           this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count,
           valid_bits_->mutable_data(), values_written_);
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 4b33e65..7b5ee1b 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -314,49 +314,6 @@ class DictionaryRecordReader : virtual public RecordReader {
   virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0;
 };
 
-static inline void DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
-      valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
-    } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
-    }
-
-    valid_bits_writer.Next();
-  }
-  valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
-}
-
-}  // namespace internal
-
-namespace internal {
-
 // TODO(itaiin): another code path split to merge when the general case is done
 static inline bool HasSpacedValues(const ColumnDescriptor* descr) {
   if (descr->max_repetition_level() > 0) {
diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc
index b8d51cf..6dc7de8 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -382,56 +382,5 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
   pages_.clear();
 }
 
-TEST(TestColumnReader, DefinitionLevelsToBitmap) {
-  // Bugs in this function were exposed in ARROW-3930
-  std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3};
-  std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1};
-
-  std::vector<uint8_t> valid_bits(2, 0);
-
-  const int max_def_level = 3;
-  const int max_rep_level = 1;
-
-  int64_t values_read = -1;
-  int64_t null_count = 0;
-  internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level,
-                                     &values_read, &null_count, valid_bits.data(),
-                                     0 /* valid_bits_offset */);
-  ASSERT_EQ(9, values_read);
-  ASSERT_EQ(1, null_count);
-
-  // Call again with 0 definition levels, make sure that valid_bits is unmodified
-  const uint8_t current_byte = valid_bits[1];
-  null_count = 0;
-  internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level,
-                                     &values_read, &null_count, valid_bits.data(),
-                                     9 /* valid_bits_offset */);
-  ASSERT_EQ(0, values_read);
-  ASSERT_EQ(0, null_count);
-  ASSERT_EQ(current_byte, valid_bits[1]);
-}
-
-TEST(TestColumnReader, DefinitionLevelsToBitmapPowerOfTwo) {
-  // PARQUET-1623: Invalid memory access when decoding a valid bits vector that has a
-  // length equal to a power of two and also using a non-zero valid_bits_offset.  This
-  // should not fail when run with ASAN or valgrind.
-  std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3};
-  std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1};
-  std::vector<uint8_t> valid_bits(1, 0);
-
-  const int max_def_level = 3;
-  const int max_rep_level = 1;
-
-  int64_t values_read = -1;
-  int64_t null_count = 0;
-
-  // Read the latter half of the validity bitmap
-  internal::DefinitionLevelsToBitmap(def_levels.data() + 4, 4, max_def_level,
-                                     max_rep_level, &values_read, &null_count,
-                                     valid_bits.data(), 4 /* valid_bits_offset */);
-  ASSERT_EQ(4, values_read);
-  ASSERT_EQ(0, null_count);
-}
-
 }  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/level_conversion.cc b/cpp/src/parquet/level_conversion.cc
new file mode 100644
index 0000000..cfa5df1
--- /dev/null
+++ b/cpp/src/parquet/level_conversion.cc
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+#if defined(ARROW_HAVE_BMI2)
+#include <x86intrin.h>
+#endif
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+
+namespace parquet {
+namespace internal {
+namespace {
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
+                            const int16_t max_expected_level) {
+  int16_t min_level = std::numeric_limits<int16_t>::max();
+  int16_t max_level = std::numeric_limits<int16_t>::min();
+  for (int x = 0; x < num_levels; x++) {
+    min_level = std::min(levels[x], min_level);
+    max_level = std::max(levels[x], max_level);
+  }
+  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
+                          (min_level < 0 || max_level > max_expected_level))) {
+    throw ParquetException("definition level exceeds maximum");
+  }
+}
+
+#if !defined(ARROW_HAVE_AVX512)
+
+inline void DefinitionLevelsToBitmapScalar(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // We assume here that valid_bits is large enough to accommodate the
+  // additional definition levels and the ones that have already been written
+  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
+                                                    num_def_levels);
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      valid_bits_writer.Set();
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    valid_bits_writer.Next();
+  }
+  valid_bits_writer.Finish();
+  *values_read = valid_bits_writer.position();
+}
+#endif
+
+template <bool has_repeated_parent>
+int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
+                                      const int16_t required_definition_level,
+                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
+  CheckLevelRange(def_levels, batch_size, required_definition_level);
+  uint64_t defined_bitmap =
+      internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1);
+
+  DCHECK_LE(batch_size, 64);
+  if (has_repeated_parent) {
+#if defined(ARROW_HAVE_BMI2)
+    // This is currently a specialized code path assuming only (nested) lists
+    // are present through the leaf (i.e. no structs). Upper-level code only
+    // calls this method when the leaf values are nullable (otherwise no spacing
+    // is needed). Because only nested lists exist, it is sufficient to know that
+    // the field was either null or present (i.e. definition level >
+    // max_definition_level - 2). If there were structs mixed in, we would need the
+    // def level of the repeated parent to check def_level > "def level of repeated parent".
+    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
+                                                          required_definition_level - 2);
+    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
+    writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap));
+    return ::arrow::BitUtil::PopCount(selected_bits);
+#else
+    assert(false && "must not execute this without BMI2");
+#endif
+  } else {
+    writer->AppendWord(defined_bitmap, batch_size);
+    return ::arrow::BitUtil::PopCount(defined_bitmap);
+  }
+}
+
+template <bool has_repeated_parent>
+void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
+                                  const int16_t required_definition_level,
+                                  int64_t* values_read, int64_t* null_count,
+                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
+  constexpr int64_t kBitMaskSize = 64;
+  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
+                                                  /*start_offset=*/valid_bits_offset,
+                                                  /*length=*/num_def_levels);
+  int64_t set_count = 0;
+  *values_read = 0;
+  while (num_def_levels > kBitMaskSize) {
+    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
+        def_levels, kBitMaskSize, required_definition_level, &writer);
+    def_levels += kBitMaskSize;
+    num_def_levels -= kBitMaskSize;
+  }
+  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
+      def_levels, num_def_levels, required_definition_level, &writer);
+
+  *values_read = writer.position();
+  *null_count += *values_read - set_count;
+  writer.Finish();
+}
+
+void DefinitionLevelsToBitmapLittleEndian(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  if (max_repetition_level > 0) {
+// This is a short term hack to prevent using the pext BMI2 instructions
+// on non-intel platforms where performance is subpar.
+// In the medium term we will hopefully be able to runtime dispatch
+// to use this on intel only platforms that support pext.
+#if defined(ARROW_HAVE_AVX512)
+    // BMI2 is required for efficient bit extraction.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+#else
+    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
+                                   max_repetition_level, values_read, null_count,
+                                   valid_bits, valid_bits_offset);
+#endif  // ARROW_HAVE_AVX512
+
+  } else {
+    // No BMI2 instructions are used for the non-repeated case.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/false>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+  }
+}
+
+}  // namespace
+
+void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
+                              const int16_t max_definition_level,
+                              const int16_t max_repetition_level, int64_t* values_read,
+                              int64_t* null_count, uint8_t* valid_bits,
+                              int64_t valid_bits_offset) {
+#if ARROW_LITTLE_ENDIAN
+  DefinitionLevelsToBitmapLittleEndian(def_levels, num_def_levels, max_definition_level,
+                                       max_repetition_level, values_read, null_count,
+                                       valid_bits, valid_bits_offset);
+
+#else
+  DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
+                                 max_repetition_level, values_read, null_count,
+                                 valid_bits, valid_bits_offset);
+
+#endif
+}
+
+}  // namespace internal
+}  // namespace parquet
diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h
new file mode 100644
index 0000000..ff91ca4
--- /dev/null
+++ b/cpp/src/parquet/level_conversion.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "parquet/platform.h"
+
+namespace parquet {
+namespace internal {
+
+void PARQUET_EXPORT DefinitionLevelsToBitmap(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset);
+
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+#if defined(ARROW_LITTLE_ENDIAN)
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be in [0, 64]).
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`).
+/// \returns The bitmap using least-significant-bit ordering.
+///
+/// N.B. Correct byte ordering depends on a little-endian architecture.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
+                                         int16_t rhs) {
+  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
+}
+
+#endif
+
+}  // namespace internal
+}  // namespace parquet
diff --git a/cpp/src/parquet/level_conversion_benchmark.cc b/cpp/src/parquet/level_conversion_benchmark.cc
new file mode 100644
index 0000000..4f15838
--- /dev/null
+++ b/cpp/src/parquet/level_conversion_benchmark.cc
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <vector>
+
+#include "benchmark/benchmark.h"
+#include "parquet/level_conversion.h"
+
+constexpr int64_t kLevelCount = 2048;
+// Def level indicating the element is missing from the leaf
+// array (a parent repeated element was empty).
+constexpr int16_t kMissingDefLevel = 0;
+
+// Definition level indicating the value has an entry in the leaf element.
+constexpr int16_t kPresentDefLevel = 2;
+
+// A repetition level that indicates a repeated element.
+constexpr int16_t kHasRepeatedElements = 1;
+
+std::vector<uint8_t> RunDefinitionLevelsToBitmap(const std::vector<int16_t>& def_levels,
+                                                 ::benchmark::State* state) {
+  int64_t values_read = 0;
+  int64_t null_count = 0;
+  std::vector<uint8_t> bitmap(/*count=*/def_levels.size(), 0);
+  int rep = 0;
+  for (auto _ : *state) {
+    parquet::internal::DefinitionLevelsToBitmap(
+        def_levels.data(), def_levels.size(), /*max_definition_level=*/kPresentDefLevel,
+        /*max_repetition_level=*/kHasRepeatedElements, &values_read, &null_count,
+        bitmap.data(), /*valid_bits_offset=*/(rep++ % 8) * def_levels.size());
+  }
+  state->SetBytesProcessed(int64_t(state->iterations()) * def_levels.size());
+  return bitmap;
+}
+
+void BM_DefinitionLevelsToBitmapRepeatedAllMissing(::benchmark::State& state) {
+  std::vector<int16_t> def_levels(/*count=*/kLevelCount, kMissingDefLevel);
+  auto result = RunDefinitionLevelsToBitmap(def_levels, &state);
+  ::benchmark::DoNotOptimize(result);
+}
+
+BENCHMARK(BM_DefinitionLevelsToBitmapRepeatedAllMissing);
+
+void BM_DefinitionLevelsToBitmapRepeatedAllPresent(::benchmark::State& state) {
+  std::vector<int16_t> def_levels(/*count=*/kLevelCount, kPresentDefLevel);
+  auto result = RunDefinitionLevelsToBitmap(def_levels, &state);
+  ::benchmark::DoNotOptimize(result);
+}
+
+BENCHMARK(BM_DefinitionLevelsToBitmapRepeatedAllPresent);
+
+void BM_DefinitionLevelsToBitmapRepeatedMostPresent(::benchmark::State& state) {
+  std::vector<int16_t> def_levels(/*count=*/kLevelCount, kPresentDefLevel);
+  for (size_t x = 0; x < def_levels.size(); x++) {
+    if (x % 10 == 0) {
+      def_levels[x] = kMissingDefLevel;
+    }
+  }
+  auto result = RunDefinitionLevelsToBitmap(def_levels, &state);
+  ::benchmark::DoNotOptimize(result);
+}
+
+BENCHMARK(BM_DefinitionLevelsToBitmapRepeatedMostPresent);
diff --git a/cpp/src/parquet/level_conversion_test.cc b/cpp/src/parquet/level_conversion_test.cc
new file mode 100644
index 0000000..0bdbad5
--- /dev/null
+++ b/cpp/src/parquet/level_conversion_test.cc
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "parquet/level_conversion.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <string>
+#include <vector>
+
+#include "arrow/util/bit_util.h"
+
+namespace parquet {
+namespace internal {
+
+using ::testing::ElementsAreArray;
+
+std::string BitmapToString(const uint8_t* bitmap, int64_t bit_count) {
+  return arrow::internal::Bitmap(bitmap, /*offset*/ 0, /*length=*/bit_count).ToString();
+}
+
+std::string BitmapToString(const std::vector<uint8_t>& bitmap, int64_t bit_count) {
+  return BitmapToString(bitmap.data(), bit_count);
+}
+
+TEST(TestColumnReader, DefinitionLevelsToBitmap) {
+  // Bugs in this function were exposed in ARROW-3930
+  std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3};
+
+  std::vector<uint8_t> valid_bits(2, 0);
+
+  const int max_def_level = 3;
+  const int max_rep_level = 1;
+
+  int64_t values_read = -1;
+  int64_t null_count = 0;
+  internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level,
+                                     &values_read, &null_count, valid_bits.data(),
+                                     0 /* valid_bits_offset */);
+  ASSERT_EQ(9, values_read);
+  ASSERT_EQ(1, null_count);
+
+  // Call again with 0 definition levels, make sure that valid_bits is unmodified
+  const uint8_t current_byte = valid_bits[1];
+  null_count = 0;
+  internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level,
+                                     &values_read, &null_count, valid_bits.data(),
+                                     9 /* valid_bits_offset */);
+  ASSERT_EQ(0, values_read);
+  ASSERT_EQ(0, null_count);
+  ASSERT_EQ(current_byte, valid_bits[1]);
+}
+
+TEST(TestColumnReader, DefinitionLevelsToBitmapPowerOfTwo) {
+  // PARQUET-1623: Invalid memory access when decoding a valid bits vector that has a
+  // length equal to a power of two and also using a non-zero valid_bits_offset.  This
+  // should not fail when run with ASAN or valgrind.
+  std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3};
+  std::vector<uint8_t> valid_bits(1, 0);
+
+  const int max_def_level = 3;
+  const int max_rep_level = 1;
+
+  int64_t values_read = -1;
+  int64_t null_count = 0;
+
+  // Read the latter half of the validity bitmap
+  internal::DefinitionLevelsToBitmap(def_levels.data() + 4, 4, max_def_level,
+                                     max_rep_level, &values_read, &null_count,
+                                     valid_bits.data(), 4 /* valid_bits_offset */);
+  ASSERT_EQ(4, values_read);
+  ASSERT_EQ(0, null_count);
+}
+
+#if defined(ARROW_LITTLE_ENDIAN)
+TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
+  std::vector<int16_t> levels = {0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+                                 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+                                 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+                                 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7};
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/0, /*rhs*/ 0), 0);
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/64, /*rhs*/ 8), 0);
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/64, /*rhs*/ -1),
+            0xFFFFFFFFFFFFFFFF);
+  // Should be zero padded.
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/47, /*rhs*/ -1),
+            0x7FFFFFFFFFFF);
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/64, /*rhs*/ 6),
+            0x8080808080808080);
+}
+#endif
+
+TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
+  std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
+  int64_t null_count = 5;
+  int64_t values_read = 1;
+
+  // All zeros should be ignored, ones should be unset in the bitmap, and twos should be set.
+  std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
+  DefinitionLevelsToBitmap(
+      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
+      /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(),
+      /*valid_bits_offset=*/1);
+
+  EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
+  for (size_t x = 1; x < validity_bitmap.size(); x++) {
+    EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
+  }
+  EXPECT_EQ(null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(values_read, 4);  // values_read should get overwritten.
+}
+
+}  // namespace internal
+}  // namespace parquet
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index 15c05a4..a9fddca 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -102,6 +102,9 @@ build_type = click.Choice(["debug", "relwithdebinfo", "release"],
 warn_level_type = click.Choice(["everything", "checkin", "production"],
                                case_sensitive=False)
 
+simd_level = click.Choice(["NONE", "SSE4_2", "AVX2", "AVX512"],
+                          case_sensitive=True)
+
 
 def cpp_toolchain_options(cmd):
     options = [
@@ -133,6 +136,8 @@ def _apply_options(cmd, options):
               help="Controls compiler warnings -W(no-)error.")
 @click.option("--use-gold-linker", default=True, type=BOOL,
               help="Toggles ARROW_USE_LD_GOLD option.")
+@click.option("--simd-level", default="SSE4_2", type=simd_level,
+              help="Toggles ARROW_SIMD_LEVEL option.")
 # Tests and benchmarks
 @click.option("--with-tests", default=True, type=BOOL,
               help="Build with tests.")
diff --git a/dev/archery/archery/lang/cpp.py b/dev/archery/archery/lang/cpp.py
index 2769781..4af0f54 100644
--- a/dev/archery/archery/lang/cpp.py
+++ b/dev/archery/archery/lang/cpp.py
@@ -62,6 +62,7 @@ class CppConfiguration:
                  # extras
                  with_lint_only=False,
                  use_gold_linker=True,
+                 simd_level="SSE4_2",
                  cmake_extras=None):
         self._cc = cc
         self._cxx = cxx
@@ -111,6 +112,7 @@ class CppConfiguration:
 
         self.with_lint_only = with_lint_only
         self.use_gold_linker = use_gold_linker
+        self.simd_level = simd_level
 
         self.cmake_extras = cmake_extras
 
@@ -232,6 +234,7 @@ class CppConfiguration:
         broken_with_gold_ld = [self.with_fuzzing, self.with_gandiva]
         if self.use_gold_linker and not any(broken_with_gold_ld):
             yield ("ARROW_USE_LD_GOLD", truthifier(self.use_gold_linker))
+        yield ("ARROW_SIMD_LEVEL", or_else(self.simd_level, "SSE4_2"))
 
         # Detect custom conda toolchain
         if self.use_conda: