You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2018/08/17 06:25:57 UTC

[parquet-cpp] branch master updated: PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor, fix deprecated APIs, upgrade clang-format version. Fix record delimiting bug

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 337c8eb  PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor, fix deprecated APIs, upgrade clang-format version. Fix record delimiting bug
337c8eb is described below

commit 337c8eb0fb73f7f44e60aeeca83607c7b1e78b4c
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Aug 17 02:25:50 2018 -0400

    PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor, fix deprecated APIs, upgrade clang-format version. Fix record delimiting bug
    
    Author: Wes McKinney <we...@apache.org>
    Author: Antoine Pitrou <an...@python.org>
    Author: Wes McKinney <we...@twosigma.com>
    
    Closes #467 from pitrou/PARQUET-1308-arrow-thread-pool and squashes the following commits:
    
    cab1aa8 [Wes McKinney] Fix deprecation warning
    b23bc46 [Wes McKinney] Fix end of row group logic
    8fbff2e [Wes McKinney] Fix record delimiting bug
    c2bc1c8 [Wes McKinney] Print offending value
    3e72282 [Wes McKinney] Find minimal repro that fails on Linux also
    1f4f714 [Wes McKinney] Add temporary testing code
    a2eea67 [Wes McKinney] Fix usages of deprecated APIs, upgrade to clang-format-6.0
    45779a9 [Wes McKinney] Update Arrow external project version to master
    2c06321 [Antoine Pitrou] Fix API name
    5707f60 [Antoine Pitrou] Lint
    e90a902 [Antoine Pitrou] PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor
---
 benchmarks/decode_benchmark.cc                     |  4 +-
 cmake_modules/ArrowExternalProject.cmake           |  2 +-
 cmake_modules/FindClangTools.cmake                 |  4 +-
 cmake_modules/SetupCxxFlags.cmake                  |  1 +
 src/parquet/arrow/arrow-reader-writer-benchmark.cc |  8 +--
 src/parquet/arrow/arrow-reader-writer-test.cc      | 61 ++++++++++----------
 src/parquet/arrow/reader.cc                        | 63 ++++++++++++++-------
 src/parquet/arrow/reader.h                         |  5 ++
 src/parquet/arrow/record_reader.cc                 | 40 ++++++++-----
 src/parquet/arrow/test-util.h                      | 65 ++++++++++++++++++----
 src/parquet/types.h                                |  6 +-
 11 files changed, 174 insertions(+), 85 deletions(-)

diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc
index 8f2dfa0..3ae32b4 100644
--- a/benchmarks/decode_benchmark.cc
+++ b/benchmarks/decode_benchmark.cc
@@ -42,8 +42,8 @@ class DeltaBitPackEncoder {
 
   uint8_t* Encode(int* encoded_len) {
     uint8_t* result = new uint8_t[10 * 1024 * 1024];
-    int num_mini_blocks = static_cast<int>(arrow::BitUtil::Ceil(num_values() - 1,
-                                                                mini_block_size_));
+    int num_mini_blocks = static_cast<int>(arrow::BitUtil::CeilDiv(num_values() - 1,
+                                                                   mini_block_size_));
     uint8_t* mini_block_widths = NULL;
 
     arrow::BitWriter writer(result, 10 * 1024 * 1024);
diff --git a/cmake_modules/ArrowExternalProject.cmake b/cmake_modules/ArrowExternalProject.cmake
index 4f23661..3d1a276 100644
--- a/cmake_modules/ArrowExternalProject.cmake
+++ b/cmake_modules/ArrowExternalProject.cmake
@@ -46,7 +46,7 @@ if (MSVC AND PARQUET_USE_STATIC_CRT)
 endif()
 
 if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
-  set(ARROW_VERSION "501d60e918bd4d10c429ab34e0b8e8a87dffb732")
+  set(ARROW_VERSION "3edfd7caf2746eeba37d5ac7bfd3665cc159e7ad")
 else()
   set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
 endif()
diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake
index 215a5cd..56e2dd7 100644
--- a/cmake_modules/FindClangTools.cmake
+++ b/cmake_modules/FindClangTools.cmake
@@ -96,7 +96,9 @@ if (CLANG_FORMAT_VERSION)
     endif()
 else()
     find_program(CLANG_FORMAT_BIN
-      NAMES clang-format-4.0
+      NAMES clang-format-6.0
+      clang-format-5.0
+      clang-format-4.0
       clang-format-3.9
       clang-format-3.8
       clang-format-3.7
diff --git a/cmake_modules/SetupCxxFlags.cmake b/cmake_modules/SetupCxxFlags.cmake
index 01ed85b..5ca3f4e 100644
--- a/cmake_modules/SetupCxxFlags.cmake
+++ b/cmake_modules/SetupCxxFlags.cmake
@@ -84,6 +84,7 @@ if ("${UPPERCASE_BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
 -Wno-shadow -Wno-switch-enum -Wno-exit-time-destructors \
 -Wno-global-constructors -Wno-weak-template-vtables -Wno-undefined-reinterpret-cast \
 -Wno-implicit-fallthrough -Wno-unreachable-code-return \
+-Wno-documentation-deprecated-sync \
 -Wno-float-equal -Wno-missing-prototypes \
 -Wno-old-style-cast -Wno-covered-switch-default \
 -Wno-format-nonliteral -Wno-missing-noreturn \
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 15d2cf7..51eb0c2 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -104,9 +104,9 @@ std::shared_ptr<::arrow::Table> TableFromVector(
     std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
-    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+    EXIT_NOT_OK(builder.AppendValues(vec.data(), vec.size(), valid_bytes.data()));
   } else {
-    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+    EXIT_NOT_OK(builder.AppendValues(vec.data(), vec.size(), nullptr));
   }
   std::shared_ptr<::arrow::Array> array;
   EXIT_NOT_OK(builder.Finish(&array));
@@ -126,9 +126,9 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(),
                   [&n] { return (n++ % 2) != 0; });
-    EXIT_NOT_OK(builder.Append(vec, valid_bytes));
+    EXIT_NOT_OK(builder.AppendValues(vec, valid_bytes));
   } else {
-    EXIT_NOT_OK(builder.Append(vec));
+    EXIT_NOT_OK(builder.AppendValues(vec));
   }
   std::shared_ptr<::arrow::Array> array;
   EXIT_NOT_OK(builder.Finish(&array));
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index d4f5b00..be3e611 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -320,8 +320,7 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
-void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
-                        int64_t row_group_size,
+void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size,
                         const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                         std::shared_ptr<Buffer>* out) {
   auto sink = std::make_shared<InMemoryOutputStream>();
@@ -399,21 +398,21 @@ void AssertTablesEqual(const Table& expected, const Table& actual,
   }
 }
 
-void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
                        int64_t row_group_size, const std::vector<int>& column_subset,
                        std::shared_ptr<Table>* out,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
                            default_arrow_writer_properties()) {
   std::shared_ptr<Buffer> buffer;
   ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer));
+      WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                               ::arrow::default_memory_pool(),
                               ::parquet::default_reader_properties(), nullptr, &reader));
 
-  reader->set_num_threads(num_threads);
+  reader->set_use_threads(use_threads);
 
   if (column_subset.size() > 0) {
     ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
@@ -427,7 +426,8 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
                           const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
                               default_arrow_writer_properties()) {
   std::shared_ptr<Table> result;
-  DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
+  DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
+                    arrow_properties);
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result, false));
 }
 
@@ -1270,13 +1270,14 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
   // Use deprecated INT96 type
   std::shared_ptr<Table> result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      table, 1, table->num_rows(), {}, &result,
+      table, false /* use_threads */, table->num_rows(), {}, &result,
       ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
 
   // Cast nanaoseconds to microseconds and use INT64 physical type
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result));
+  ASSERT_NO_FATAL_FAILURE(
+      DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
   std::shared_ptr<Table> expected;
   MakeDateTimeTypesTable(&table, true);
 
@@ -1339,14 +1340,14 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
 
   std::shared_ptr<Table> milli_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      input, 1, input->num_rows(), {}, &milli_result,
+      input, false /* use_threads */, input->num_rows(), {}, &milli_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_milli_result, *milli_result));
 
   std::shared_ptr<Table> micro_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      input, 1, input->num_rows(), {}, &micro_result,
