You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/26 05:22:16 UTC

[GitHub] [arrow] emkornfield commented on a change in pull request #6985: ARROW-8413: [C++][Parquet] Refactor Generating validity bitmap for values column

emkornfield commented on a change in pull request #6985:
URL: https://github.com/apache/arrow/pull/6985#discussion_r414295562



##########
File path: cpp/cmake_modules/SetupCxxFlags.cmake
##########
@@ -40,12 +40,13 @@ if(ARROW_CPU_FLAG STREQUAL "x86")
     set(CXX_SUPPORTS_SSE4_2 TRUE)
   else()
     set(ARROW_SSE4_2_FLAG "-msse4.2")
-    set(ARROW_AVX2_FLAG "-mavx2")
+    set(ARROW_AVX2_FLAG "-march=core-avx2")
     # skylake-avx512 consists of AVX512F,AVX512BW,AVX512VL,AVX512CD,AVX512DQ
     set(ARROW_AVX512_FLAG "-march=skylake-avx512")
     check_cxx_compiler_flag(${ARROW_SSE4_2_FLAG} CXX_SUPPORTS_SSE4_2)
   endif()
   check_cxx_compiler_flag(${ARROW_AVX2_FLAG} CXX_SUPPORTS_AVX2)
+  check_cxx_compiler_flag(${ARROW_AVX2_FLAG} CXX_SUPPORTS_AVX2)

Review comment:
       should be removed now.

