Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/15 16:49:52 UTC

[GitHub] [arrow] emkornfield commented on a change in pull request #8177: ARROW-8494: [C++][Parquet] Full support for reading mixed list and structs

emkornfield commented on a change in pull request #8177:
URL: https://github.com/apache/arrow/pull/8177#discussion_r488365834



##########
File path: cpp/src/parquet/arrow/reconstruct_internal_test.cc
##########
@@ -884,12 +875,12 @@ TEST_F(TestReconstructColumn, FAILING(TwoLevelListOptional)) {
 // List-in-struct
 //
 
-TEST_F(TestReconstructColumn, FAILING(NestedList1)) {
+TEST_F(TestReconstructColumn, NestedList1) {
   // Arrow schema: struct(a: list(int32 not null) not null) not null
   SetParquetSchema(GroupNode::Make(
-      "a", Repetition::REQUIRED,
+      "a", Repetition::REQUIRED,  // this

Review comment:
      Sorry, removed.

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -45,25 +49,26 @@ TEST(TestColumnReader, DefinitionLevelsToBitmap) {
 
   std::vector<uint8_t> valid_bits(2, 0);
 
-  const int max_def_level = 3;
-  const int max_rep_level = 1;
+  LevelInfo level_info;
+  level_info.def_level = 3;
+  level_info.rep_level = 1;

Review comment:
      Yes. I added a comment about this in level_conversion.cc:
   
   > // It is simpler to rely on rep_level here until PARQUET-1899 is done and the code
   > // is deleted in a follow-up release.
   
   Once this is cleaned up it is not required.
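   
   For readers following along, a minimal scalar sketch of the semantics these LevelInfo fields encode (an illustration only; the actual level_conversion.cc code works on 64-value batches with bitmaps and handles the rep_level special case mentioned above):
   
      #include <cstdint>
      #include "arrow/util/bit_util.h"
      #include "parquet/level_conversion.h"  // LevelInfo, ValidityBitmapInputOutput
   
      // Hypothetical scalar reference for the DefinitionLevelsToBitmap semantics:
      //  * def levels below repeated_ancestor_def_level belong to an empty or null
      //    ancestor list and occupy no slot at this nesting depth;
      //  * def levels >= def_level are present values, anything in between is null.
      void ScalarDefLevelsToBitmap(const int16_t* def_levels, int64_t num_levels,
                                   parquet::internal::LevelInfo level_info,
                                   parquet::internal::ValidityBitmapInputOutput* io) {
        int64_t values_read = 0;
        for (int64_t i = 0; i < num_levels; ++i) {
          if (def_levels[i] < level_info.repeated_ancestor_def_level) continue;
          int64_t bit = io->valid_bits_offset + values_read;
          if (def_levels[i] >= level_info.def_level) {
            arrow::BitUtil::SetBit(io->valid_bits, bit);    // present value
          } else {
            arrow::BitUtil::ClearBit(io->valid_bits, bit);  // null slot
            ++io->null_count;                               // accumulates
          }
          ++values_read;
        }
        io->values_read = values_read;  // overwritten, matching the test expectations
      }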

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -108,22 +114,245 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
 
 TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
   std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
-  int64_t null_count = 5;
-  int64_t values_read = 1;
 
+  ValidityBitmapInputOutput io;
+  io.values_read_upper_bound = 64;
+  io.values_read = 1;
+  io.null_count = 5;
+  io.valid_bits = validity_bitmap.data();
+  io.valid_bits_offset = 1;
+
+  LevelInfo level_info;
+  level_info.repeated_ancestor_def_level = 1;
+  level_info.def_level = 2;
+  level_info.rep_level = 1;
   // All zeros should be ignored, ones should be unset in the bitmp and 2 should be set.
   std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
-  DefinitionLevelsToBitmap(
-      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
-      /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(),
-      /*valid_bits_offset=*/1);
+  DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io);
 
   EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
   for (size_t x = 1; x < validity_bitmap.size(); x++) {
     EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
   }
-  EXPECT_EQ(null_count, /*5 + 1 =*/6);
-  EXPECT_EQ(values_read, 4);  // value should get overwritten.
+  EXPECT_EQ(io.null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(io.values_read, 4);  // value should get overwritten.
+}
+
+class MultiLevelTestData {
+ public:
+  // Triply nested list values borrow from write_path
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  std::vector<int16_t> def_levels_{2, 7, 6, 7, 5, 3,  // first row
+                                   5, 5, 7, 7, 2, 7,  // second row
+                                   0,                 // third row
+                                   1};
+  std::vector<int16_t> rep_levels_{0, 1, 3, 3, 2, 1,  // first row
+                                   0, 1, 2, 3, 1, 1,  // second row
+                                   0, 0};
+};
+
+template <typename ConverterType>
+class NestedListTest : public testing::Test {
+ public:
+  MultiLevelTestData test_data_;
+  ConverterType converter_;
+};
+
+template <typename ListType>
+struct RepDefLevelConverter {
+  using ListLengthType = ListType;
+  ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data,
+                                  LevelInfo level_info, ValidityBitmapInputOutput* output,
+                                  ListType* lengths) {
+    ConvertDefRepLevelsToList(test_data.def_levels_.data(), test_data.rep_levels_.data(),
+                              test_data.def_levels_.size(), level_info, output, lengths);
+    return lengths + output->values_read;
+  }
+};
+
+using ConverterTypes =
+    ::testing::Types<RepDefLevelConverter</*list_length_type=*/int32_t>,
+                     RepDefLevelConverter</*list_length_type=*/int64_t>>;
+TYPED_TEST_CASE(NestedListTest, ConverterTypes);
+
+TYPED_TEST(NestedListTest, OuterMostTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 outer most lists (len(3), len(4), null, len(0))
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(5, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 4;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 4);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 7, 7, 7));
+
+  EXPECT_EQ(validity_io.values_read, 4);
+  EXPECT_EQ(validity_io.null_count, 1);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/4), "1101");
+}
+
+TYPED_TEST(NestedListTest, MiddleListTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> middle lists (null, len(2), len(0),
+  //                  len(1), len(2), null, len(1),
+  //                  N/A,
+  //                  N/A
+  LevelInfo level_info;
+  level_info.rep_level = 2;
+  level_info.def_level = 4;
+  level_info.repeated_ancestor_def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(8, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 7;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 7);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 0, 2, 2, 3, 5, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 7);
+  EXPECT_EQ(validity_io.null_count, 2);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/7), "0111101");
+}
+
+TYPED_TEST(NestedListTest, InnerMostListTest) {
+  // [null, [[1, null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 inner lists (N/A, [len(3), len(0)], N/A
+  //                        len(0), [len(0), len(2)], N/A, len(1),
+  //                        N/A,
+  //                        N/A
+  LevelInfo level_info;
+  level_info.rep_level = 3;
+  level_info.def_level = 6;
+  level_info.repeated_ancestor_def_level = 4;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(7, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 6;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 6);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 3, 3, 3, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 6);
+  EXPECT_EQ(validity_io.null_count, 0);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/6), "111111");
+}
+
+TYPED_TEST(NestedListTest, SimpleLongList) {
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+  level_info.repeated_ancestor_def_level = 0;
+
+  // No empty lists.
+  this->test_data_.def_levels_ = std::vector<int16_t>(65 * 9, 2);
+  this->test_data_.rep_levels_.clear();
+  for (int x = 0; x < 65; x++) {
+    this->test_data_.rep_levels_.push_back(0);
+    this->test_data_.rep_levels_.insert(this->test_data_.rep_levels_.end(), 8,
+                                        /*rep_level=*/1);
+  }
+
+  std::vector<typename TypeParam::ListLengthType> lengths(66, 0);
+  std::vector<typename TypeParam::ListLengthType> expected_lengths(66, 0);
+  for (size_t x = 1; x < expected_lengths.size(); x++) {
+    expected_lengths[x] = x * 9;
+  }
+  std::vector<uint8_t> validity_output(9, 0);
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 65;
+  validity_io.valid_bits = validity_output.data();
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 65);
+  EXPECT_THAT(lengths, testing::ElementsAreArray(expected_lengths));
+
+  EXPECT_EQ(validity_io.values_read, 65);
+  EXPECT_EQ(validity_io.null_count, 0);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/65),
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "1");
+}
+
+TYPED_TEST(NestedListTest, TestOverflow) {
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+  level_info.repeated_ancestor_def_level = 0;
+
+  // No empty lists.
+  this->test_data_.def_levels_ = std::vector<int16_t>{2};
+  this->test_data_.rep_levels_ = std::vector<int16_t>{0};
+
+  std::vector<typename TypeParam::ListLengthType> lengths(
+      2, std::numeric_limits<typename TypeParam::ListLengthType>::max());

Review comment:
      It is input/output; I clarified this in the declaration.

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -108,22 +114,245 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
 
 TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
   std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
-  int64_t null_count = 5;
-  int64_t values_read = 1;
 
+  ValidityBitmapInputOutput io;
+  io.values_read_upper_bound = 64;
+  io.values_read = 1;
+  io.null_count = 5;
+  io.valid_bits = validity_bitmap.data();
+  io.valid_bits_offset = 1;
+
+  LevelInfo level_info;
+  level_info.repeated_ancestor_def_level = 1;
+  level_info.def_level = 2;
+  level_info.rep_level = 1;
   // All zeros should be ignored, ones should be unset in the bitmp and 2 should be set.
   std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
-  DefinitionLevelsToBitmap(
-      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
-      /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(),
-      /*valid_bits_offset=*/1);
+  DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io);
 
   EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
   for (size_t x = 1; x < validity_bitmap.size(); x++) {
     EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
   }
-  EXPECT_EQ(null_count, /*5 + 1 =*/6);
-  EXPECT_EQ(values_read, 4);  // value should get overwritten.
+  EXPECT_EQ(io.null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(io.values_read, 4);  // value should get overwritten.
+}
+
+class MultiLevelTestData {
+ public:
+  // Triply nested list values borrow from write_path
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  std::vector<int16_t> def_levels_{2, 7, 6, 7, 5, 3,  // first row
+                                   5, 5, 7, 7, 2, 7,  // second row
+                                   0,                 // third row
+                                   1};
+  std::vector<int16_t> rep_levels_{0, 1, 3, 3, 2, 1,  // first row
+                                   0, 1, 2, 3, 1, 1,  // second row
+                                   0, 0};
+};
+
+template <typename ConverterType>
+class NestedListTest : public testing::Test {
+ public:
+  MultiLevelTestData test_data_;
+  ConverterType converter_;
+};
+
+template <typename ListType>
+struct RepDefLevelConverter {
+  using ListLengthType = ListType;
+  ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data,
+                                  LevelInfo level_info, ValidityBitmapInputOutput* output,
+                                  ListType* lengths) {
+    ConvertDefRepLevelsToList(test_data.def_levels_.data(), test_data.rep_levels_.data(),
+                              test_data.def_levels_.size(), level_info, output, lengths);
+    return lengths + output->values_read;
+  }
+};
+
+using ConverterTypes =
+    ::testing::Types<RepDefLevelConverter</*list_length_type=*/int32_t>,
+                     RepDefLevelConverter</*list_length_type=*/int64_t>>;
+TYPED_TEST_CASE(NestedListTest, ConverterTypes);
+
+TYPED_TEST(NestedListTest, OuterMostTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 outer most lists (len(3), len(4), null, len(0))
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(5, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 4;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 4);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 7, 7, 7));
+
+  EXPECT_EQ(validity_io.values_read, 4);
+  EXPECT_EQ(validity_io.null_count, 1);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/4), "1101");
+}
+
+TYPED_TEST(NestedListTest, MiddleListTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> middle lists (null, len(2), len(0),
+  //                  len(1), len(2), null, len(1),
+  //                  N/A,
+  //                  N/A
+  LevelInfo level_info;
+  level_info.rep_level = 2;
+  level_info.def_level = 4;
+  level_info.repeated_ancestor_def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(8, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 7;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 7);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 0, 2, 2, 3, 5, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 7);
+  EXPECT_EQ(validity_io.null_count, 2);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/7), "0111101");
+}
+
+TYPED_TEST(NestedListTest, InnerMostListTest) {
+  // [null, [[1, null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 inner lists (N/A, [len(3), len(0)], N/A
+  //                        len(0), [len(0), len(2)], N/A, len(1),
+  //                        N/A,
+  //                        N/A
+  LevelInfo level_info;
+  level_info.rep_level = 3;
+  level_info.def_level = 6;
+  level_info.repeated_ancestor_def_level = 4;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(7, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 6;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 6);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 3, 3, 3, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 6);
+  EXPECT_EQ(validity_io.null_count, 0);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/6), "111111");
+}
+
+TYPED_TEST(NestedListTest, SimpleLongList) {
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+  level_info.repeated_ancestor_def_level = 0;
+
+  // No empty lists.
+  this->test_data_.def_levels_ = std::vector<int16_t>(65 * 9, 2);
+  this->test_data_.rep_levels_.clear();
+  for (int x = 0; x < 65; x++) {
+    this->test_data_.rep_levels_.push_back(0);
+    this->test_data_.rep_levels_.insert(this->test_data_.rep_levels_.end(), 8,
+                                        /*rep_level=*/1);
+  }
+
+  std::vector<typename TypeParam::ListLengthType> lengths(66, 0);
+  std::vector<typename TypeParam::ListLengthType> expected_lengths(66, 0);
+  for (size_t x = 1; x < expected_lengths.size(); x++) {
+    expected_lengths[x] = x * 9;
+  }
+  std::vector<uint8_t> validity_output(9, 0);
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 65;
+  validity_io.valid_bits = validity_output.data();
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 65);
+  EXPECT_THAT(lengths, testing::ElementsAreArray(expected_lengths));
+
+  EXPECT_EQ(validity_io.values_read, 65);
+  EXPECT_EQ(validity_io.null_count, 0);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/65),
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "11111111 "
+            "1");
+}
+
+TYPED_TEST(NestedListTest, TestOverflow) {
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+  level_info.repeated_ancestor_def_level = 0;
+
+  // No empty lists.
+  this->test_data_.def_levels_ = std::vector<int16_t>{2};
+  this->test_data_.rep_levels_ = std::vector<int16_t>{0};
+
+  std::vector<typename TypeParam::ListLengthType> lengths(
+      2, std::numeric_limits<typename TypeParam::ListLengthType>::max());
+
+  std::vector<uint8_t> validity_output(1, 0);
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 1;
+  validity_io.valid_bits = validity_output.data();
+  ASSERT_THROW(this->converter_.ComputeListInfo(this->test_data_, level_info,
+                                                &validity_io, lengths.data()),
+               ParquetException);
+
+  // Same thing should happen if the list already existed.
+  this->test_data_.rep_levels_ = std::vector<int16_t>{1};
+  ASSERT_THROW(this->converter_.ComputeListInfo(this->test_data_, level_info,
+                                                &validity_io, lengths.data()),
+               ParquetException);
+
+  // Should be OK because it shouldn't increment.
+  this->test_data_.def_levels_ = std::vector<int16_t>{0};

Review comment:
      I missed adding the test to verify this doesn't throw.
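   
   A sketch of what that missing check might look like, continuing the TestOverflow body quoted above (assumes the same fixture members and GoogleTest's ASSERT_NO_THROW; the committed test may differ):
   
      // Hypothetical continuation of TYPED_TEST(NestedListTest, TestOverflow):
      // a def level of 0 means the outer list itself is null, so no length slot
      // should be consumed and no overflow should be triggered.
      this->test_data_.def_levels_ = std::vector<int16_t>{0};
      this->test_data_.rep_levels_ = std::vector<int16_t>{0};
      ASSERT_NO_THROW(this->converter_.ComputeListInfo(this->test_data_, level_info,
                                                       &validity_io, lengths.data()));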

##########
File path: cpp/src/parquet/CMakeLists.txt
##########
@@ -202,6 +203,19 @@ set(PARQUET_SRCS
     stream_writer.cc
     types.cc)
 
+if(CXX_SUPPORTS_AVX2)
+  # AVX2 is used as a proxy for BMI2.

Review comment:
      See above. It probably doesn't cost much, but the two are essentially synonymous; I can try to separate them if you would prefer.
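   
   If the two ever did need to be decoupled, a runtime guard could check both features explicitly; a minimal sketch using the GCC/Clang builtin (hypothetical, not part of this PR):
   
      // Hypothetical runtime guard: take the _pext_u64 path only when the CPU
      // reports both AVX2 and BMI2, instead of treating AVX2 as a proxy.
      bool CpuHasAvx2AndBmi2() {
      #if defined(__GNUC__) || defined(__clang__)
        return __builtin_cpu_supports("avx2") && __builtin_cpu_supports("bmi2");
      #else
        return false;  // fall back to the scalar/run-based implementation
      #endif
      }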

##########
File path: cpp/src/parquet/level_conversion_inc.h
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+#if defined(ARROW_HAVE_BMI2)
+#if defined(_MSC_VER)
+#include <immintrin.h>
+#else
+#include <x86intrin.h>
+#endif  // _MSC_VER
+#endif  // ARROW_HAVE_BMI2
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+namespace parquet {
+namespace internal {
+namespace BMI_RUNTIME_VERSION {
+
+using ::arrow::internal::BitRun;
+using ::arrow::internal::BitRunReader;
+
+/// Algorithm to simulate pext using BitRunReader for cases where all bits
+/// not set or set.
+uint64_t RunBasedExtractMixed(uint64_t bitmap, uint64_t select_bitmap) {
+  bitmap = arrow::BitUtil::FromLittleEndian(bitmap);
+  uint64_t new_bitmap = 0;
+  ::arrow::internal::BitRunReader selection(reinterpret_cast<uint8_t*>(&select_bitmap),
+                                            /*start_offset=*/0, /*length=*/64);
+  ::arrow::internal::BitRun run = selection.NextRun();
+  int64_t selected_bits = 0;
+  while (run.length != 0) {
+    if (run.set) {
+      new_bitmap |= (bitmap & ::arrow::BitUtil::LeastSignficantBitMask(run.length))
+                    << selected_bits;
+      selected_bits += run.length;
+    }
+    bitmap = bitmap >> run.length;
+    run = selection.NextRun();
+  }
+  return arrow::BitUtil::ToLittleEndian(new_bitmap);
+}
+
+inline uint64_t RunBasedExtractImpl(uint64_t bitmap, uint64_t select_bitmap) {
+  /// These checks should be inline and are likely to be common cases.
+  if (select_bitmap == ~uint64_t{0}) {
+    return bitmap;
+  } else if (select_bitmap == 0) {
+    return 0;
+  }
+  /// Fallback to the slow method.
+  return RunBasedExtractMixed(bitmap, select_bitmap);
+}
+
+inline uint64_t ExtractBits(uint64_t bitmap, uint64_t select_bitmap) {
+#if defined(ARROW_HAVE_BMI2) && !defined(__MINGW32__)
+  return _pext_u64(bitmap, select_bitmap);
+#else
+  return RunBasedExtractImpl(bitmap, select_bitmap);
+#endif
+}
+
+template <bool has_repeated_parent>
+int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
+                                      int64_t upper_bound_remaining, LevelInfo level_info,
+                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
+  // Greater than level_info.def_level - 1 implies >= the def_level
+  uint64_t defined_bitmap =
+      internal::GreaterThanBitmap(def_levels, batch_size, level_info.def_level - 1);
+
+  DCHECK_LE(batch_size, 64);
+  if (has_repeated_parent) {
+    // Greater than level_info.repeated_ancestor_def_level - 1 implies >= the
+    // repeated_ancenstor_def_level
+    uint64_t present_bitmap = internal::GreaterThanBitmap(
+        def_levels, batch_size, level_info.repeated_ancestor_def_level - 1);
+    uint64_t selected_bits = ExtractBits(defined_bitmap, present_bitmap);

Review comment:
      I would need to think about this algorithm a little bit more, but my expectation is that we should still see runs of 0s or 1s in most cases. As noted before, if this simulation doesn't work well on an AMD box, we can revert to the scalar version.
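   
   For intuition, a naive bit-by-bit reference for the pext semantics being simulated (illustration only; the PR's RunBasedExtractMixed processes whole runs at a time via BitRunReader rather than single bits):
   
      #include <cstdint>
   
      // Gather the bits of `bitmap` at positions where `select` is 1 and pack
      // them into the low bits of the result, LSB first (what _pext_u64 does).
      uint64_t NaivePext(uint64_t bitmap, uint64_t select) {
        uint64_t result = 0;
        int out_pos = 0;
        for (int i = 0; i < 64; ++i) {
          if ((select >> i) & 1) {
            result |= ((bitmap >> i) & 1) << out_pos;
            ++out_pos;
          }
        }
        return result;
      }
      // e.g. NaivePext(0b10110110, 0b01100101) == 0b0110. Long runs of 0s or 1s
      // in `select` are the common case for def levels, which is what the
      // BitRunReader-based version exploits.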

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -108,22 +114,245 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
 
 TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
   std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
-  int64_t null_count = 5;
-  int64_t values_read = 1;
 
+  ValidityBitmapInputOutput io;
+  io.values_read_upper_bound = 64;
+  io.values_read = 1;
+  io.null_count = 5;
+  io.valid_bits = validity_bitmap.data();
+  io.valid_bits_offset = 1;
+
+  LevelInfo level_info;
+  level_info.repeated_ancestor_def_level = 1;
+  level_info.def_level = 2;
+  level_info.rep_level = 1;
   // All zeros should be ignored, ones should be unset in the bitmp and 2 should be set.
   std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
-  DefinitionLevelsToBitmap(
-      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
-      /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(),
-      /*valid_bits_offset=*/1);
+  DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io);
 
   EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
   for (size_t x = 1; x < validity_bitmap.size(); x++) {
     EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
   }
-  EXPECT_EQ(null_count, /*5 + 1 =*/6);
-  EXPECT_EQ(values_read, 4);  // value should get overwritten.
+  EXPECT_EQ(io.null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(io.values_read, 4);  // value should get overwritten.
+}
+
+class MultiLevelTestData {
+ public:
+  // Triply nested list values borrow from write_path
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  std::vector<int16_t> def_levels_{2, 7, 6, 7, 5, 3,  // first row
+                                   5, 5, 7, 7, 2, 7,  // second row
+                                   0,                 // third row
+                                   1};
+  std::vector<int16_t> rep_levels_{0, 1, 3, 3, 2, 1,  // first row
+                                   0, 1, 2, 3, 1, 1,  // second row
+                                   0, 0};
+};
+
+template <typename ConverterType>
+class NestedListTest : public testing::Test {
+ public:
+  MultiLevelTestData test_data_;
+  ConverterType converter_;
+};
+
+template <typename ListType>
+struct RepDefLevelConverter {
+  using ListLengthType = ListType;
+  ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data,
+                                  LevelInfo level_info, ValidityBitmapInputOutput* output,
+                                  ListType* lengths) {
+    ConvertDefRepLevelsToList(test_data.def_levels_.data(), test_data.rep_levels_.data(),
+                              test_data.def_levels_.size(), level_info, output, lengths);
+    return lengths + output->values_read;
+  }
+};
+
+using ConverterTypes =
+    ::testing::Types<RepDefLevelConverter</*list_length_type=*/int32_t>,
+                     RepDefLevelConverter</*list_length_type=*/int64_t>>;
+TYPED_TEST_CASE(NestedListTest, ConverterTypes);
+
+TYPED_TEST(NestedListTest, OuterMostTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 outer most lists (len(3), len(4), null, len(0))
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(5, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 4;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 4);

Review comment:
      That is a good question; it probably works via an implicit Eq(). Fixed.
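   
   For reference, GoogleMock converts a plain value in the second argument of EXPECT_THAT into an implicit Eq() matcher, so the forms below check the same thing; EXPECT_EQ just states the intent more directly:
   
      EXPECT_THAT(next_position, lengths.data() + 4);               // implicit Eq()
      EXPECT_THAT(next_position, testing::Eq(lengths.data() + 4));  // explicit matcher
      EXPECT_EQ(next_position, lengths.data() + 4);                 // plain equality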

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -108,22 +114,245 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
 
 TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
   std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
-  int64_t null_count = 5;
-  int64_t values_read = 1;
 
+  ValidityBitmapInputOutput io;
+  io.values_read_upper_bound = 64;
+  io.values_read = 1;
+  io.null_count = 5;
+  io.valid_bits = validity_bitmap.data();
+  io.valid_bits_offset = 1;
+
+  LevelInfo level_info;
+  level_info.repeated_ancestor_def_level = 1;
+  level_info.def_level = 2;
+  level_info.rep_level = 1;
   // All zeros should be ignored, ones should be unset in the bitmp and 2 should be set.
   std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
-  DefinitionLevelsToBitmap(
-      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
-      /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(),
-      /*valid_bits_offset=*/1);
+  DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io);
 
   EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
   for (size_t x = 1; x < validity_bitmap.size(); x++) {
     EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
   }
-  EXPECT_EQ(null_count, /*5 + 1 =*/6);
-  EXPECT_EQ(values_read, 4);  // value should get overwritten.
+  EXPECT_EQ(io.null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(io.values_read, 4);  // value should get overwritten.
+}
+
+class MultiLevelTestData {
+ public:
+  // Triply nested list values borrow from write_path
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  std::vector<int16_t> def_levels_{2, 7, 6, 7, 5, 3,  // first row
+                                   5, 5, 7, 7, 2, 7,  // second row
+                                   0,                 // third row
+                                   1};
+  std::vector<int16_t> rep_levels_{0, 1, 3, 3, 2, 1,  // first row
+                                   0, 1, 2, 3, 1, 1,  // second row
+                                   0, 0};
+};
+
+template <typename ConverterType>
+class NestedListTest : public testing::Test {
+ public:
+  MultiLevelTestData test_data_;
+  ConverterType converter_;
+};
+
+template <typename ListType>
+struct RepDefLevelConverter {
+  using ListLengthType = ListType;
+  ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data,
+                                  LevelInfo level_info, ValidityBitmapInputOutput* output,
+                                  ListType* lengths) {
+    ConvertDefRepLevelsToList(test_data.def_levels_.data(), test_data.rep_levels_.data(),
+                              test_data.def_levels_.size(), level_info, output, lengths);
+    return lengths + output->values_read;
+  }
+};
+
+using ConverterTypes =
+    ::testing::Types<RepDefLevelConverter</*list_length_type=*/int32_t>,
+                     RepDefLevelConverter</*list_length_type=*/int64_t>>;
+TYPED_TEST_CASE(NestedListTest, ConverterTypes);
+
+TYPED_TEST(NestedListTest, OuterMostTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 outer most lists (len(3), len(4), null, len(0))
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(5, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 4;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 4);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 7, 7, 7));
+
+  EXPECT_EQ(validity_io.values_read, 4);
+  EXPECT_EQ(validity_io.null_count, 1);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/4), "1101");
+}
+
+TYPED_TEST(NestedListTest, MiddleListTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> middle lists (null, len(2), len(0),
+  //                  len(1), len(2), null, len(1),
+  //                  N/A,
+  //                  N/A
+  LevelInfo level_info;
+  level_info.rep_level = 2;
+  level_info.def_level = 4;
+  level_info.repeated_ancestor_def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(8, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 7;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 7);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 0, 2, 2, 3, 5, 5, 6));
+
+  EXPECT_EQ(validity_io.values_read, 7);
+  EXPECT_EQ(validity_io.null_count, 2);
+  EXPECT_EQ(BitmapToString(validity_io.valid_bits, /*length=*/7), "0111101");
+}
+
+TYPED_TEST(NestedListTest, InnerMostListTest) {
+  // [null, [[1, null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 inner lists (N/A, [len(3), len(0)], N/A

Review comment:
      Yes.

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -108,22 +114,245 @@ TEST(GreaterThanBitmap, GeneratesExpectedBitmasks) {
 
 TEST(DefinitionLevelsToBitmap, WithRepetitionLevelFiltersOutEmptyListValues) {
   std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
-  int64_t null_count = 5;
-  int64_t values_read = 1;
 
+  ValidityBitmapInputOutput io;
+  io.values_read_upper_bound = 64;
+  io.values_read = 1;
+  io.null_count = 5;
+  io.valid_bits = validity_bitmap.data();
+  io.valid_bits_offset = 1;
+
+  LevelInfo level_info;
+  level_info.repeated_ancestor_def_level = 1;
+  level_info.def_level = 2;
+  level_info.rep_level = 1;
   // All zeros should be ignored, ones should be unset in the bitmp and 2 should be set.
   std::vector<int16_t> def_levels = {0, 0, 0, 2, 2, 1, 0, 2};
-  DefinitionLevelsToBitmap(
-      def_levels.data(), def_levels.size(), /*max_definition_level=*/2,
-      /*max_repetition_level=*/1, &values_read, &null_count, validity_bitmap.data(),
-      /*valid_bits_offset=*/1);
+  DefinitionLevelsToBitmap(def_levels.data(), def_levels.size(), level_info, &io);
 
   EXPECT_EQ(BitmapToString(validity_bitmap, /*bit_count=*/8), "01101000");
   for (size_t x = 1; x < validity_bitmap.size(); x++) {
     EXPECT_EQ(validity_bitmap[x], 0) << "index: " << x;
   }
-  EXPECT_EQ(null_count, /*5 + 1 =*/6);
-  EXPECT_EQ(values_read, 4);  // value should get overwritten.
+  EXPECT_EQ(io.null_count, /*5 + 1 =*/6);
+  EXPECT_EQ(io.values_read, 4);  // value should get overwritten.
+}
+
+class MultiLevelTestData {
+ public:
+  // Triply nested list values borrow from write_path
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  std::vector<int16_t> def_levels_{2, 7, 6, 7, 5, 3,  // first row
+                                   5, 5, 7, 7, 2, 7,  // second row
+                                   0,                 // third row
+                                   1};
+  std::vector<int16_t> rep_levels_{0, 1, 3, 3, 2, 1,  // first row
+                                   0, 1, 2, 3, 1, 1,  // second row
+                                   0, 0};
+};
+
+template <typename ConverterType>
+class NestedListTest : public testing::Test {
+ public:
+  MultiLevelTestData test_data_;
+  ConverterType converter_;
+};
+
+template <typename ListType>
+struct RepDefLevelConverter {
+  using ListLengthType = ListType;
+  ListLengthType* ComputeListInfo(const MultiLevelTestData& test_data,
+                                  LevelInfo level_info, ValidityBitmapInputOutput* output,
+                                  ListType* lengths) {
+    ConvertDefRepLevelsToList(test_data.def_levels_.data(), test_data.rep_levels_.data(),
+                              test_data.def_levels_.size(), level_info, output, lengths);
+    return lengths + output->values_read;
+  }
+};
+
+using ConverterTypes =
+    ::testing::Types<RepDefLevelConverter</*list_length_type=*/int32_t>,
+                     RepDefLevelConverter</*list_length_type=*/int64_t>>;
+TYPED_TEST_CASE(NestedListTest, ConverterTypes);
+
+TYPED_TEST(NestedListTest, OuterMostTest) {
+  // [null, [[1 , null, 3], []], []],
+  // [[[]], [[], [1, 2]], null, [[3]]],
+  // null,
+  // []
+  // -> 4 outer most lists (len(3), len(4), null, len(0))
+  LevelInfo level_info;
+  level_info.rep_level = 1;
+  level_info.def_level = 2;
+
+  std::vector<typename TypeParam::ListLengthType> lengths(5, 0);
+  uint64_t validity_output;
+  ValidityBitmapInputOutput validity_io;
+  validity_io.values_read_upper_bound = 4;
+  validity_io.valid_bits = reinterpret_cast<uint8_t*>(&validity_output);
+  typename TypeParam::ListLengthType* next_position = this->converter_.ComputeListInfo(
+      this->test_data_, level_info, &validity_io, lengths.data());
+
+  EXPECT_THAT(next_position, lengths.data() + 4);
+  EXPECT_THAT(lengths, testing::ElementsAre(0, 3, 7, 7, 7));

Review comment:
      Yes, it should be. Sorry about that; I changed my mind on the semantics late in this PR and didn't rename it.
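   
   To make the intended semantics concrete: the values written here behave like Arrow list offsets (cumulative end positions) rather than per-list lengths. A small illustration using the expected output of this test (the conversion itself is hypothetical):
   
      // Expected output above for the 4 outermost lists: {0, 3, 7, 7, 7}.
      // Consecutive differences recover the per-list sizes {3, 4, 0, 0},
      // i.e. rows of length 3, 4, null (0), and 0, matching the comment block.
      std::vector<int32_t> offsets = {0, 3, 7, 7, 7};
      std::vector<int32_t> sizes(offsets.size() - 1);
      for (size_t i = 0; i + 1 < offsets.size(); ++i) {
        sizes[i] = offsets[i + 1] - offsets[i];
      }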

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.

Review comment:
      Done. I thought this might get included in Doxygen, but I guess we somehow limit it to only public APIs.

##########
File path: cpp/src/parquet/level_comparison.h
##########
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+
+#include "arrow/util/bit_util.h"
+#include "parquet/platform.h"
+
+namespace parquet {
+namespace internal {
+
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+inline uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels,
+                               Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return ::arrow::BitUtil::ToLittleEndian(mask);
+}
+
+/// Builds a  bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+uint64_t PARQUET_EXPORT GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
+                                          int16_t rhs);
+
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+uint64_t GreaterThanBitmapAvx2(const int16_t* levels, int64_t num_levels, int16_t rhs);
+#endif
+
+struct MinMax {
+  int16_t min;
+  int16_t max;
+};
+
+MinMax FindMinMax(const int16_t* levels, int64_t num_levels);
+
+#if defined(ARROW_HAVE_RUNTIME_AVX2)
+MinMax FindMinMaxAvx2(const int16_t* levels, int64_t num_levels);
+#endif
+
+// Used to make sure ODR rule isn't violated.
+namespace IMPL_NAMESPACE {

Review comment:
      Went with PARQUET_IMPL_NAMESPACE.
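   
   The idea behind the macro (a hedged sketch of the pattern, not the exact definition in the PR): the same header is compiled once per SIMD level, and wrapping the definitions in a level-specific namespace keeps the duplicate symbols distinct so the ODR is not violated.
   
      // Hypothetical illustration only; COMPILING_AVX2_VARIANT is a made-up flag.
      // Each translation unit is built with a different define, so the same
      // inline definitions become distinct symbols and never collide at link time.
      #if defined(COMPILING_AVX2_VARIANT)
      #define PARQUET_IMPL_NAMESPACE avx2
      #else
      #define PARQUET_IMPL_NAMESPACE standard
      #endif
   
      namespace parquet {
      namespace internal {
      namespace PARQUET_IMPL_NAMESPACE {
   
      inline int ImplementationSpecificHelper() { return 42; }  // placeholder body
   
      }  // namespace PARQUET_IMPL_NAMESPACE
      }  // namespace internal
      }  // namespace parquet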

##########
File path: cpp/src/parquet/level_comparison.h
##########
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+
+#include "arrow/util/bit_util.h"
+#include "parquet/platform.h"
+
+namespace parquet {
+namespace internal {
+
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.

Review comment:
      Nope, removed.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value.
+  /// If this number is exceeded methods will throw a
+  /// ParquetException.
+  int64_t values_read_upper_bound = 0;
+  /// The number of values added to the bitmap.
+  int64_t values_read = 0;
+  /// The number of nulls encountered.
+  int64_t null_count = 0;
+  // The validity bitmp to populate. Can only be null
+  // for DefRepLevelsToListInfo (if all that is needed is list lengths).
+  uint8_t* valid_bits = NULLPTR;
+  /// Input only, offset into valid_bits to start at.
+  int64_t valid_bits_offset = 0;
+};
 
+/// Converts def_levels to validity bitmaps for non-list arrays.
+void PARQUET_EXPORT DefinitionLevelsToBitmap(const int16_t* def_levels,
+                                             int64_t num_def_levels, LevelInfo level_info,
+                                             ValidityBitmapInputOutput* output);
+
+/// Reconstructs a validity bitmap and list lengths for a ListArray based on
+/// def/rep levels.
+void PARQUET_EXPORT ConvertDefRepLevelsToList(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels,
+    LevelInfo level_info, ValidityBitmapInputOutput* output,
+    ::arrow::util::variant<int32_t*, int64_t*> lengths);

Review comment:
      Separated into two methods.
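   
   A hedged sketch of what the two separated declarations might look like (plain overloads in place of the variant parameter; the committed names and signatures may differ):
   
      /// Hypothetical shape of the split into two methods.
      void PARQUET_EXPORT ConvertDefRepLevelsToList(
          const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels,
          LevelInfo level_info, ValidityBitmapInputOutput* output, int32_t* offsets);
   
      void PARQUET_EXPORT ConvertDefRepLevelsToList(
          const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels,
          LevelInfo level_info, ValidityBitmapInputOutput* output, int64_t* offsets);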

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value.
+  /// If this number is exceeded methods will throw a
+  /// ParquetException.
+  int64_t values_read_upper_bound = 0;
+  /// The number of values added to the bitmap.
+  int64_t values_read = 0;
+  /// The number of nulls encountered.
+  int64_t null_count = 0;
+  // The validity bitmp to populate. Can only be null
+  // for DefRepLevelsToListInfo (if all that is needed is list lengths).
+  uint8_t* valid_bits = NULLPTR;
+  /// Input only, offset into valid_bits to start at.
+  int64_t valid_bits_offset = 0;
+};
 
+/// Converts def_levels to validity bitmaps for non-list arrays.
+void PARQUET_EXPORT DefinitionLevelsToBitmap(const int16_t* def_levels,
+                                             int64_t num_def_levels, LevelInfo level_info,
+                                             ValidityBitmapInputOutput* output);
+
+/// Reconstructs a validity bitmap and list lengths for a ListArray based on
+/// def/rep levels.
+void PARQUET_EXPORT ConvertDefRepLevelsToList(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels,
+    LevelInfo level_info, ValidityBitmapInputOutput* output,
+    ::arrow::util::variant<int32_t*, int64_t*> lengths);
+
+/// Reconstructs a validity bitmap for a struct that has nested children.

Review comment:
      Tried to clarify; it is list/repeated children.

##########
File path: cpp/src/parquet/level_conversion_inc.h
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+#if defined(ARROW_HAVE_BMI2)
+#if defined(_MSC_VER)
+#include <immintrin.h>
+#else
+#include <x86intrin.h>
+#endif  // _MSC_VER
+#endif  // ARROW_HAVE_BMI2
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+namespace parquet {
+namespace internal {
+namespace BMI_RUNTIME_VERSION {
+
+using ::arrow::internal::BitRun;
+using ::arrow::internal::BitRunReader;
+
+/// Algorithm to simulate pext using BitRunReader for cases where all bits
+/// not set or set.
+uint64_t RunBasedExtractMixed(uint64_t bitmap, uint64_t select_bitmap) {
+  bitmap = arrow::BitUtil::FromLittleEndian(bitmap);
+  uint64_t new_bitmap = 0;
+  ::arrow::internal::BitRunReader selection(reinterpret_cast<uint8_t*>(&select_bitmap),
+                                            /*start_offset=*/0, /*length=*/64);
+  ::arrow::internal::BitRun run = selection.NextRun();
+  int64_t selected_bits = 0;
+  while (run.length != 0) {
+    if (run.set) {
+      new_bitmap |= (bitmap & ::arrow::BitUtil::LeastSignficantBitMask(run.length))

Review comment:
      It is an already existing method. Filed https://issues.apache.org/jira/browse/ARROW-10009 to clean this up.
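   
   For readers unfamiliar with the helper: it produces a mask of the n least significant bits (the spelling is the pre-existing typo tracked in ARROW-10009). A minimal equivalent, not necessarily Arrow's exact implementation:
   
      #include <cstdint>
   
      // Mask with the `n` lowest bits set, 0 <= n <= 64; the n >= 64 branch is
      // added here to avoid an undefined 64-bit shift.
      constexpr uint64_t LeastSignificantBitMask(int64_t n) {
        return n >= 64 ? ~uint64_t{0} : (uint64_t{1} << n) - 1;
      }
      // e.g. LeastSignificantBitMask(3) == 0b111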

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value.
+  /// If this number is exceeded methods will throw a

Review comment:
      Done.

##########
File path: cpp/src/parquet/level_conversion_inc.h
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+#if defined(ARROW_HAVE_BMI2)
+#if defined(_MSC_VER)
+#include <immintrin.h>
+#else
+#include <x86intrin.h>
+#endif  // _MSC_VER

Review comment:
      Yes, sorry, I forgot about this.

##########
File path: cpp/cmake_modules/SetupCxxFlags.cmake
##########
@@ -50,7 +50,7 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
     # skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ
     set(ARROW_AVX512_FLAG "-march=skylake-avx512 -mbmi2")
     # Append the avx2/avx512 subset option also, fix issue ARROW-9877 for homebrew-cpp
-    set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2")
+    set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2 -mbmi2")

Review comment:
      It is no riskier than Windows, which as far as I can tell does not have a separate flag for this. From what I can tell from [Wikipedia](https://en.wikipedia.org/wiki/Bit_manipulation_instruction_set), only a small number of AVX2-capable processors (some pre-Excavator variants) don't support BMI2.

##########
File path: cpp/src/parquet/level_conversion_inc.h
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+#if defined(ARROW_HAVE_BMI2)
+#if defined(_MSC_VER)
+#include <immintrin.h>
+#else
+#include <x86intrin.h>
+#endif  // _MSC_VER
+#endif  // ARROW_HAVE_BMI2
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+namespace parquet {
+namespace internal {
+namespace BMI_RUNTIME_VERSION {
+
+using ::arrow::internal::BitRun;
+using ::arrow::internal::BitRunReader;

Review comment:
       Removed these. I think I used them in the bitmap-related code, but there is less reason to use them here.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value.
+  /// If this number is exceeded methods will throw a
+  /// ParquetException.
+  int64_t values_read_upper_bound = 0;
+  /// The number of values added to the bitmap.
+  int64_t values_read = 0;
+  /// The number of nulls encountered.
+  int64_t null_count = 0;
+  // The validity bitmp to populate. Can only be null
+  // for DefRepLevelsToListInfo (if all that is needed is list lengths).
+  uint8_t* valid_bits = NULLPTR;
+  /// Input only, offset into valid_bits to start at.
+  int64_t valid_bits_offset = 0;
+};
 
+/// Converts def_levels to validity bitmaps for non-list arrays.
+void PARQUET_EXPORT DefinitionLevelsToBitmap(const int16_t* def_levels,
+                                             int64_t num_def_levels, LevelInfo level_info,
+                                             ValidityBitmapInputOutput* output);
+
+/// Reconstructs a validity bitmap and list lengths for a ListArray based on
+/// def/rep levels.
+void PARQUET_EXPORT ConvertDefRepLevelsToList(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels,
+    LevelInfo level_info, ValidityBitmapInputOutput* output,
+    ::arrow::util::variant<int32_t*, int64_t*> lengths);
+
+/// Reconstructs a validity bitmap for a struct that has nested children.
+void PARQUET_EXPORT ConvertDefRepLevelsToBitmap(const int16_t* def_levels,
+                                                const int16_t* rep_levels,
+                                                int64_t num_def_levels,
+                                                LevelInfo level_info,
+                                                ValidityBitmapInputOutput* output);
+
+uint64_t PARQUET_EXPORT RunBasedExtract(uint64_t bitmap, uint64_t selection);

Review comment:
       Renamed to TestOnly and added a comment. It is exposed so we can get sufficient test coverage.
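
   For context, a minimal scalar sketch of the pext-style extraction this test-only helper mirrors (the function name below is illustrative, not the one in the PR): each bit of `bitmap` at a position where `selection` is set gets packed into the low bits of the result.

   ```cpp
   #include <cstdint>

   // Scalar equivalent of BMI2 _pext_u64(bitmap, selection): gather the bits of
   // `bitmap` at the positions where `selection` has a set bit and pack them
   // contiguously starting at bit 0 of the result.
   uint64_t ExtractBitsScalar(uint64_t bitmap, uint64_t selection) {
     uint64_t result = 0;
     int out_pos = 0;
     while (selection != 0) {
       uint64_t lowest_set = selection & (~selection + 1);  // isolate lowest set bit
       if (bitmap & lowest_set) {
         result |= uint64_t{1} << out_pos;
       }
       ++out_pos;
       selection &= selection - 1;  // clear lowest set bit
     }
     return result;
   }
   ```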

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -19,6 +19,9 @@
 
 #include <cstdint>
 
+#include "arrow/util/bitmap.h"
+#include "arrow/util/optional.h"

Review comment:
       No, it is left over from the bitmap code that I pulled out.

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >
+                            output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
+    if (def_levels[x] >= level_info.def_level) {
       valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
     } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      valid_bits_writer.Clear();
+      output->null_count += 1;
     }
-
     valid_bits_writer.Next();
   }
   valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
-}
-#endif
-
-template <bool has_repeated_parent>
-int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
-                                      const int16_t required_definition_level,
-                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
-  CheckLevelRange(def_levels, batch_size, required_definition_level);
-  uint64_t defined_bitmap =
-      internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1);
-
-  DCHECK_LE(batch_size, 64);
-  if (has_repeated_parent) {
-#if defined(ARROW_HAVE_BMI2)
-    // This is currently a specialized code path assuming only (nested) lists
-    // present through the leaf (i.e. no structs). Upper level code only calls
-    // this method when the leaf-values are nullable (otherwise no spacing is needed),
-    // Because only nested lists exists it is sufficient to know that the field
-    // was either null or included it (i.e. definition level > max_definitation_level
-    // -2) If there where structs mixed in, we need to know the def_level of the
-    // repeated parent so we can check for def_level > "def level of repeated parent".
-    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
-                                                          required_definition_level - 2);
-    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
-    writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap));
-    return ::arrow::BitUtil::PopCount(selected_bits);
-#else
-    assert(false && "must not execute this without BMI2");
-#endif
-  } else {
-    writer->AppendWord(defined_bitmap, batch_size);
-    return ::arrow::BitUtil::PopCount(defined_bitmap);
+  output->values_read = valid_bits_writer.position();
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported");
   }
 }
+#endif
 
-template <bool has_repeated_parent>
-void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
-                                  const int16_t required_definition_level,
-                                  int64_t* values_read, int64_t* null_count,
-                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
-  constexpr int64_t kBitMaskSize = 64;
-  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
-                                                  /*start_offset=*/valid_bits_offset,
-                                                  /*length=*/num_def_levels);
-  int64_t set_count = 0;
-  *values_read = 0;
-  while (num_def_levels > kBitMaskSize) {
-    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-        def_levels, kBitMaskSize, required_definition_level, &writer);
-    def_levels += kBitMaskSize;
-    num_def_levels -= kBitMaskSize;
+template <typename LengthType>
+void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
+                            int64_t num_def_levels, LevelInfo level_info,
+                            ValidityBitmapInputOutput* output, LengthType* lengths) {
+  LengthType* orig_pos = lengths;
+  std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
+  if (output->valid_bits) {
+    valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter(
+        output->valid_bits, output->valid_bits_offset, num_def_levels));
   }
-  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-      def_levels, num_def_levels, required_definition_level, &writer);
+  for (int x = 0; x < num_def_levels; x++) {
+    // Skip items that belong to empty ancenstor lists and futher nested lists.
+    if (def_levels[x] < level_info.repeated_ancestor_def_level ||
+        rep_levels[x] > level_info.rep_level) {
+      continue;
+    }
 
-  *values_read = writer.position();
-  *null_count += *values_read - set_count;
-  writer.Finish();
-}
+    if (ARROW_PREDICT_FALSE(
+            (valid_bits_writer != nullptr &&
+             valid_bits_writer->position() > output->values_read_upper_bound) ||
+            (lengths - orig_pos) > output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
 
-void DefinitionLevelsToBitmapLittleEndian(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  if (max_repetition_level > 0) {
-// This is a short term hack to prevent using the pext BMI2 instructions
-// on non-intel platforms where performance is subpar.
-// In the medium term we will hopefully be able to runtime dispatch
-// to use this on intel only platforms that support pext.
-#if defined(ARROW_HAVE_AVX512)
-    // BMI2 is required for efficient bit extraction.
-    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
-        def_levels, num_def_levels, max_definition_level, values_read, null_count,
-        valid_bits, valid_bits_offset);
-#else
-    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
-                                   max_repetition_level, values_read, null_count,
-                                   valid_bits, valid_bits_offset);
-#endif  // ARROW_HAVE_BMI2
+    if (rep_levels[x] == level_info.rep_level) {
+      // A continuation of an existing list.
+      if (lengths != nullptr) {
+        if (ARROW_PREDICT_FALSE(*lengths == std::numeric_limits<LengthType>::max())) {
+          throw ParquetException("List index overflow.");
+        }
+        *lengths += 1;
+      }
+    } else {
+      // current_rep < list rep_level i.e. start of a list (ancenstor empty lists are
+      // filtered out above).
+      if (lengths != nullptr) {
+        ++lengths;
+        // Use cumulative lengths because this is what the more common
+        // Arrow list types expect.
+        *lengths = *(lengths - 1);
+        if (def_levels[x] >= level_info.def_level) {
+          if (ARROW_PREDICT_FALSE(*lengths == std::numeric_limits<LengthType>::max())) {
+            throw ParquetException("List index overflow.");
+          }
+          *lengths += 1;
+        }
+      }
 
-  } else {
-    // No BMI2 intsturctions are used for non-repeated case.
-    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/false>(
-        def_levels, num_def_levels, max_definition_level, values_read, null_count,
-        valid_bits, valid_bits_offset);
+      if (valid_bits_writer != nullptr) {
+        // the level_info def level for lists reflects element present level.
+        // the prior level distinguishes between empty lists.
+        if (def_levels[x] >= level_info.def_level - 1) {
+          valid_bits_writer->Set();
+        } else {
+          output->null_count++;
+          valid_bits_writer->Clear();
+        }
+        valid_bits_writer->Next();
+      }
+    }
+  }
+  if (valid_bits_writer != nullptr) {
+    valid_bits_writer->Finish();
+  }
+  if (lengths != nullptr) {
+    output->values_read = lengths - orig_pos;
+  } else if (valid_bits_writer != nullptr) {
+    output->values_read = valid_bits_writer->position();
+  }
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported)");
   }
 }
 
 }  // namespace
 
 void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
-                              const int16_t max_definition_level,
-                              const int16_t max_repetition_level, int64_t* values_read,
-                              int64_t* null_count, uint8_t* valid_bits,
-                              int64_t valid_bits_offset) {
-#if ARROW_LITTLE_ENDIAN
-  DefinitionLevelsToBitmapLittleEndian(def_levels, num_def_levels, max_definition_level,
-                                       max_repetition_level, values_read, null_count,
-                                       valid_bits, valid_bits_offset);
-
+                              LevelInfo level_info, ValidityBitmapInputOutput* output) {
+  if (level_info.rep_level > 0) {
+#if defined(ARROW_HAVE_RUNTIME_BMI2)
+    using FunctionType = decltype(&standard::DefinitionLevelsToBitmapSimd<true>);
+    static FunctionType fn =
+        CpuInfo::GetInstance()->HasEfficientBmi2()
+            ? DefinitionLevelsToBitmapBmi2WithRepeatedParent
+            : standard::DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>;
+    fn(def_levels, num_def_levels, level_info, output);
 #else
-  DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
-                                 max_repetition_level, values_read, null_count,
-                                 valid_bits, valid_bits_offset);
+    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, level_info, output);

Review comment:
       Added a comment. It is likely the best choice for big-endian platforms. It might also be the best choice for AMD CPUs (if you could measure the performance difference of this PR against master on your machine, it would help inform whether this path should be used above).
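
   In case it helps, a rough sketch of the kind of micro-benchmark that would surface the difference between this PR and master (assuming Google Benchmark is available to the parquet benchmark target; the sizes and level values below are arbitrary):

   ```cpp
   #include <benchmark/benchmark.h>

   #include <cstdint>
   #include <vector>

   #include "parquet/level_conversion.h"

   // Converts a block of definition levels for a nested (repeated) column into a
   // validity bitmap, which is the code path whose BMI2 vs. scalar cost is in question.
   static void BM_DefLevelsToBitmapNested(benchmark::State& state) {
     std::vector<int16_t> def_levels(4096, 2);  // every leaf value present
     std::vector<uint8_t> valid_bits(def_levels.size() / 8 + 1, 0);

     parquet::internal::LevelInfo level_info;
     level_info.def_level = 2;
     level_info.rep_level = 1;
     level_info.repeated_ancestor_def_level = 1;

     for (auto _ : state) {
       parquet::internal::ValidityBitmapInputOutput io;
       io.values_read_upper_bound = static_cast<int64_t>(def_levels.size());
       io.valid_bits = valid_bits.data();
       parquet::internal::DefinitionLevelsToBitmap(
           def_levels.data(), static_cast<int64_t>(def_levels.size()), level_info, &io);
       benchmark::DoNotOptimize(io.values_read);
     }
   }
   BENCHMARK(BM_DefLevelsToBitmapNested);
   BENCHMARK_MAIN();
   ```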

##########
File path: cpp/cmake_modules/SetupCxxFlags.cmake
##########
@@ -50,7 +50,7 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
     # skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ
     set(ARROW_AVX512_FLAG "-march=skylake-avx512 -mbmi2")
     # Append the avx2/avx512 subset option also, fix issue ARROW-9877 for homebrew-cpp
-    set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2")
+    set(ARROW_AVX2_FLAG "${ARROW_AVX2_FLAG} -mavx2 -mbmi2")

Review comment:
       Did you mean to tag someone else here? Or do I get the final say :)

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >

Review comment:
       Carelessness; also fixed the bound in ToList.

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >
+                            output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
+    if (def_levels[x] >= level_info.def_level) {
       valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
     } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      valid_bits_writer.Clear();
+      output->null_count += 1;
     }
-
     valid_bits_writer.Next();
   }
   valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
-}
-#endif
-
-template <bool has_repeated_parent>
-int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
-                                      const int16_t required_definition_level,
-                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
-  CheckLevelRange(def_levels, batch_size, required_definition_level);
-  uint64_t defined_bitmap =
-      internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1);
-
-  DCHECK_LE(batch_size, 64);
-  if (has_repeated_parent) {
-#if defined(ARROW_HAVE_BMI2)
-    // This is currently a specialized code path assuming only (nested) lists
-    // present through the leaf (i.e. no structs). Upper level code only calls
-    // this method when the leaf-values are nullable (otherwise no spacing is needed),
-    // Because only nested lists exists it is sufficient to know that the field
-    // was either null or included it (i.e. definition level > max_definitation_level
-    // -2) If there where structs mixed in, we need to know the def_level of the
-    // repeated parent so we can check for def_level > "def level of repeated parent".
-    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
-                                                          required_definition_level - 2);
-    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
-    writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap));
-    return ::arrow::BitUtil::PopCount(selected_bits);
-#else
-    assert(false && "must not execute this without BMI2");
-#endif
-  } else {
-    writer->AppendWord(defined_bitmap, batch_size);
-    return ::arrow::BitUtil::PopCount(defined_bitmap);
+  output->values_read = valid_bits_writer.position();
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported");
   }
 }
+#endif
 
-template <bool has_repeated_parent>
-void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
-                                  const int16_t required_definition_level,
-                                  int64_t* values_read, int64_t* null_count,
-                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
-  constexpr int64_t kBitMaskSize = 64;
-  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
-                                                  /*start_offset=*/valid_bits_offset,
-                                                  /*length=*/num_def_levels);
-  int64_t set_count = 0;
-  *values_read = 0;
-  while (num_def_levels > kBitMaskSize) {
-    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-        def_levels, kBitMaskSize, required_definition_level, &writer);
-    def_levels += kBitMaskSize;
-    num_def_levels -= kBitMaskSize;
+template <typename LengthType>
+void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
+                            int64_t num_def_levels, LevelInfo level_info,
+                            ValidityBitmapInputOutput* output, LengthType* lengths) {
+  LengthType* orig_pos = lengths;
+  std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
+  if (output->valid_bits) {
+    valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter(
+        output->valid_bits, output->valid_bits_offset, num_def_levels));
   }
-  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-      def_levels, num_def_levels, required_definition_level, &writer);
+  for (int x = 0; x < num_def_levels; x++) {
+    // Skip items that belong to empty ancenstor lists and futher nested lists.
+    if (def_levels[x] < level_info.repeated_ancestor_def_level ||
+        rep_levels[x] > level_info.rep_level) {
+      continue;
+    }
 
-  *values_read = writer.position();
-  *null_count += *values_read - set_count;
-  writer.Finish();
-}
+    if (ARROW_PREDICT_FALSE(
+            (valid_bits_writer != nullptr &&
+             valid_bits_writer->position() > output->values_read_upper_bound) ||
+            (lengths - orig_pos) > output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
 
-void DefinitionLevelsToBitmapLittleEndian(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  if (max_repetition_level > 0) {
-// This is a short term hack to prevent using the pext BMI2 instructions
-// on non-intel platforms where performance is subpar.
-// In the medium term we will hopefully be able to runtime dispatch
-// to use this on intel only platforms that support pext.
-#if defined(ARROW_HAVE_AVX512)
-    // BMI2 is required for efficient bit extraction.
-    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
-        def_levels, num_def_levels, max_definition_level, values_read, null_count,
-        valid_bits, valid_bits_offset);
-#else
-    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
-                                   max_repetition_level, values_read, null_count,
-                                   valid_bits, valid_bits_offset);
-#endif  // ARROW_HAVE_BMI2
+    if (rep_levels[x] == level_info.rep_level) {
+      // A continuation of an existing list.
+      if (lengths != nullptr) {
+        if (ARROW_PREDICT_FALSE(*lengths == std::numeric_limits<LengthType>::max())) {
+          throw ParquetException("List index overflow.");
+        }
+        *lengths += 1;
+      }
+    } else {
+      // current_rep < list rep_level i.e. start of a list (ancenstor empty lists are
+      // filtered out above).
+      if (lengths != nullptr) {
+        ++lengths;
+        // Use cumulative lengths because this is what the more common

Review comment:
       Clarified that this is a fixed-size list vs. variable-size list issue.
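
   To make the lengths-vs-offsets distinction concrete, a tiny illustration (my own names, not code from this PR) of the cumulative form that variable-size Arrow lists expect:

   ```cpp
   #include <cstdint>
   #include <vector>

   // Variable-size Arrow lists store cumulative offsets rather than per-list
   // lengths: offsets[i + 1] - offsets[i] is the length of list i.
   std::vector<int32_t> LengthsToOffsets(const std::vector<int32_t>& lengths) {
     std::vector<int32_t> offsets(lengths.size() + 1, 0);
     for (size_t i = 0; i < lengths.size(); ++i) {
       offsets[i + 1] = offsets[i] + lengths[i];  // running total
     }
     return offsets;
   }

   // Example: lengths {3, 0, 2} -> offsets {0, 3, 3, 5}.
   ```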

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value.
+  /// If this number is exceeded methods will throw a
+  /// ParquetException.
+  int64_t values_read_upper_bound = 0;
+  /// The number of values added to the bitmap.
+  int64_t values_read = 0;
+  /// The number of nulls encountered.
+  int64_t null_count = 0;
+  // The validity bitmp to populate. Can only be null
+  // for DefRepLevelsToListInfo (if all that is needed is list lengths).
+  uint8_t* valid_bits = NULLPTR;
+  /// Input only, offset into valid_bits to start at.
+  int64_t valid_bits_offset = 0;
+};
 
+/// Converts def_levels to validity bitmaps for non-list arrays.
+void PARQUET_EXPORT DefinitionLevelsToBitmap(const int16_t* def_levels,
+                                             int64_t num_def_levels, LevelInfo level_info,
+                                             ValidityBitmapInputOutput* output);
+
+/// Reconstructs a validity bitmap and list lengths for a ListArray based on
+/// def/rep levels.
+void PARQUET_EXPORT ConvertDefRepLevelsToList(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_def_levels,
+    LevelInfo level_info, ValidityBitmapInputOutput* output,
+    ::arrow::util::variant<int32_t*, int64_t*> lengths);
+
+/// Reconstructs a validity bitmap for a struct that has nested children.
+void PARQUET_EXPORT ConvertDefRepLevelsToBitmap(const int16_t* def_levels,
+                                                const int16_t* rep_levels,
+                                                int64_t num_def_levels,
+                                                LevelInfo level_info,
+                                                ValidityBitmapInputOutput* output);
+
+uint64_t PARQUET_EXPORT RunBasedExtract(uint64_t bitmap, uint64_t selection);
+
+#if defined(ARROW_HAVE_RUNTIME_BMI2)
+void PARQUET_EXPORT DefinitionLevelsToBitmapBmi2WithRepeatedParent(

Review comment:
       Yeah, I didn't think this through. Moved the declaration into the .cc with documentation pointing to the instruction-set-specific .cc. Did the same for comparison as well.
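
   For readers following along, a structural sketch of the pattern (not compilable on its own; the BMI2 translation-unit name below is hypothetical): the shared function bodies live in level_conversion_inc.h inside `namespace BMI_RUNTIME_VERSION`, and each .cc picks the namespace by defining that macro before including the file.

   ```cpp
   // level_conversion.cc -- built with portable compile flags
   #define BMI_RUNTIME_VERSION standard
   #include "parquet/level_conversion_inc.h"  // bodies land in parquet::internal::standard
   #undef BMI_RUNTIME_VERSION

   // a separate BMI2-enabled translation unit (hypothetical name), built with -mbmi2
   #define BMI_RUNTIME_VERSION bmi2
   #include "parquet/level_conversion_inc.h"  // same bodies, now in a bmi2-specific namespace
   #undef BMI_RUNTIME_VERSION

   // At runtime, level_conversion.cc picks between the two namespaces based on
   // CpuInfo::GetInstance()->HasEfficientBmi2(), as shown in the diff above.
   ```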

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {

Review comment:
       Added a comment. Yes, that is controlled below (we always use SIMD when this isn't nested).

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >
+                            output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
+    if (def_levels[x] >= level_info.def_level) {
       valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
     } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      valid_bits_writer.Clear();
+      output->null_count += 1;
     }
-
     valid_bits_writer.Next();
   }
   valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
-}
-#endif
-
-template <bool has_repeated_parent>
-int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
-                                      const int16_t required_definition_level,
-                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
-  CheckLevelRange(def_levels, batch_size, required_definition_level);
-  uint64_t defined_bitmap =
-      internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1);
-
-  DCHECK_LE(batch_size, 64);
-  if (has_repeated_parent) {
-#if defined(ARROW_HAVE_BMI2)
-    // This is currently a specialized code path assuming only (nested) lists
-    // present through the leaf (i.e. no structs). Upper level code only calls
-    // this method when the leaf-values are nullable (otherwise no spacing is needed),
-    // Because only nested lists exists it is sufficient to know that the field
-    // was either null or included it (i.e. definition level > max_definitation_level
-    // -2) If there where structs mixed in, we need to know the def_level of the
-    // repeated parent so we can check for def_level > "def level of repeated parent".
-    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
-                                                          required_definition_level - 2);
-    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
-    writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap));
-    return ::arrow::BitUtil::PopCount(selected_bits);
-#else
-    assert(false && "must not execute this without BMI2");
-#endif
-  } else {
-    writer->AppendWord(defined_bitmap, batch_size);
-    return ::arrow::BitUtil::PopCount(defined_bitmap);
+  output->values_read = valid_bits_writer.position();
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported");
   }
 }
+#endif
 
-template <bool has_repeated_parent>
-void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
-                                  const int16_t required_definition_level,
-                                  int64_t* values_read, int64_t* null_count,
-                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
-  constexpr int64_t kBitMaskSize = 64;
-  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
-                                                  /*start_offset=*/valid_bits_offset,
-                                                  /*length=*/num_def_levels);
-  int64_t set_count = 0;
-  *values_read = 0;
-  while (num_def_levels > kBitMaskSize) {
-    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-        def_levels, kBitMaskSize, required_definition_level, &writer);
-    def_levels += kBitMaskSize;
-    num_def_levels -= kBitMaskSize;
+template <typename LengthType>
+void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
+                            int64_t num_def_levels, LevelInfo level_info,
+                            ValidityBitmapInputOutput* output, LengthType* lengths) {
+  LengthType* orig_pos = lengths;
+  std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
+  if (output->valid_bits) {
+    valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter(
+        output->valid_bits, output->valid_bits_offset, num_def_levels));
   }
-  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-      def_levels, num_def_levels, required_definition_level, &writer);
+  for (int x = 0; x < num_def_levels; x++) {
+    // Skip items that belong to empty ancenstor lists and futher nested lists.

Review comment:
       Done. For fixed-size lists we will need to fix up the level info appropriately so that this check doesn't get triggered.

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >
+                            output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
+    if (def_levels[x] >= level_info.def_level) {
       valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
     } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      valid_bits_writer.Clear();
+      output->null_count += 1;
     }
-
     valid_bits_writer.Next();
   }
   valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
-}
-#endif
-
-template <bool has_repeated_parent>
-int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
-                                      const int16_t required_definition_level,
-                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
-  CheckLevelRange(def_levels, batch_size, required_definition_level);
-  uint64_t defined_bitmap =
-      internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1);
-
-  DCHECK_LE(batch_size, 64);
-  if (has_repeated_parent) {
-#if defined(ARROW_HAVE_BMI2)
-    // This is currently a specialized code path assuming only (nested) lists
-    // present through the leaf (i.e. no structs). Upper level code only calls
-    // this method when the leaf-values are nullable (otherwise no spacing is needed),
-    // Because only nested lists exists it is sufficient to know that the field
-    // was either null or included it (i.e. definition level > max_definitation_level
-    // -2) If there where structs mixed in, we need to know the def_level of the
-    // repeated parent so we can check for def_level > "def level of repeated parent".
-    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
-                                                          required_definition_level - 2);
-    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
-    writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap));
-    return ::arrow::BitUtil::PopCount(selected_bits);
-#else
-    assert(false && "must not execute this without BMI2");
-#endif
-  } else {
-    writer->AppendWord(defined_bitmap, batch_size);
-    return ::arrow::BitUtil::PopCount(defined_bitmap);
+  output->values_read = valid_bits_writer.position();
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported");
   }
 }
+#endif
 
-template <bool has_repeated_parent>
-void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
-                                  const int16_t required_definition_level,
-                                  int64_t* values_read, int64_t* null_count,
-                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
-  constexpr int64_t kBitMaskSize = 64;
-  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
-                                                  /*start_offset=*/valid_bits_offset,
-                                                  /*length=*/num_def_levels);
-  int64_t set_count = 0;
-  *values_read = 0;
-  while (num_def_levels > kBitMaskSize) {
-    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-        def_levels, kBitMaskSize, required_definition_level, &writer);
-    def_levels += kBitMaskSize;
-    num_def_levels -= kBitMaskSize;
+template <typename LengthType>
+void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
+                            int64_t num_def_levels, LevelInfo level_info,
+                            ValidityBitmapInputOutput* output, LengthType* lengths) {
+  LengthType* orig_pos = lengths;
+  std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
+  if (output->valid_bits) {
+    valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter(
+        output->valid_bits, output->valid_bits_offset, num_def_levels));
   }
-  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-      def_levels, num_def_levels, required_definition_level, &writer);
+  for (int x = 0; x < num_def_levels; x++) {
+    // Skip items that belong to empty ancenstor lists and futher nested lists.
+    if (def_levels[x] < level_info.repeated_ancestor_def_level ||
+        rep_levels[x] > level_info.rep_level) {
+      continue;
+    }
 
-  *values_read = writer.position();
-  *null_count += *values_read - set_count;
-  writer.Finish();
-}
+    if (ARROW_PREDICT_FALSE(
+            (valid_bits_writer != nullptr &&
+             valid_bits_writer->position() > output->values_read_upper_bound) ||
+            (lengths - orig_pos) > output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
 
-void DefinitionLevelsToBitmapLittleEndian(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  if (max_repetition_level > 0) {
-// This is a short term hack to prevent using the pext BMI2 instructions
-// on non-intel platforms where performance is subpar.
-// In the medium term we will hopefully be able to runtime dispatch
-// to use this on intel only platforms that support pext.
-#if defined(ARROW_HAVE_AVX512)
-    // BMI2 is required for efficient bit extraction.
-    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
-        def_levels, num_def_levels, max_definition_level, values_read, null_count,
-        valid_bits, valid_bits_offset);
-#else
-    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
-                                   max_repetition_level, values_read, null_count,
-                                   valid_bits, valid_bits_offset);
-#endif  // ARROW_HAVE_BMI2
+    if (rep_levels[x] == level_info.rep_level) {
+      // A continuation of an existing list.
+      if (lengths != nullptr) {

Review comment:
       Sorry for the naming confusion. I started with lengths but then transitioned to offsets; I think I've cleaned up the references in this file now. Yes, we reuse this function for nullable structs that have repeated children. I added a comment for both checks.

##########
File path: cpp/src/parquet/level_conversion.cc
##########
@@ -18,176 +18,190 @@
 
 #include <algorithm>
 #include <limits>
-#if defined(ARROW_HAVE_BMI2)
-#include <x86intrin.h>
-#endif
 
+#include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+
+#define BMI_RUNTIME_VERSION standard
+#include "parquet/level_conversion_inc.h"
+#undef BMI_RUNTIME_VERSION
 
 namespace parquet {
 namespace internal {
 namespace {
-inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
-                            const int16_t max_expected_level) {
-  int16_t min_level = std::numeric_limits<int16_t>::max();
-  int16_t max_level = std::numeric_limits<int16_t>::min();
-  for (int x = 0; x < num_levels; x++) {
-    min_level = std::min(levels[x], min_level);
-    max_level = std::max(levels[x], max_level);
-  }
-  if (ARROW_PREDICT_FALSE(num_levels > 0 &&
-                          (min_level < 0 || max_level > max_expected_level))) {
-    throw ParquetException("definition level exceeds maximum");
-  }
-}
 
-#if !defined(ARROW_HAVE_AVX512)
-
-inline void DefinitionLevelsToBitmapScalar(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  // We assume here that valid_bits is large enough to accommodate the
-  // additional definition levels and the ones that have already been written
-  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
-
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
+using ::arrow::internal::CpuInfo;
+
+#if !defined(ARROW_HAVE_RUNTIME_BMI2)
+void DefinitionLevelsToBitmapScalar(const int16_t* def_levels, int64_t num_def_levels,
+                                    LevelInfo level_info,
+                                    ValidityBitmapInputOutput* output) {
+  ::arrow::internal::FirstTimeBitmapWriter valid_bits_writer(
+      output->valid_bits,
+      /*start_offset=*/output->valid_bits_offset,
+      /*length=*/num_def_levels);
+  for (int x = 0; x < num_def_levels; x++) {
+    if (def_levels[x] < level_info.repeated_ancestor_def_level) {
+      continue;
+    }
+    if (ARROW_PREDICT_FALSE(valid_bits_writer.position() >
+                            output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
+    if (def_levels[x] >= level_info.def_level) {
       valid_bits_writer.Set();
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        continue;
-      }
     } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        valid_bits_writer.Clear();
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      valid_bits_writer.Clear();
+      output->null_count += 1;
     }
-
     valid_bits_writer.Next();
   }
   valid_bits_writer.Finish();
-  *values_read = valid_bits_writer.position();
-}
-#endif
-
-template <bool has_repeated_parent>
-int64_t DefinitionLevelsBatchToBitmap(const int16_t* def_levels, const int64_t batch_size,
-                                      const int16_t required_definition_level,
-                                      ::arrow::internal::FirstTimeBitmapWriter* writer) {
-  CheckLevelRange(def_levels, batch_size, required_definition_level);
-  uint64_t defined_bitmap =
-      internal::GreaterThanBitmap(def_levels, batch_size, required_definition_level - 1);
-
-  DCHECK_LE(batch_size, 64);
-  if (has_repeated_parent) {
-#if defined(ARROW_HAVE_BMI2)
-    // This is currently a specialized code path assuming only (nested) lists
-    // present through the leaf (i.e. no structs). Upper level code only calls
-    // this method when the leaf-values are nullable (otherwise no spacing is needed),
-    // Because only nested lists exists it is sufficient to know that the field
-    // was either null or included it (i.e. definition level > max_definitation_level
-    // -2) If there where structs mixed in, we need to know the def_level of the
-    // repeated parent so we can check for def_level > "def level of repeated parent".
-    uint64_t present_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
-                                                          required_definition_level - 2);
-    uint64_t selected_bits = _pext_u64(defined_bitmap, present_bitmap);
-    writer->AppendWord(selected_bits, ::arrow::BitUtil::PopCount(present_bitmap));
-    return ::arrow::BitUtil::PopCount(selected_bits);
-#else
-    assert(false && "must not execute this without BMI2");
-#endif
-  } else {
-    writer->AppendWord(defined_bitmap, batch_size);
-    return ::arrow::BitUtil::PopCount(defined_bitmap);
+  output->values_read = valid_bits_writer.position();
+  if (output->null_count > 0 && level_info.null_slot_usage > 1) {
+    throw ParquetException(
+        "Null values with null_slot_usage > 1 not supported."
+        "(i.e. FixedSizeLists with null values are not supported");
   }
 }
+#endif
 
-template <bool has_repeated_parent>
-void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
-                                  const int16_t required_definition_level,
-                                  int64_t* values_read, int64_t* null_count,
-                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
-  constexpr int64_t kBitMaskSize = 64;
-  ::arrow::internal::FirstTimeBitmapWriter writer(valid_bits,
-                                                  /*start_offset=*/valid_bits_offset,
-                                                  /*length=*/num_def_levels);
-  int64_t set_count = 0;
-  *values_read = 0;
-  while (num_def_levels > kBitMaskSize) {
-    set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-        def_levels, kBitMaskSize, required_definition_level, &writer);
-    def_levels += kBitMaskSize;
-    num_def_levels -= kBitMaskSize;
+template <typename LengthType>
+void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
+                            int64_t num_def_levels, LevelInfo level_info,
+                            ValidityBitmapInputOutput* output, LengthType* lengths) {
+  LengthType* orig_pos = lengths;
+  std::unique_ptr<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
+  if (output->valid_bits) {
+    valid_bits_writer.reset(new ::arrow::internal::FirstTimeBitmapWriter(
+        output->valid_bits, output->valid_bits_offset, num_def_levels));
   }
-  set_count += DefinitionLevelsBatchToBitmap<has_repeated_parent>(
-      def_levels, num_def_levels, required_definition_level, &writer);
+  for (int x = 0; x < num_def_levels; x++) {
+    // Skip items that belong to empty ancestor lists and further nested lists.
+    if (def_levels[x] < level_info.repeated_ancestor_def_level ||
+        rep_levels[x] > level_info.rep_level) {
+      continue;
+    }
 
-  *values_read = writer.position();
-  *null_count += *values_read - set_count;
-  writer.Finish();
-}
+    if (ARROW_PREDICT_FALSE(
+            (valid_bits_writer != nullptr &&
+             valid_bits_writer->position() > output->values_read_upper_bound) ||
+            (lengths - orig_pos) > output->values_read_upper_bound)) {
+      std::stringstream ss;
+      ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound;
+      throw ParquetException(ss.str());
+    }
 
-void DefinitionLevelsToBitmapLittleEndian(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
-  if (max_repetition_level > 0) {
-// This is a short term hack to prevent using the pext BMI2 instructions
-// on non-intel platforms where performance is subpar.
-// In the medium term we will hopefully be able to runtime dispatch
-// to use this on intel only platforms that support pext.
-#if defined(ARROW_HAVE_AVX512)
-    // BMI2 is required for efficient bit extraction.
-    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
-        def_levels, num_def_levels, max_definition_level, values_read, null_count,
-        valid_bits, valid_bits_offset);
-#else
-    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
-                                   max_repetition_level, values_read, null_count,
-                                   valid_bits, valid_bits_offset);
-#endif  // ARROW_HAVE_BMI2
+    if (rep_levels[x] == level_info.rep_level) {
+      // A continuation of an existing list.
+      if (lengths != nullptr) {
+        if (ARROW_PREDICT_FALSE(*lengths == std::numeric_limits<LengthType>::max())) {
+          throw ParquetException("List index overflow.");
+        }
+        *lengths += 1;
+      }
+    } else {
+      // current_rep < list rep_level i.e. start of a list (ancestor empty lists are

Review comment:
       Yes, it's called autocomplete.  Once I fat-finger one place, I typically autocomplete it more than once.
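
       For readers unfamiliar with the pext trick described in the removed comment above: PextEmulated below is a made-up portable helper showing what the hardware _pext_u64 computes; the actual code only uses the instruction on BMI2-capable CPUs.

           // g++ -std=c++11 pext_sketch.cc && ./a.out
           #include <cstdint>
           #include <iostream>

           // Portable stand-in for _pext_u64: collect the bits of `value` at the
           // positions selected by `mask` and pack them at the low end.
           uint64_t PextEmulated(uint64_t value, uint64_t mask) {
             uint64_t result = 0;
             int out_bit = 0;
             for (int bit = 0; bit < 64; ++bit) {
               if (mask & (uint64_t(1) << bit)) {
                 if (value & (uint64_t(1) << bit)) {
                   result |= uint64_t(1) << out_bit;
                 }
                 ++out_bit;
               }
             }
             return result;
           }

           int main() {
             // Suppose six def levels produced these bitmaps (least significant bit first):
             //   present: the leaf slot exists (no null/empty ancestor list hides it)
             //   defined: the leaf value itself is non-null
             uint64_t present = 0x2D;  // 101101
             uint64_t defined = 0x09;  // 001001
             // Keep only the defined-bits that line up with present slots, packed
             // together; popcount(present) == 4 bits get appended to the validity bitmap.
             std::cout << std::hex << PextEmulated(defined, present) << '\n';  // prints: 5
             return 0;
           }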

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value).
+  /// If this number is exceeded, methods will throw a
+  /// ParquetException.
+  int64_t values_read_upper_bound = 0;
+  /// The number of values added to the bitmap.
+  int64_t values_read = 0;
+  /// The number of nulls encountered.
+  int64_t null_count = 0;
+  // The validity bitmap to populate. Can only be null

Review comment:
       No, the bitmap is output only.  I clarified each parameter here.
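
       In case the intended call pattern helps: a rough caller-side sketch against the signatures introduced in this patch (the LevelInfo values, buffer sizing, and the parquet::internal qualification are my illustrative assumptions):

           #include <cstdint>
           #include <vector>

           #include "arrow/util/bit_util.h"
           #include "parquet/level_conversion.h"

           void Example() {
             // e.g. an optional int32 inside an optional group, no repeated ancestors.
             std::vector<int16_t> def_levels = {2, 1, 2, 0};

             parquet::internal::LevelInfo level_info;
             level_info.def_level = 2;
             level_info.rep_level = 0;
             level_info.repeated_ancestor_def_level = 0;

             // The caller allocates the bitmap; the struct only points into it.
             std::vector<uint8_t> bitmap(
                 ::arrow::BitUtil::BytesForBits(static_cast<int64_t>(def_levels.size())), 0);

             parquet::internal::ValidityBitmapInputOutput io;
             io.values_read_upper_bound = static_cast<int64_t>(def_levels.size());
             io.valid_bits = bitmap.data();
             io.valid_bits_offset = 0;  // input; values_read/null_count are outputs

             parquet::internal::DefinitionLevelsToBitmap(
                 def_levels.data(), static_cast<int64_t>(def_levels.size()), level_info, &io);
             // Expect io.values_read == 4, io.null_count == 2, and bitmap[0] == 0x05
             // (LSB first: valid, null, valid, null).
           }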

##########
File path: cpp/src/parquet/level_conversion_inc.h
##########
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "parquet/level_conversion.h"
+
+#include <algorithm>
+#include <limits>
+#if defined(ARROW_HAVE_BMI2)
+#if defined(_MSC_VER)
+#include <immintrin.h>
+#else
+#include <x86intrin.h>
+#endif  // _MSC_VER
+#endif  // ARROW_HAVE_BMI2
+
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/level_comparison.h"
+

Review comment:
       done.
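
       For anyone unfamiliar with the #define BMI_RUNTIME_VERSION ... #include ... #undef pattern above: the same function bodies get compiled into differently named namespaces (presumably in separate translation units built with different compiler flags) and a dispatcher picks one at runtime via CpuInfo. A toy single-file sketch of that shape, with every name made up:

           #include <cstdint>
           #include <iostream>

           namespace standard {
           int64_t GreaterThanCount(const int16_t* levels, int64_t n, int16_t rhs) {
             int64_t count = 0;
             for (int64_t i = 0; i < n; ++i) count += levels[i] > rhs ? 1 : 0;
             return count;
           }
           }  // namespace standard

           namespace bmi2 {
           // In the real setup this body would come from the shared *_inc.h compiled
           // with BMI2 enabled; here it just forwards so the sketch runs anywhere.
           int64_t GreaterThanCount(const int16_t* levels, int64_t n, int16_t rhs) {
             return standard::GreaterThanCount(levels, n, rhs);
           }
           }  // namespace bmi2

           bool CpuHasBmi2() { return false; }  // stand-in for the CpuInfo query

           int64_t GreaterThanCount(const int16_t* levels, int64_t n, int16_t rhs) {
             return CpuHasBmi2() ? bmi2::GreaterThanCount(levels, n, rhs)
                                 : standard::GreaterThanCount(levels, n, rhs);
           }

           int main() {
             int16_t levels[] = {0, 2, 1, 3};
             std::cout << GreaterThanCount(levels, 4, 1) << '\n';  // prints: 2
             return 0;
           }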

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -132,43 +137,49 @@ struct PARQUET_EXPORT LevelInfo {
   }
 };
 
-void PARQUET_EXPORT DefinitionLevelsToBitmap(
-    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
-    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, int64_t valid_bits_offset);
-
-// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
-// They currently represent minimal functionality for vectorized computation of definition
-// levels.
-
-#if defined(ARROW_LITTLE_ENDIAN)
-/// Builds a bitmap by applying predicate to the level vector provided.
-///
-/// \param[in] levels Rep or def level array.
-/// \param[in] num_levels The number of levels to process (must be [0, 64])
-/// \param[in] predicate The predicate to apply (must have the signature `bool
-/// predicate(int16_t)`.
-/// \returns The bitmap using least significant "bit" ordering.
-///
-/// N.B. Correct byte ordering is dependent on little-endian architectures.
-///
-template <typename Predicate>
-uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
-  // Both clang and GCC can vectorize this automatically with SSE4/AVX2.
-  uint64_t mask = 0;
-  for (int x = 0; x < num_levels; x++) {
-    mask |= static_cast<uint64_t>(predicate(levels[x]) ? 1 : 0) << x;
-  }
-  return mask;
-}
-
-/// Builds a  bitmap where each set bit indicates the corresponding level is greater
-/// than rhs.
-static inline uint64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
-                                         int16_t rhs) {
-  return LevelsToBitmap(levels, num_levels, [rhs](int16_t value) { return value > rhs; });
-}
+/// Input/Output structure for reconstructed validity bitmaps.
+struct PARQUET_EXPORT ValidityBitmapInputOutput {
+  /// The maximum number of values_read expected (actual
+  /// values read must be less than or equal to this value).
+  /// If this number is exceeded, methods will throw a
+  /// ParquetException.
+  int64_t values_read_upper_bound = 0;
+  /// The number of values added to the bitmap.
+  int64_t values_read = 0;
+  /// The number of nulls encountered.
+  int64_t null_count = 0;
+  // The validity bitmap to populate. Can only be null
+  // for DefRepLevelsToListInfo (if all that is needed is list lengths).
+  uint8_t* valid_bits = NULLPTR;
+  /// Input only, offset into valid_bits to start at.
+  int64_t valid_bits_offset = 0;
+};
 
+/// Converts def_levels to validity bitmaps for non-list arrays.
+void PARQUET_EXPORT DefinitionLevelsToBitmap(const int16_t* def_levels,

Review comment:
       I tried to consolidate on using the def/rep level shorthand everywhere and removing "convert".  Let me know if this clarifies things.  I also added more guidance on each method.
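
       To make the shorthand concrete, here is how I read the three fields for the textbook 3-level list "optional group a (LIST) { repeated group list { optional int32 item } }"; the schema and numbers are my own illustration, not taken from the patch's tests:

           #include "parquet/level_conversion.h"

           //   row          def  rep   effect on the leaf "item"
           //   a = null      0    0    no leaf slot (def < repeated_ancestor_def_level)
           //   a = []        1    0    no leaf slot
           //   a = [null]    2    0    leaf slot present but null
           //   a = [7]       3    0    leaf slot present and valid
           //   (a second element within the same list repeats with rep == 1)
           parquet::internal::LevelInfo ExampleLeafInfo() {
             parquet::internal::LevelInfo info;
             info.def_level = 3;                    // leaf value itself is non-null here
             info.rep_level = 1;                    // innermost repeated ancestor
             info.repeated_ancestor_def_level = 2;  // below this the leaf has no slot
             return info;
           }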




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org