+      input, false /* use_threads */, input->num_rows(), {}, &micro_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()));
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_micro_result, *micro_result));
 }
@@ -1472,7 +1473,8 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   auto ex_table = Table::Make(ex_schema, ex_columns);
 
   std::shared_ptr<Table> result;
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result));
+  ASSERT_NO_FATAL_FAILURE(
+      DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_table, *result));
 }
@@ -1515,7 +1517,8 @@ TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
                                      ->build();
 
   std::shared_ptr<Table> result;
-  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result, arrow_writer_properties);
+  DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
+                    arrow_writer_properties);
 
   ASSERT_EQ(table->num_columns(), result->num_columns());
   ASSERT_EQ(table->num_rows(), result->num_rows());
@@ -1561,17 +1564,18 @@ void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
   *out = Table::Make(schema, columns);
 }
 
-void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
+void MakeListArray(int num_rows, int max_value_length,
+                   std::shared_ptr<::DataType>* out_type,
                    std::shared_ptr<Array>* out_array) {
   std::vector<int32_t> length_draws;
-  randint(num_rows, 0, 100, &length_draws);
+  randint(num_rows, 0, max_value_length, &length_draws);
 
   std::vector<int32_t> offset_values;
 
   // Make sure some of them are length 0
   int32_t total_elements = 0;
   for (size_t i = 0; i < length_draws.size(); ++i) {
-    if (length_draws[i] < 10) {
+    if (length_draws[i] < max_value_length / 10) {
       length_draws[i] = 0;
     }
     offset_values.push_back(total_elements);
@@ -1599,14 +1603,14 @@ void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
 TEST(TestArrowReadWrite, MultithreadedRead) {
   const int num_columns = 20;
   const int num_rows = 1000;
-  const int num_threads = 4;
+  const bool use_threads = true;
 
   std::shared_ptr<Table> table;
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Table> result;
   ASSERT_NO_FATAL_FAILURE(
-      DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result));
+      DoSimpleRoundtrip(table, use_threads, table->num_rows(), {}, &result));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
 }
@@ -1619,7 +1623,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
@@ -1648,7 +1652,7 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) {
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
@@ -1681,7 +1685,7 @@ TEST(TestArrowReadWrite, ScanContents) {
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
@@ -1700,7 +1704,7 @@ TEST(TestArrowReadWrite, ScanContents) {
 TEST(TestArrowReadWrite, ReadColumnSubset) {
   const int num_columns = 20;
   const int num_rows = 1000;
-  const int num_threads = 4;
+  const bool use_threads = true;
 
   std::shared_ptr<Table> table;
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
@@ -1708,7 +1712,7 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
   std::shared_ptr<Table> result;
   std::vector<int> column_subset = {0, 4, 8, 10};
   ASSERT_NO_FATAL_FAILURE(
-      DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result));
+      DoSimpleRoundtrip(table, use_threads, table->num_rows(), column_subset, &result));
 
   std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
   std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
@@ -1723,19 +1727,21 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
 }
 
 TEST(TestArrowReadWrite, ListLargeRecords) {
-  const int num_rows = 50;
+  // PARQUET-1308: This test passed on Linux when num_rows was smaller
+  const int num_rows = 2000;
 
   std::shared_ptr<Array> list_array;
   std::shared_ptr<::DataType> list_type;
 
-  MakeListArray(num_rows, &list_type, &list_array);
+  MakeListArray(num_rows, 20, &list_type, &list_array);
 
   auto schema = ::arrow::schema({::arrow::field("a", list_type)});
+
   std::shared_ptr<Table> table = Table::Make(schema, {list_array});
 
   std::shared_ptr<Buffer> buffer;
   ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer));