##########
File path: cpp/src/parquet/column_reader.cc
##########
@@ -50,6 +51,140 @@ using arrow::internal::checked_cast;
 
 namespace parquet {
 
+namespace {
+
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,

Review comment:
     I've moved everything in the anonymous namespace to level_conversion.cc

##########
File path: cpp/src/parquet/column_reader.cc
##########
@@ -50,6 +51,140 @@ using arrow::internal::checked_cast;
 
 namespace parquet {
 
+namespace {
+
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
+                            const int16_t max_expected_level) {
+  int16_t min_level = std::numeric_limits<int16_t>::max();
+  int16_t max_level = std::numeric_limits<int16_t>::min();
+  for (int x = 0; x < num_levels; x++) {
+    min_level = std::min(levels[x], min_level);
+    max_level = std::max(levels[x], max_level);
+  }
+  if (ARROW_PREDICT_FALSE(num_levels > 0 && (min_level < 0 || max_level > max_expected_level))) {
+    throw ParquetException("definition level exceeds maximum");
+  }
+}
+
+#if !defined(ARROW_HAVE_BMI2)
+
+inline void DefinitionLevelsToBitmapScalar(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // We assume here that valid_bits is large enough to accommodate the
+  // additional definition levels and the ones that have already been written
+  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
+                                                    num_def_levels);
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      valid_bits_writer.Set();
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    valid_bits_writer.Next();
+  }
+  valid_bits_writer.Finish();
+  *values_read = valid_bits_writer.position();
+}
+#endif
+
+template <bool has_repeated_parent>
+void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
+                                  const int16_t required_definition_level,
+                                  int64_t* values_read, int64_t* null_count,
+                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
+  constexpr int64_t kBitMaskSize = 64;
+  int64_t set_count = 0;
+  *values_read = 0;
+  while (num_def_levels > 0) {
+    int64_t batch_size = std::min(num_def_levels, kBitMaskSize);
+    CheckLevelRange(def_levels, batch_size, required_definition_level);
+    uint64_t defined_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
+                                                          required_definition_level - 1);
+    if (has_repeated_parent) {
+      // This is currently a specialized code path assuming only (nested) lists

Review comment:
       done.

##########
File path: cpp/src/parquet/column_reader.cc
##########
@@ -50,6 +51,140 @@ using arrow::internal::checked_cast;
 
 namespace parquet {
 
+namespace {
+
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
+                            const int16_t max_expected_level) {
+  int16_t min_level = std::numeric_limits<int16_t>::max();
+  int16_t max_level = std::numeric_limits<int16_t>::min();
+  for (int x = 0; x < num_levels; x++) {
+    min_level = std::min(levels[x], min_level);
+    max_level = std::max(levels[x], max_level);
+  }
+  if (ARROW_PREDICT_FALSE(num_levels > 0 && (min_level < 0 || max_level > max_expected_level))) {
+    throw ParquetException("definition level exceeds maximum");
+  }
+}
+
+#if !defined(ARROW_HAVE_BMI2)
+
+inline void DefinitionLevelsToBitmapScalar(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // We assume here that valid_bits is large enough to accommodate the
+  // additional definition levels and the ones that have already been written
+  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
+                                                    num_def_levels);
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      valid_bits_writer.Set();
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    valid_bits_writer.Next();
+  }
+  valid_bits_writer.Finish();
+  *values_read = valid_bits_writer.position();
+}
+#endif
+
+template <bool has_repeated_parent>
+void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
+                                  const int16_t required_definition_level,
+                                  int64_t* values_read, int64_t* null_count,
+                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
+  constexpr int64_t kBitMaskSize = 64;
+  int64_t set_count = 0;
+  *values_read = 0;
+  while (num_def_levels > 0) {
+    int64_t batch_size = std::min(num_def_levels, kBitMaskSize);
+    CheckLevelRange(def_levels, batch_size, required_definition_level);
+    uint64_t defined_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
+                                                          required_definition_level - 1);
+    if (has_repeated_parent) {
+      // This is currently a specialized code path assuming only (nested) lists
+      // present through the leaf (i.e. no structs).
+      // Upper level code only calls this method
+      // when the leaf-values are nullable (otherwise no spacing is needed),
+      // Because only nested lists exist it is sufficient to know that the field
+      // was either null or included it (i.e. >= previous definition level -> > previous
+      // definition - 1). If there were structs mixed in, we need to know the def_level
+      // of the repeated parent so we can check for def_level > "def level of repeated
+      // parent".
+      uint64_t present_bitmap = internal::GreaterThanBitmap(
+          def_levels, batch_size, required_definition_level - 2);
+      *values_read += internal::AppendSelectedBitsToValidityBitmap(
+          /*new_bits=*/defined_bitmap,
+          /*selection_bitmap*/ present_bitmap, valid_bits, &valid_bits_offset,
+          &set_count);
+    } else {
+      internal::AppendToValidityBitmap(
+          /*new_bits=*/defined_bitmap,
+          /*new_bit_count=*/batch_size, valid_bits, &valid_bits_offset, &set_count);
+      *values_read += batch_size;
+    }
+    def_levels += batch_size;
+    num_def_levels -= batch_size;
+  }
+  *null_count += *values_read - set_count;
+}
+
+inline void DefinitionLevelsToBitmapDispatch(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  if (max_repetition_level > 0) {
+#if ARROW_LITTLE_ENDIAN
+
+#if defined(ARROW_HAVE_BMI2)
+    // BMI2 is required for efficient bit extraction.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+#else
+    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
+                                   max_repetition_level, values_read, null_count,
+                                   valid_bits, valid_bits_offset);
+#endif  // ARROW_HAVE_BMI2
+
+  } else {
+    // No special instructions are used for the non-repeated case.

Review comment:
       done.

##########
File path: cpp/src/parquet/column_reader.cc
##########
@@ -50,6 +51,140 @@ using arrow::internal::checked_cast;
 
 namespace parquet {
 
+namespace {
+
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
+                            const int16_t max_expected_level) {
+  int16_t min_level = std::numeric_limits<int16_t>::max();
+  int16_t max_level = std::numeric_limits<int16_t>::min();
+  for (int x = 0; x < num_levels; x++) {
+    min_level = std::min(levels[x], min_level);
+    max_level = std::max(levels[x], max_level);
+  }
+  if (ARROW_PREDICT_FALSE(num_levels > 0 && (min_level < 0 || max_level > max_expected_level))) {
+    throw ParquetException("definition level exceeds maximum");
+  }
+}
+
+#if !defined(ARROW_HAVE_BMI2)
+
+inline void DefinitionLevelsToBitmapScalar(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // We assume here that valid_bits is large enough to accommodate the
+  // additional definition levels and the ones that have already been written
+  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
+                                                    num_def_levels);
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      valid_bits_writer.Set();
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    valid_bits_writer.Next();
+  }
+  valid_bits_writer.Finish();
+  *values_read = valid_bits_writer.position();
+}
+#endif
+
+template <bool has_repeated_parent>
+void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
+                                  const int16_t required_definition_level,
+                                  int64_t* values_read, int64_t* null_count,
+                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
+  constexpr int64_t kBitMaskSize = 64;
+  int64_t set_count = 0;
+  *values_read = 0;
+  while (num_def_levels > 0) {
+    int64_t batch_size = std::min(num_def_levels, kBitMaskSize);
+    CheckLevelRange(def_levels, batch_size, required_definition_level);
+    uint64_t defined_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
+                                                          required_definition_level - 1);
+    if (has_repeated_parent) {
+      // This is currently a specialized code path assuming only (nested) lists
+      // present through the leaf (i.e. no structs).
+      // Upper level code only calls this method
+      // when the leaf-values are nullable (otherwise no spacing is needed),
+      // Because only nested lists exist it is sufficient to know that the field
+      // was either null or included it (i.e. >= previous definition level -> > previous
+      // definition - 1). If there were structs mixed in, we need to know the def_level
+      // of the repeated parent so we can check for def_level > "def level of repeated
+      // parent".
+      uint64_t present_bitmap = internal::GreaterThanBitmap(
+          def_levels, batch_size, required_definition_level - 2);
+      *values_read += internal::AppendSelectedBitsToValidityBitmap(
+          /*new_bits=*/defined_bitmap,
+          /*selection_bitmap*/ present_bitmap, valid_bits, &valid_bits_offset,
+          &set_count);
+    } else {
+      internal::AppendToValidityBitmap(
+          /*new_bits=*/defined_bitmap,
+          /*new_bit_count=*/batch_size, valid_bits, &valid_bits_offset, &set_count);
+      *values_read += batch_size;
+    }
+    def_levels += batch_size;
+    num_def_levels -= batch_size;
+  }
+  *null_count += *values_read - set_count;
+}
+
+inline void DefinitionLevelsToBitmapDispatch(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  if (max_repetition_level > 0) {
+#if ARROW_LITTLE_ENDIAN
+
+#if defined(ARROW_HAVE_BMI2)
+    // BMI2 is required for efficient bit extraction.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+#else
+    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
+                                   max_repetition_level, values_read, null_count,
+                                   valid_bits, valid_bits_offset);
+#endif  // ARROW_HAVE_BMI2
+
+  } else {
+    // No special instructions are used for the non-repeated case.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/false>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+  }
+}