+      WriteTableToBuffer(table, 100, default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -1747,6 +1753,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   ASSERT_OK_NO_THROW(reader->ReadTable(&result));
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
 
+  // Read chunked
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                               ::arrow::default_memory_pool(),
                               ::parquet::default_reader_properties(), nullptr, &reader));
@@ -1754,8 +1761,6 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   std::unique_ptr<ColumnReader> col_reader;
   ASSERT_OK(reader->GetColumn(0, &col_reader));
 
-  auto expected = table->column(0)->data()->chunk(0);
-
   std::vector<std::shared_ptr<Array>> pieces;
   for (int i = 0; i < num_rows; ++i) {
     std::shared_ptr<Array> piece;
@@ -1809,7 +1814,7 @@ auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
 
 auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
                        std::shared_ptr<Array>* array) {
-  MakeListArray(length, type, array);
+  MakeListArray(length, 100, type, array);
 };
 
 TEST(TestArrowReadWrite, TableWithChunkedColumns) {
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index c0974ca..d0b397f 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -31,7 +31,7 @@
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
-#include "arrow/util/parallel.h"
+#include "arrow/util/thread-pool.h"
 
 #include "parquet/arrow/record_reader.h"
 #include "parquet/arrow/schema.h"
@@ -56,7 +56,6 @@ using parquet::schema::Node;
 
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
-using arrow::ParallelFor;
 using arrow::RecordBatchReader;
 
 using parquet::internal::RecordReader;
@@ -212,7 +211,7 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
 class FileReader::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-      : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+      : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}
 
   virtual ~Impl() {}
 
@@ -244,15 +243,14 @@ class FileReader::Impl {
 
   int num_columns() const { return reader_->metadata()->num_columns(); }
 
-  void set_num_threads(int num_threads) { num_threads_ = num_threads; }
+  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
 
   ParquetFileReader* reader() { return reader_.get(); }
 
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
-
-  int num_threads_;
+  bool use_threads_;
 };
 
 class ColumnReader::ColumnReaderImpl {
@@ -462,14 +460,13 @@ Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
 
 Status FileReader::Impl::ReadRowGroup(int row_group_index,
                                       const std::vector<int>& indices,
-                                      std::shared_ptr<::arrow::Table>* out) {
+                                      std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(indices, &schema));
 
   auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
 
   int num_columns = static_cast<int>(indices.size());
-  int nthreads = std::min<int>(num_threads_, num_columns);
   std::vector<std::shared_ptr<Column>> columns(num_columns);
 
   // TODO(wesm): Refactor to share more code with ReadTable
@@ -483,12 +480,24 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     return Status::OK();
   };
 
-  if (nthreads == 1) {
+  if (use_threads_) {
+    std::vector<std::future<Status>> futures;
+    auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_columns; i++) {
-      RETURN_NOT_OK(ReadColumnFunc(i));
+      futures.push_back(pool->Submit(ReadColumnFunc, i));
     }
+    Status final_status = Status::OK();
+    for (auto& fut : futures) {
+      Status st = fut.get();
+      if (!st.ok()) {
+        final_status = std::move(st);
+      }
+    }
+    RETURN_NOT_OK(final_status);
   } else {
-    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
+    for (int i = 0; i < num_columns; i++) {
+      RETURN_NOT_OK(ReadColumnFunc(i));
+    }
   }
 
   *out = Table::Make(schema, columns);
@@ -508,7 +517,9 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
     return Status::Invalid("Invalid column index");
   }
 
-  std::vector<std::shared_ptr<Column>> columns(field_indices.size());
+  int num_fields = static_cast<int>(field_indices.size());
+  std::vector<std::shared_ptr<Column>> columns(num_fields);
+
   auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
     RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -516,14 +527,24 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
     return Status::OK();
   };
 
-  int num_fields = static_cast<int>(field_indices.size());
-  int nthreads = std::min<int>(num_threads_, num_fields);
-  if (nthreads == 1) {
+  if (use_threads_) {
+    std::vector<std::future<Status>> futures;
+    auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_fields; i++) {
-      RETURN_NOT_OK(ReadColumnFunc(i));
+      futures.push_back(pool->Submit(ReadColumnFunc, i));
     }
+    Status final_status = Status::OK();
+    for (auto& fut : futures) {
+      Status st = fut.get();
+      if (!st.ok()) {
+        final_status = std::move(st);
+      }
+    }
+    RETURN_NOT_OK(final_status);
   } else {
-    RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc));
+    for (int i = 0; i < num_fields; i++) {
+      RETURN_NOT_OK(ReadColumnFunc(i));
+    }
   }
 
   std::shared_ptr<Table> table = Table::Make(schema, columns);
@@ -669,7 +690,11 @@ std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
 
 int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
 
-void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); }
+void FileReader::set_num_threads(int num_threads) {}
+
+void FileReader::set_use_threads(bool use_threads) {
+  impl_->set_use_threads(use_threads);
+}
 
 Status FileReader::ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                 int64_t* num_rows) {
@@ -1350,7 +1375,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
   const int16_t* def_levels_data;
   size_t def_levels_length;
   RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
+  RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap));
   uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
   for (size_t i = 0; i < def_levels_length; i++) {
     if (def_levels_data[i] < struct_def_level_) {
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 4d68c61..1e37d89 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -196,8 +196,13 @@ class PARQUET_EXPORT FileReader {
 
   /// Set the number of threads to use during reads of multiple columns. By
   /// default only 1 thread is used
+  /// \deprecated Use set_use_threads instead.
   void set_num_threads(int num_threads);
 
+  /// Set whether to use multiple threads during reads of multiple columns.
+  /// By default only one thread is used.
+  void set_use_threads(bool use_threads);
+
   virtual ~FileReader();
 
  private:
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index b4d8766..781e1ba 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -59,7 +59,7 @@ class RecordReader::RecordReaderImpl {
         num_decoded_values_(0),
         max_def_level_(descr->max_definition_level()),
         max_rep_level_(descr->max_repetition_level()),
-        at_record_start_(false),
+        at_record_start_(true),
         records_read_(0),
         values_written_(0),
         values_capacity_(0),
@@ -91,6 +91,7 @@ class RecordReader::RecordReaderImpl {
   virtual void ResetDecoders() = 0;
 
   void SetPageReader(std::unique_ptr<PageReader> reader) {
+    at_record_start_ = true;
     pager_ = std::move(reader);
     ResetDecoders();
   }
@@ -152,17 +153,26 @@ class RecordReader::RecordReaderImpl {
     // Count logical records and number of values to read
     while (levels_position_ < levels_written_) {
       if (*rep_levels++ == 0) {
-        at_record_start_ = true;
-        if (records_read == num_records) {
-          // We've found the number of records we were looking for
-          break;
-        } else {
-          // Continue
+        // If at_record_start_ is true, we are seeing the start of a record
+        // for the second time, such as after repeated calls to
+        // DelimitRecords. In this case we must continue until we find
+        // another record start or exhausting the ColumnChunk
+        if (!at_record_start_) {
+          // We've reached the end of a record; increment the record count.
           ++records_read;
+          if (records_read == num_records) {
+            // We've found the number of records we were looking for. Set
+            // at_record_start_ to true and break
+            at_record_start_ = true;
+            break;
+          }
         }
-      } else {
-        at_record_start_ = false;
       }
+
+      // We have decided to consume the level at this position; therefore we
+      // must advance until we find another record boundary
+      at_record_start_ = false;
+
       if (*def_levels++ == max_def_level_) {
         ++values_to_read;
       }
@@ -435,11 +445,6 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
       records_read += ReadRecordData(num_records);
     }
 
-    // HasNext invokes ReadNewPage
-    if (records_read == 0 && !HasNext()) {
-      return 0;
-    }
-
     int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
 
     // If we are in the middle of a record, we continue until reaching the
@@ -448,6 +453,13 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
     while (!at_record_start_ || records_read < num_records) {
       // Is there more data to read in this row group?
       if (!HasNext()) {
+        if (!at_record_start_) {
+          // We ended the row group while inside a record that we haven't seen
+          // the end of yet. So increment the record count for the last record in
+          // the row group
+          ++records_read;
+          at_record_start_ = true;
+        }
         break;
       }
 
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 2babacb..f0f9139 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -24,7 +24,12 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
 
+#include "parquet/arrow/record_reader.h"
+
 namespace parquet {
+
+using internal::RecordReader;
+
 namespace arrow {
 
 using ::arrow::Array;
@@ -69,7 +74,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
   ::arrow::test::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
                              &values);
   ::arrow::NumericBuilder<ArrowType> builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
 }
 
@@ -83,7 +88,7 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
 }
 
@@ -99,7 +104,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullAr
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  builder.Append(values.data(), values.size());
+  builder.AppendValues(values.data(), values.size());
   return builder.Finish(out);
 }
 
@@ -167,7 +172,7 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
                                         &out_buf));
   random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
 
-  RETURN_NOT_OK(builder.Append(out_buf->data(), size));
+  RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
   return builder.Finish(out);
 }
 
@@ -177,7 +182,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullAr
   std::vector<uint8_t> values;
   ::arrow::test::randint(size, 0, 1, &values);
   ::arrow::BooleanBuilder builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
 }
 
@@ -196,7 +201,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
   }
 
   ::arrow::NumericBuilder<ArrowType> builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -219,7 +224,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Arra
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -243,7 +248,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableA
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  builder.Append(values.data(), values.size(), valid_bytes.data());
+  builder.AppendValues(values.data(), values.size(), valid_bytes.data());
   return builder.Finish(out);
 }
 