Review comment:
       I think just 1.

##########
File path: cpp/src/parquet/column_reader.cc
##########
@@ -50,6 +51,140 @@ using arrow::internal::checked_cast;
 
 namespace parquet {
 
+namespace {
+
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,
+                            const int16_t max_expected_level) {
+  int16_t min_level = std::numeric_limits<int16_t>::max();
+  int16_t max_level = std::numeric_limits<int16_t>::min();
+  for (int x = 0; x < num_levels; x++) {
+    min_level = std::min(levels[x], min_level);
+    max_level = std::max(levels[x], max_level);
+  }
+  if (ARROW_PREDICT_FALSE(num_levels > 0 && (min_level < 0 || max_level > max_expected_level))) {
+    throw ParquetException("definition level exceeds maximum");
+  }
+}
+
+#if !defined(ARROW_HAVE_BMI2)
+
+inline void DefinitionLevelsToBitmapScalar(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // We assume here that valid_bits is large enough to accommodate the
+  // additional definition levels and the ones that have already been written
+  ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
+                                                    num_def_levels);
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      valid_bits_writer.Set();
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        valid_bits_writer.Clear();
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    valid_bits_writer.Next();
+  }
+  valid_bits_writer.Finish();
+  *values_read = valid_bits_writer.position();
+}
+#endif
+
+template <bool has_repeated_parent>
+void DefinitionLevelsToBitmapSimd(const int16_t* def_levels, int64_t num_def_levels,
+                                  const int16_t required_definition_level,
+                                  int64_t* values_read, int64_t* null_count,
+                                  uint8_t* valid_bits, int64_t valid_bits_offset) {
+  constexpr int64_t kBitMaskSize = 64;
+  int64_t set_count = 0;
+  *values_read = 0;
+  while (num_def_levels > 0) {
+    int64_t batch_size = std::min(num_def_levels, kBitMaskSize);
+    CheckLevelRange(def_levels, batch_size, required_definition_level);
+    uint64_t defined_bitmap = internal::GreaterThanBitmap(def_levels, batch_size,
+                                                          required_definition_level - 1);
+    if (has_repeated_parent) {
+      // This is currently a specialized code path assuming only (nested) lists
+      // present through the leaf (i.e. no structs).
+      // Upper level code only calls this method
+      // when the leaf-values are nullable (otherwise no spacing is needed),
+      // Because only nested lists exist it is sufficient to know that the field
+      // was either null or included it (i.e. >= previous definition level -> > previous
+      // definition - 1). If there were structs mixed in, we need to know the def_level
+      // of the repeated parent so we can check for def_level > "def level of repeated
+      // parent".
+      uint64_t present_bitmap = internal::GreaterThanBitmap(
+          def_levels, batch_size, required_definition_level - 2);
+      *values_read += internal::AppendSelectedBitsToValidityBitmap(
+          /*new_bits=*/defined_bitmap,
+          /*selection_bitmap*/ present_bitmap, valid_bits, &valid_bits_offset,
+          &set_count);
+    } else {
+      internal::AppendToValidityBitmap(
+          /*new_bits=*/defined_bitmap,
+          /*new_bit_count=*/batch_size, valid_bits, &valid_bits_offset, &set_count);
+      *values_read += batch_size;
+    }
+    def_levels += batch_size;
+    num_def_levels -= batch_size;
+  }
+  *null_count += *values_read - set_count;
+}
+
+inline void DefinitionLevelsToBitmapDispatch(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  if (max_repetition_level > 0) {
+#if ARROW_LITTLE_ENDIAN
+
+#if defined(ARROW_HAVE_BMI2)
+    // BMI2 is required for efficient bit extraction.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/true>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+#else
+    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
+                                   max_repetition_level, values_read, null_count,
+                                   valid_bits, valid_bits_offset);
+#endif  // ARROW_HAVE_BMI2
+
+  } else {
+    // No special instructions are used for the non-repeated case.
+    DefinitionLevelsToBitmapSimd</*has_repeated_parent=*/false>(
+        def_levels, num_def_levels, max_definition_level, values_read, null_count,
+        valid_bits, valid_bits_offset);
+  }
+}
+
+#else  // big-endian
+    // Optimized SIMD uses bit shifts that are unlikely to work on big endian platforms.
+    DefinitionLevelsToBitmapScalar(def_levels, num_def_levels, max_definition_level,
+                                   max_repetition_level, values_read, null_count,

Review comment:
       done.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/util/bit_util.h"
+
+#if defined(ARROW_HAVE_BMI2)
+#include "x86intrin.h"
+#endif
+
+namespace parquet {
+namespace internal {
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<int64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater

Review comment:
       yes.  fixed.

##########
File path: cpp/src/parquet/column_reader.cc
##########
@@ -50,6 +51,140 @@ using arrow::internal::checked_cast;
 
 namespace parquet {
 
+namespace {
+
+inline void CheckLevelRange(const int16_t* levels, int64_t num_levels,

Review comment:
       Note that this prevents inlining of the Scalar version which was happening before, but I'm not sure that is a big deal.

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/level_conversion.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "arrow/util/bit_util.h"
+
+namespace parquet {
+namespace internal {
+
+using ::testing::ElementsAreArray;
+
+std::string ToString(const uint8_t* bitmap, int64_t bit_count) {

Review comment:
       done.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/util/bit_util.h"
+
+#if defined(ARROW_HAVE_BMI2)
+#include "x86intrin.h"
+#endif
+
+namespace parquet {
+namespace internal {
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<int64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+static inline int64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
+                                        int16_t rhs) {
+  return LevelsToBitmap(levels, num_levels, [&](int16_t value) { return value > rhs; });
+}
+
+/// Append bits number_of_bits from new_bits to valid_bits and valid_bits_offset.
+///
+/// \param[in] new_bits The zero-padded bitmap to append.
+/// \param[in] number_of_bits The number of bits to append from new_bits.
+/// \param[in] valid_bits_length The number of bytes allocated in valid_bits.
+/// \param[in] valid_bits_offset The bit-offset at which to start appending new bits.
+/// \param[in,out] valid_bits The validity bitmap to append to.
+/// \returns The new bit offset inside of valid_bits.
+static inline int64_t AppendBitmap(uint64_t new_bits, int64_t number_of_bits,
+                                   int64_t valid_bits_length, int64_t valid_bits_offset,
+                                   uint8_t* valid_bits) {
+  // Selection masks to retrieve all low order bits for each bytes.
+  constexpr uint64_t kLsbSelectionMasks[] = {
+      0,  // unused.
+      0x0101010101010101,
+      0x0303030303030303,
+      0x0707070707070707,
+      0x0F0F0F0F0F0F0F0F,
+      0x1F1F1F1F1F1F1F1F,
+      0x3F3F3F3F3F3F3F3F,
+      0x7F7F7F7F7F7F7F7F,
+  };
+  int64_t valid_byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+
+  int64_t new_offset = valid_bits_offset + number_of_bits;
+  union ByteAddressableBitmap {
+    explicit ByteAddressableBitmap(uint64_t mask) : mask(mask) {}
+    uint64_t mask;
+    uint8_t bytes[8];
+  };
+
+  if (bit_offset != 0) {
+    int64_t bits_to_carry = 8 - bit_offset;
+    // Get the mask that will select the lower order bits (the ones to carry
+    // over to the existing byte and shift up).
+    const ByteAddressableBitmap carry_bits(kLsbSelectionMasks[bits_to_carry]);
+    // Mask to select non-carried bits.
+    const ByteAddressableBitmap inverse_selection(~carry_bits.mask);
+    // Fill out the last incomplete byte in the output, by extracting the least
+    // significant bits from the first byte.
+    const ByteAddressableBitmap new_bitmap(new_bits);
+    // valid bits should be a valid bitmask so all trailing bytes should be unset
+    // so no mask is needed to start.
+    valid_bits[valid_byte_offset] =
+        valid_bits[valid_byte_offset] |  // See above the
+        (((new_bitmap.bytes[0] & carry_bits.bytes[0])) << bit_offset);
+
+    // We illustrate logic with a 3-byte example in little endian/LSB order.
+    // Note this ordering is the reversed from HEX masks above with are expressed
+    // big-endian/MSB and shifts right move the bits to the left (division).
+    // 0  1  2  3  4  5  6  7   8  9  10 11 12 13 14 15   16 17 18 19 20 21 22 23
+    // Shifted mask should look like this assuming bit offset = 6:
+    // 2  3  4  5  6  7  N  N   10 11 12 13 14 15  N  N   18 19 20 21 22 23  N  N
+    // clang-format on
+    uint64_t shifted_new_bits = (new_bits & inverse_selection.mask) >> bits_to_carry;
+    // captured_carry:
+    // 0  1  N  N  N  N  N  N   8  9  N  N  N   N  N  N   16 17  N  N  N  N  N  N
+    uint64_t captured_carry = carry_bits.mask & new_bits;
+    // mask_cary_bits:
+    // N  N  N  N  N  N  8  9   N  N  N  N  N   N 16 17    N  N   N  N  N  N  N  N
+    uint64_t mask_carry_bits = (captured_carry >> 8) << bit_offset;
+
+    new_bits = shifted_new_bits | mask_carry_bits;
+    // Don't overwrite the first byte
+    valid_byte_offset += 1;
+    number_of_bits -= bits_to_carry;
+  }
+
+  int64_t bytes_for_new_bits = ::arrow::BitUtil::BytesForBits(number_of_bits);
+  if (valid_bits_length - ::arrow::BitUtil::BytesForBits(valid_bits_offset) >=
+      static_cast<int64_t>(sizeof(new_bits))) {
+    // This should be the common case and  inlined as a single instruction which
+    // should be cheaper than the general case of calling memcpy, so it is likely
+    // worth the extra branch.
+    std::memcpy(valid_bits + valid_byte_offset, &new_bits, sizeof(new_bits));
+  } else {
+    std::memcpy(valid_bits + valid_byte_offset, &new_bits, bytes_for_new_bits);
+  }
+  return new_offset;
+}
+
+/// \brief Appends bit values to the validity bitmap valid_bits, based on bitmaps
+/// generated by GreaterThanBitmap, and the appropriate threshold definition_level.

Review comment:
       I agree now that the AppendWord is moved to FirstTimeBitmapWriter

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/util/bit_util.h"
+
+#if defined(ARROW_HAVE_BMI2)
+#include "x86intrin.h"
+#endif
+
+namespace parquet {
+namespace internal {
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<int64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+static inline int64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,

Review comment:
       oversight, fixed. wrapped this one and the one below in ifdef guards for little-endian.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/util/bit_util.h"
+
+#if defined(ARROW_HAVE_BMI2)
+#include "x86intrin.h"
+#endif
+
+namespace parquet {
+namespace internal {
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<int64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+static inline int64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
+                                        int16_t rhs) {
+  return LevelsToBitmap(levels, num_levels, [&](int16_t value) { return value > rhs; });
+}
+
+/// Append bits number_of_bits from new_bits to valid_bits and valid_bits_offset.
+///
+/// \param[in] new_bits The zero-padded bitmap to append.
+/// \param[in] number_of_bits The number of bits to append from new_bits.
+/// \param[in] valid_bits_length The number of bytes allocated in valid_bits.
+/// \param[in] valid_bits_offset The bit-offset at which to start appending new bits.
+/// \param[in,out] valid_bits The validity bitmap to append to.
+/// \returns The new bit offset inside of valid_bits.
+static inline int64_t AppendBitmap(uint64_t new_bits, int64_t number_of_bits,
+                                   int64_t valid_bits_length, int64_t valid_bits_offset,
+                                   uint8_t* valid_bits) {
+  // Selection masks to retrieve all low order bits for each bytes.
+  constexpr uint64_t kLsbSelectionMasks[] = {
+      0,  // unused.
+      0x0101010101010101,
+      0x0303030303030303,
+      0x0707070707070707,
+      0x0F0F0F0F0F0F0F0F,
+      0x1F1F1F1F1F1F1F1F,
+      0x3F3F3F3F3F3F3F3F,
+      0x7F7F7F7F7F7F7F7F,
+  };
+  int64_t valid_byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+
+  int64_t new_offset = valid_bits_offset + number_of_bits;
+  union ByteAddressableBitmap {

Review comment:
       I originally thought I would have to do more.  I removed the union.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/util/bit_util.h"
+
+#if defined(ARROW_HAVE_BMI2)
+#include "x86intrin.h"
+#endif
+
+namespace parquet {
+namespace internal {
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<int64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+static inline int64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
+                                        int16_t rhs) {
+  return LevelsToBitmap(levels, num_levels, [&](int16_t value) { return value > rhs; });
+}
+
+/// Append bits number_of_bits from new_bits to valid_bits and valid_bits_offset.
+///
+/// \param[in] new_bits The zero-padded bitmap to append.
+/// \param[in] number_of_bits The number of bits to append from new_bits.
+/// \param[in] valid_bits_length The number of bytes allocated in valid_bits.
+/// \param[in] valid_bits_offset The bit-offset at which to start appending new bits.
+/// \param[in,out] valid_bits The validity bitmap to append to.
+/// \returns The new bit offset inside of valid_bits.
+static inline int64_t AppendBitmap(uint64_t new_bits, int64_t number_of_bits,
+                                   int64_t valid_bits_length, int64_t valid_bits_offset,
+                                   uint8_t* valid_bits) {
+  // Selection masks to retrieve all low order bits for each bytes.
+  constexpr uint64_t kLsbSelectionMasks[] = {
+      0,  // unused.
+      0x0101010101010101,
+      0x0303030303030303,
+      0x0707070707070707,
+      0x0F0F0F0F0F0F0F0F,
+      0x1F1F1F1F1F1F1F1F,
+      0x3F3F3F3F3F3F3F3F,
+      0x7F7F7F7F7F7F7F7F,
+  };
+  int64_t valid_byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+
+  int64_t new_offset = valid_bits_offset + number_of_bits;
+  union ByteAddressableBitmap {
+    explicit ByteAddressableBitmap(uint64_t mask) : mask(mask) {}
+    uint64_t mask;
+    uint8_t bytes[8];
+  };
+
+  if (bit_offset != 0) {
+    int64_t bits_to_carry = 8 - bit_offset;
+    // Get the mask that will select the lower-order bits (the ones to carry
+    // over to the existing byte) and shift up.
+    const ByteAddressableBitmap carry_bits(kLsbSelectionMasks[bits_to_carry]);
+    // Mask to select non-carried bits.
+    const ByteAddressableBitmap inverse_selection(~carry_bits.mask);
+    // Fill out the last incomplete byte in the output, by extracting the least
+    // significant bits from the first byte.
+    const ByteAddressableBitmap new_bitmap(new_bits);
+    // valid_bits should be a valid bitmask, so all trailing bytes should be unset
+    // and no mask is needed to start.
+    valid_bits[valid_byte_offset] =
+        valid_bits[valid_byte_offset] |  // See above the
+        (((new_bitmap.bytes[0] & carry_bits.bytes[0])) << bit_offset);
+
+    // We illustrate logic with a 3-byte example in little endian/LSB order.
+    // Note this ordering is reversed from the HEX masks above, which are expressed
+    // big-endian/MSB and shifts right move the bits to the left (division).
+    // 0  1  2  3  4  5  6  7   8  9  10 11 12 13 14 15   16 17 18 19 20 21 22 23
+    // Shifted mask should look like this assuming bit offset = 6:
+    // 2  3  4  5  6  7  N  N   10 11 12 13 14 15  N  N   18 19 20 21 22 23  N  N

Review comment:
       Add an additional line showing the mask.  Also clarified that N indicates not-set (not that they disappear, I'm not sure if this is what you meant?)  Or are you illustrating another error with the documentation?

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/level_conversion.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "arrow/util/bit_util.h"
+
+namespace parquet {
+namespace internal {
+
+using ::testing::ElementsAreArray;
+
+std::string ToString(const uint8_t* bitmap, int64_t bit_count) {
+  return arrow::internal::Bitmap(bitmap, /*offset*/ 0, /*length=*/bit_count).ToString();
+}
+
+std::string ToString(const std::vector<uint8_t>& bitmap, int64_t bit_count) {
+  return ToString(bitmap.data(), bit_count);
+}
+
+TEST(TestGreaterThanBitmap, GeneratesExpectedBitmasks) {
+  std::vector<int16_t> levels = {0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+                                 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+                                 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
+                                 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7};
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/0, /*rhs*/ 0), 0);
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/64, /*rhs*/ 8), 0);
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/64, /*rhs*/ -1),
+            0xFFFFFFFFFFFFFFFF);
+  // Should be zero padded.
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/47, /*rhs*/ -1),
+            0x7FFFFFFFFFFF);
+  EXPECT_EQ(GreaterThanBitmap(levels.data(), /*num_levels=*/64, /*rhs*/ 6),
+            0x8080808080808080);
+}
+
+TEST(TestAppendBitmap, TestOffsetOverwritesCorrectBitsOnExistingByte) {
+  auto check_append = [](const std::string& expected_bits, int64_t offset) {
+    std::vector<uint8_t> valid_bits = {0x00};
+    constexpr int64_t kBitsAfterAppend = 8;
+    ASSERT_EQ(
+        AppendBitmap(/*new_bits=*/0xFF, /*number_of_bits*/ 8 - offset,
+                     /*valid_bits_length=*/valid_bits.size(), offset, valid_bits.data()),
+        kBitsAfterAppend);
+    EXPECT_EQ(ToString(valid_bits, kBitsAfterAppend), expected_bits);
+  };
+  check_append("11111111", 0);
+  check_append("01111111", 1);
+  check_append("00111111", 2);
+  check_append("00011111", 3);
+  check_append("00001111", 4);
+  check_append("00000111", 5);
+  check_append("00000011", 6);
+  check_append("00000001", 7);
+}
+
+TEST(TestAppendBitmap, TestOffsetShiftBitsCorrectly) {
+  constexpr uint64_t kPattern = 0x9A9A9A9A9A9A9A9A;
+  auto check_append = [&](const std::string& leading_bits, const std::string& middle_bits,
+                          const std::string& trailing_bits, int64_t offset) {
+    ASSERT_GE(offset, 8);
+    std::vector<uint8_t> valid_bits(/*count=*/10, 0);
+    valid_bits[0] = 0x99;
+
+    AppendBitmap(/*new_bits=*/kPattern, /*number_of_bits*/ 64,
+                 /*valid_bits_length=*/valid_bits.size(), offset, valid_bits.data());
+    EXPECT_EQ(valid_bits[0], 0x99);  // shouldn't get changed.
+    EXPECT_EQ(ToString(valid_bits.data() + 1, /*num_bits=*/8), leading_bits);
+    for (int x = 2; x < 9; x++) {
+      EXPECT_EQ(ToString(valid_bits.data() + x, /*num_bits=*/8), middle_bits);
+    }
+    EXPECT_EQ(ToString(valid_bits.data() + 9, /*num_bits=*/8), trailing_bits);
+  };
+  // Original Pattern = "01011001"
+  check_append(/*leading_bits= */ "01011001", /*middle_bits=*/"01011001",
+               /*trailing_bits=*/"00000000", /*offset=*/8);
+  check_append("00101100", "10101100", "10000000", 9);
+  check_append("00010110", "01010110", "01000000", 10);
+  check_append("00001011", "00101011", "00100000", 11);
+  check_append("00000101", "10010101", "10010000", 12);
+  check_append("00000010", "11001010", "11001000", 13);
+  check_append("00000001", "01100101", "01100100", 14);
+  check_append("00000000", "10110010", "10110010", 15);
+}
+
+TEST(TestAppendBitmap, AllBytesAreWrittenWithEnoughSpace) {
+  std::vector<uint8_t> valid_bits(/*count=*/9, 0);
+
+  uint64_t bitmap = 0xFFFFFFFFFFFFFFFF;
+  AppendBitmap(bitmap, /*number_of_bits*/ 7,
+               /*valid_bits_length=*/valid_bits.size(),
+               /*valid_bits_offset=*/1,
+               /*valid_bits=*/valid_bits.data());
+  EXPECT_THAT(valid_bits, ElementsAreArray(std::vector<uint8_t>{
+                              0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}));
+}
+
+TEST(TestAppendBitmap, OnlyApproriateBytesWrittenWhenLessThen8BytesAvailable) {
+  std::vector<uint8_t> valid_bits = {0x00, 0x00};
+
+  uint64_t bitmap = 0x1FF;
+  AppendBitmap(bitmap, /*number_of_bits*/ 7,
+               /*valid_bits_length=*/2,
+               /*valid_bits_offset=*/1,
+               /*valid_bits=*/valid_bits.data());
+
+  EXPECT_THAT(valid_bits, ElementsAreArray(std::vector<uint8_t>{0xFE, 0x00}));
+
+  AppendBitmap(bitmap, /*number_of_bits*/ 9,
+               /*valid_bits_length=*/2,
+               /*valid_bits_offset=*/1,
+               /*valid_bits=*/valid_bits.data());
+  EXPECT_THAT(valid_bits, ElementsAreArray(std::vector<uint8_t>{0xFE, 0x03}));
+}
+
+TEST(TestAppendToValidityBitmap, BasicOperation) {
+  std::vector<uint8_t> validity_bitmap(/*count*/ 8, 0);
+  int64_t valid_bitmap_offset = 1;
+  int64_t set_bit_count = 5;
+  AppendToValidityBitmap(/*new_bits*/ 0x99, /*new_bit_count=*/31, validity_bitmap.data(),
+                         &valid_bitmap_offset, &set_bit_count);
+  EXPECT_EQ(ToString(validity_bitmap, valid_bitmap_offset),
+            "01001100 10000000 00000000 00000000");
+  EXPECT_EQ(set_bit_count, /*5 + 4 set bits=*/9);
+}
+
+TEST(TestAppendSelectedBitsToValidityBitmap, BasicOperation) {
+#if !defined(ARROW_HAVE_BMI2)

Review comment:
       good point. done.

##########
File path: cpp/src/parquet/level_conversion_test.cc
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/level_conversion.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "arrow/util/bit_util.h"
+
+namespace parquet {
+namespace internal {
+
+using ::testing::ElementsAreArray;
+
+std::string ToString(const uint8_t* bitmap, int64_t bit_count) {
+  return arrow::internal::Bitmap(bitmap, /*offset*/ 0, /*length=*/bit_count).ToString();
+}
+
+std::string ToString(const std::vector<uint8_t>& bitmap, int64_t bit_count) {

Review comment:
       done.

##########
File path: cpp/src/parquet/level_conversion.h
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/util/bit_util.h"
+
+#if defined(ARROW_HAVE_BMI2)
+#include "x86intrin.h"
+#endif
+
+namespace parquet {
+namespace internal {
+// These APIs are likely to be revised as part of ARROW-8494 to reduce duplicate code.
+// They currently represent minimal functionality for vectorized computation of definition
+// levels.
+
+/// Builds a bitmap by applying predicate to the level vector provided.
+///
+/// \param[in] levels Rep or def level array.
+/// \param[in] num_levels The number of levels to process (must be [0, 64])
+/// \param[in] predicate The predicate to apply (must have the signature `bool
+/// predicate(int16_t)`.
+/// \returns The bitmap using least significant "bit" ordering.
+///
+/// N.B. Correct byte ordering is dependent on little-endian architectures.
+///
+template <typename Predicate>
+uint64_t LevelsToBitmap(const int16_t* levels, int64_t num_levels, Predicate predicate) {
+  // Both clang and GCC can vectorize this automatically with AVX2.
+  uint64_t mask = 0;
+  for (int x = 0; x < num_levels; x++) {
+    mask |= static_cast<int64_t>(predicate(levels[x]) ? 1 : 0) << x;
+  }
+  return mask;
+}
+
+/// Builds a bitmap where each set bit indicates the corresponding level is greater
+/// than rhs.
+static inline int64_t GreaterThanBitmap(const int16_t* levels, int64_t num_levels,
+                                        int16_t rhs) {
+  return LevelsToBitmap(levels, num_levels, [&](int16_t value) { return value > rhs; });
+}
+
+/// Append bits number_of_bits from new_bits to valid_bits and valid_bits_offset.
+///
+/// \param[in] new_bits The zero-padded bitmap to append.
+/// \param[in] number_of_bits The number of bits to append from new_bits.
+/// \param[in] valid_bits_length The number of bytes allocated in valid_bits.
+/// \param[in] valid_bits_offset The bit-offset at which to start appending new bits.
+/// \param[in,out] valid_bits The validity bitmap to append to.
+/// \returns The new bit offset inside of valid_bits.
+static inline int64_t AppendBitmap(uint64_t new_bits, int64_t number_of_bits,
+                                   int64_t valid_bits_length, int64_t valid_bits_offset,
+                                   uint8_t* valid_bits) {
+  // Selection masks to retrieve all low order bits for each bytes.
+  constexpr uint64_t kLsbSelectionMasks[] = {
+      0,  // unused.
+      0x0101010101010101,
+      0x0303030303030303,
+      0x0707070707070707,
+      0x0F0F0F0F0F0F0F0F,
+      0x1F1F1F1F1F1F1F1F,
+      0x3F3F3F3F3F3F3F3F,
+      0x7F7F7F7F7F7F7F7F,
+  };
+  int64_t valid_byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+
+  int64_t new_offset = valid_bits_offset + number_of_bits;
+  union ByteAddressableBitmap {
+    explicit ByteAddressableBitmap(uint64_t mask) : mask(mask) {}
+    uint64_t mask;
+    uint8_t bytes[8];
+  };
+
+  if (bit_offset != 0) {
+    int64_t bits_to_carry = 8 - bit_offset;
+    // Get the mask that will select the lower-order bits (the ones to carry
+    // over to the existing byte) and shift up.
+    const ByteAddressableBitmap carry_bits(kLsbSelectionMasks[bits_to_carry]);
+    // Mask to select non-carried bits.
+    const ByteAddressableBitmap inverse_selection(~carry_bits.mask);
+    // Fill out the last incomplete byte in the output, by extracting the least
+    // significant bits from the first byte.
+    const ByteAddressableBitmap new_bitmap(new_bits);
+    // valid_bits should be a valid bitmask, so all trailing bytes should be unset
+    // and no mask is needed to start.
+    valid_bits[valid_byte_offset] =
+        valid_bits[valid_byte_offset] |  // See above the
+        (((new_bitmap.bytes[0] & carry_bits.bytes[0])) << bit_offset);
+
+    // We illustrate logic with a 3-byte example in little endian/LSB order.
+    // Note this ordering is reversed from the HEX masks above, which are expressed
+    // big-endian/MSB and shifts right move the bits to the left (division).
+    // 0  1  2  3  4  5  6  7   8  9  10 11 12 13 14 15   16 17 18 19 20 21 22 23
+    // Shifted mask should look like this assuming bit offset = 6:
+    // 2  3  4  5  6  7  N  N   10 11 12 13 14 15  N  N   18 19 20 21 22 23  N  N
+    // clang-format on
+    uint64_t shifted_new_bits = (new_bits & inverse_selection.mask) >> bits_to_carry;
+    // captured_carry:
+    // 0  1  N  N  N  N  N  N   8  9  N  N  N   N  N  N   16 17  N  N  N  N  N  N
+    uint64_t captured_carry = carry_bits.mask & new_bits;
+    // mask_carry_bits:
+    // N  N  N  N  N  N  8  9   N  N  N  N  N   N 16 17    N  N   N  N  N  N  N  N
+    uint64_t mask_carry_bits = (captured_carry >> 8) << bit_offset;
+
+    new_bits = shifted_new_bits | mask_carry_bits;
+    // Don't overwrite the first byte
+    valid_byte_offset += 1;
+    number_of_bits -= bits_to_carry;
+  }
+
+  int64_t bytes_for_new_bits = ::arrow::BitUtil::BytesForBits(number_of_bits);
+  if (valid_bits_length - ::arrow::BitUtil::BytesForBits(valid_bits_offset) >=
+      static_cast<int64_t>(sizeof(new_bits))) {
+    // This should be the common case and inlined as a single instruction, which
+    // should be cheaper than the general case of calling memcpy, so it is likely
+    // worth the extra branch.
+    std::memcpy(valid_bits + valid_byte_offset, &new_bits, sizeof(new_bits));
+  } else {
+    std::memcpy(valid_bits + valid_byte_offset, &new_bits, bytes_for_new_bits);
+  }
+  return new_offset;
+}
+
+/// \brief Appends bit values to the validity bitmap valid bits, based on bitmaps
+/// generated by GreaterThanBitmap, and the appropriate threshold definition_level.
+///
+/// \param[in] new_bits Bitmap to append (interpreted as Little-endian/LSB).
+/// \param[in] new_bit_count The number of bits to append from new_bits.
+/// \param[in,out] validity_bitmap The validity bitmap to update.
+/// \param[in,out] validity_bitmap_offset The offset to start appending bits to in
+/// valid_bits (updated to latest bitmap).
+/// \param[in,out] set_bit_count The number of set bits appended is added to
+/// set_bit_count.
+void AppendToValidityBitmap(uint64_t new_bits, int64_t new_bit_count,
+                            uint8_t* validity_bitmap, int64_t* validity_bitmap_offset,
+                            int64_t* set_bit_count) {
+  int64_t min_valid_bits_size =
+      ::arrow::BitUtil::BytesForBits(new_bit_count + *validity_bitmap_offset);
+
+  *set_bit_count += ::arrow::BitUtil::PopCount(new_bits);
+  *validity_bitmap_offset = AppendBitmap(new_bits, new_bit_count, min_valid_bits_size,
+                                         *validity_bitmap_offset, validity_bitmap);
+}
+
+/// The same as AppendToValidityBitmap but only appends bits from bitmap that have
+/// a corresponding bit set in selection_bitmap.
+///
+/// \returns The number of bits appended.
+///
+/// N.B. This is only implemented for architectures that support the BMI2 instruction
+/// set.
+int64_t AppendSelectedBitsToValidityBitmap(uint64_t new_bits, uint64_t selection_bitmap,
+                                           uint8_t* validity_bitmap,
+                                           int64_t* validity_bitmap_offset,
+                                           int64_t* set_bit_count) {
+#if defined(ARROW_HAVE_BMI2)
+  // If the parent list was empty at for the given slot it should not be added to the
+  // bitmap.
+  uint64_t selected_bits = _pext_u64(new_bits, selection_bitmap);
+  int64_t selected_count =
+      static_cast<int64_t>(::arrow::BitUtil::PopCount(selection_bitmap));
+
+  AppendToValidityBitmap(selected_bits, selected_count, validity_bitmap,
+                         validity_bitmap_offset, set_bit_count);
+  return selected_count;
+#else
+  // We shouldn't get here.

Review comment:
       done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org