@@ -328,7 +333,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,
   random_decimals(size, seed, precision, out_buf->mutable_data());
 
   ::arrow::Decimal128Builder builder(type);
-  RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -349,7 +354,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
   }
 
   ::arrow::BooleanBuilder builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -463,13 +468,51 @@ void ExpectArrayT(void* expected, Array* result) {
 template <>
 void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
   ::arrow::BooleanBuilder builder;
-  EXPECT_OK(builder.Append(reinterpret_cast<uint8_t*>(expected), result->length()));
+  EXPECT_OK(builder.AppendValues(reinterpret_cast<uint8_t*>(expected), result->length()));
 
   std::shared_ptr<Array> expected_array;
   EXPECT_OK(builder.Finish(&expected_array));
   EXPECT_TRUE(result->Equals(*expected_array));
 }
 
+template <typename ParquetType>
+void PrintBufferedLevels(const RecordReader& reader) {
+  using T = typename ::parquet::type_traits<ParquetType::type_num>::value_type;
+
+  const int16_t* def_levels = reader.def_levels();
+  const int16_t* rep_levels = reader.rep_levels();
+  const int64_t total_levels_read = reader.levels_position();
+
+  const T* values = reinterpret_cast<const T*>(reader.values());
+
+  std::cout << "def levels: ";
+  for (int64_t i = 0; i < total_levels_read; ++i) {
+    std::cout << def_levels[i] << " ";
+  }
+  std::cout << std::endl;
+
+  std::cout << "rep levels: ";
+  for (int64_t i = 0; i < total_levels_read; ++i) {
+    std::cout << rep_levels[i] << " ";
+  }
+  std::cout << std::endl;
+
+  std::cout << "values: ";
+  for (int64_t i = 0; i < reader.values_written(); ++i) {
+    std::cout << values[i] << " ";
+  }
+  std::cout << std::endl;
+}
+
+template <>
+void PrintBufferedLevels<ByteArrayType>(const RecordReader& reader) {}
+
+template <>
+void PrintBufferedLevels<FLBAType>(const RecordReader& reader) {}
+
+template <>
+void PrintBufferedLevels<Int96Type>(const RecordReader& reader) {}
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index aec9965..10789cb 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -114,13 +114,9 @@ struct Compression {
 };
 
 struct Encryption {
-  enum type {
-    AES_GCM_V1 = 0,
-    AES_GCM_CTR_V1 = 1
-  };
+  enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 };
 };
 
-
 // parquet::PageType
 struct PageType {
   enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };