You are viewing a plain-text version of this content. The canonical link to it was a hyperlink, which is not preserved in this plain-text rendering.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/17 06:27:00 UTC

[jira] [Commented] (PARQUET-1308) [C++] parquet::arrow should use thread pool, not ParallelFor

    [ https://issues.apache.org/jira/browse/PARQUET-1308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583431#comment-16583431 ] 

ASF GitHub Bot commented on PARQUET-1308:
-----------------------------------------

wesm closed pull request #467: PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor, fix deprecated APIs, upgrade clang-format version. Fix record delimiting bug
URL: https://github.com/apache/parquet-cpp/pull/467
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc
index 8f2dfa07..3ae32b4c 100644
--- a/benchmarks/decode_benchmark.cc
+++ b/benchmarks/decode_benchmark.cc
@@ -42,8 +42,8 @@ class DeltaBitPackEncoder {
 
   uint8_t* Encode(int* encoded_len) {
     uint8_t* result = new uint8_t[10 * 1024 * 1024];
-    int num_mini_blocks = static_cast<int>(arrow::BitUtil::Ceil(num_values() - 1,
-                                                                mini_block_size_));
+    int num_mini_blocks = static_cast<int>(arrow::BitUtil::CeilDiv(num_values() - 1,
+                                                                   mini_block_size_));
     uint8_t* mini_block_widths = NULL;
 
     arrow::BitWriter writer(result, 10 * 1024 * 1024);
diff --git a/cmake_modules/ArrowExternalProject.cmake b/cmake_modules/ArrowExternalProject.cmake
index 4f23661e..3d1a2760 100644
--- a/cmake_modules/ArrowExternalProject.cmake
+++ b/cmake_modules/ArrowExternalProject.cmake
@@ -46,7 +46,7 @@ if (MSVC AND PARQUET_USE_STATIC_CRT)
 endif()
 
 if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
-  set(ARROW_VERSION "501d60e918bd4d10c429ab34e0b8e8a87dffb732")
+  set(ARROW_VERSION "3edfd7caf2746eeba37d5ac7bfd3665cc159e7ad")
 else()
   set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
 endif()
diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake
index 215a5cd9..56e2dd77 100644
--- a/cmake_modules/FindClangTools.cmake
+++ b/cmake_modules/FindClangTools.cmake
@@ -96,7 +96,9 @@ if (CLANG_FORMAT_VERSION)
     endif()
 else()
     find_program(CLANG_FORMAT_BIN
-      NAMES clang-format-4.0
+      NAMES clang-format-6.0
+      clang-format-5.0
+      clang-format-4.0
       clang-format-3.9
       clang-format-3.8
       clang-format-3.7
diff --git a/cmake_modules/SetupCxxFlags.cmake b/cmake_modules/SetupCxxFlags.cmake
index 01ed85bf..5ca3f4ef 100644
--- a/cmake_modules/SetupCxxFlags.cmake
+++ b/cmake_modules/SetupCxxFlags.cmake
@@ -84,6 +84,7 @@ if ("${UPPERCASE_BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
 -Wno-shadow -Wno-switch-enum -Wno-exit-time-destructors \
 -Wno-global-constructors -Wno-weak-template-vtables -Wno-undefined-reinterpret-cast \
 -Wno-implicit-fallthrough -Wno-unreachable-code-return \
+-Wno-documentation-deprecated-sync \
 -Wno-float-equal -Wno-missing-prototypes \
 -Wno-old-style-cast -Wno-covered-switch-default \
 -Wno-format-nonliteral -Wno-missing-noreturn \
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 15d2cf72..51eb0c23 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -104,9 +104,9 @@ std::shared_ptr<::arrow::Table> TableFromVector(
     std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
-    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+    EXIT_NOT_OK(builder.AppendValues(vec.data(), vec.size(), valid_bytes.data()));
   } else {
-    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+    EXIT_NOT_OK(builder.AppendValues(vec.data(), vec.size(), nullptr));
   }
   std::shared_ptr<::arrow::Array> array;
   EXIT_NOT_OK(builder.Finish(&array));
@@ -126,9 +126,9 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(),
                   [&n] { return (n++ % 2) != 0; });
-    EXIT_NOT_OK(builder.Append(vec, valid_bytes));
+    EXIT_NOT_OK(builder.AppendValues(vec, valid_bytes));
   } else {
-    EXIT_NOT_OK(builder.Append(vec));
+    EXIT_NOT_OK(builder.AppendValues(vec));
   }
   std::shared_ptr<::arrow::Array> array;
   EXIT_NOT_OK(builder.Finish(&array));
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index d4f5b000..be3e6114 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -320,8 +320,7 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
-void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
-                        int64_t row_group_size,
+void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size,
                         const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                         std::shared_ptr<Buffer>* out) {
   auto sink = std::make_shared<InMemoryOutputStream>();
@@ -399,21 +398,21 @@ void AssertTablesEqual(const Table& expected, const Table& actual,
   }
 }
 
-void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
                        int64_t row_group_size, const std::vector<int>& column_subset,
                        std::shared_ptr<Table>* out,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
                            default_arrow_writer_properties()) {
   std::shared_ptr<Buffer> buffer;
   ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer));
+      WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                               ::arrow::default_memory_pool(),
                               ::parquet::default_reader_properties(), nullptr, &reader));
 
-  reader->set_num_threads(num_threads);
+  reader->set_use_threads(use_threads);
 
   if (column_subset.size() > 0) {
     ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
@@ -427,7 +426,8 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
                           const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
                               default_arrow_writer_properties()) {
   std::shared_ptr<Table> result;
-  DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
+  DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
+                    arrow_properties);
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result, false));
 }
 
@@ -1270,13 +1270,14 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
   // Use deprecated INT96 type
   std::shared_ptr<Table> result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      table, 1, table->num_rows(), {}, &result,
+      table, false /* use_threads */, table->num_rows(), {}, &result,
       ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
 
   // Cast nanaoseconds to microseconds and use INT64 physical type
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result));
+  ASSERT_NO_FATAL_FAILURE(
+      DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
   std::shared_ptr<Table> expected;
   MakeDateTimeTypesTable(&table, true);
 
@@ -1339,14 +1340,14 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
 
   std::shared_ptr<Table> milli_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      input, 1, input->num_rows(), {}, &milli_result,
+      input, false /* use_threads */, input->num_rows(), {}, &milli_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_milli_result, *milli_result));
 
   std::shared_ptr<Table> micro_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      input, 1, input->num_rows(), {}, &micro_result,
+      input, false /* use_threads */, input->num_rows(), {}, &micro_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()));
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_micro_result, *micro_result));
 }
@@ -1472,7 +1473,8 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   auto ex_table = Table::Make(ex_schema, ex_columns);
 
   std::shared_ptr<Table> result;
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result));
+  ASSERT_NO_FATAL_FAILURE(
+      DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_table, *result));
 }
@@ -1515,7 +1517,8 @@ TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
                                      ->build();
 
   std::shared_ptr<Table> result;
-  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result, arrow_writer_properties);
+  DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
+                    arrow_writer_properties);
 
   ASSERT_EQ(table->num_columns(), result->num_columns());
   ASSERT_EQ(table->num_rows(), result->num_rows());
@@ -1561,17 +1564,18 @@ void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
   *out = Table::Make(schema, columns);
 }
 
-void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
+void MakeListArray(int num_rows, int max_value_length,
+                   std::shared_ptr<::DataType>* out_type,
                    std::shared_ptr<Array>* out_array) {
   std::vector<int32_t> length_draws;
-  randint(num_rows, 0, 100, &length_draws);
+  randint(num_rows, 0, max_value_length, &length_draws);
 
   std::vector<int32_t> offset_values;
 
   // Make sure some of them are length 0
   int32_t total_elements = 0;
   for (size_t i = 0; i < length_draws.size(); ++i) {
-    if (length_draws[i] < 10) {
+    if (length_draws[i] < max_value_length / 10) {
       length_draws[i] = 0;
     }
     offset_values.push_back(total_elements);
@@ -1599,14 +1603,14 @@ void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
 TEST(TestArrowReadWrite, MultithreadedRead) {
   const int num_columns = 20;
   const int num_rows = 1000;
-  const int num_threads = 4;
+  const bool use_threads = true;
 
   std::shared_ptr<Table> table;
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Table> result;
   ASSERT_NO_FATAL_FAILURE(
-      DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result));
+      DoSimpleRoundtrip(table, use_threads, table->num_rows(), {}, &result));
 
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
 }
@@ -1619,7 +1623,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
@@ -1648,7 +1652,7 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) {
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
@@ -1681,7 +1685,7 @@ TEST(TestArrowReadWrite, ScanContents) {
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
@@ -1700,7 +1704,7 @@ TEST(TestArrowReadWrite, ScanContents) {
 TEST(TestArrowReadWrite, ReadColumnSubset) {
   const int num_columns = 20;
   const int num_rows = 1000;
-  const int num_threads = 4;
+  const bool use_threads = true;
 
   std::shared_ptr<Table> table;
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
@@ -1708,7 +1712,7 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
   std::shared_ptr<Table> result;
   std::vector<int> column_subset = {0, 4, 8, 10};
   ASSERT_NO_FATAL_FAILURE(
-      DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result));
+      DoSimpleRoundtrip(table, use_threads, table->num_rows(), column_subset, &result));
 
   std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
   std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
@@ -1723,19 +1727,21 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
 }
 
 TEST(TestArrowReadWrite, ListLargeRecords) {
-  const int num_rows = 50;
+  // PARQUET-1308: This test passed on Linux when num_rows was smaller
+  const int num_rows = 2000;
 
   std::shared_ptr<Array> list_array;
   std::shared_ptr<::DataType> list_type;
 
-  MakeListArray(num_rows, &list_type, &list_array);
+  MakeListArray(num_rows, 20, &list_type, &list_array);
 
   auto schema = ::arrow::schema({::arrow::field("a", list_type)});
+
   std::shared_ptr<Table> table = Table::Make(schema, {list_array});
 
   std::shared_ptr<Buffer> buffer;
   ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer));
+      WriteTableToBuffer(table, 100, default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -1747,6 +1753,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   ASSERT_OK_NO_THROW(reader->ReadTable(&result));
   ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
 
+  // Read chunked
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                               ::arrow::default_memory_pool(),
                               ::parquet::default_reader_properties(), nullptr, &reader));
@@ -1754,8 +1761,6 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   std::unique_ptr<ColumnReader> col_reader;
   ASSERT_OK(reader->GetColumn(0, &col_reader));
 
-  auto expected = table->column(0)->data()->chunk(0);
-
   std::vector<std::shared_ptr<Array>> pieces;
   for (int i = 0; i < num_rows; ++i) {
     std::shared_ptr<Array> piece;
@@ -1809,7 +1814,7 @@ auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
 
 auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
                        std::shared_ptr<Array>* array) {
-  MakeListArray(length, type, array);
+  MakeListArray(length, 100, type, array);
 };
 
 TEST(TestArrowReadWrite, TableWithChunkedColumns) {
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index c0974ca4..d0b397f3 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -31,7 +31,7 @@
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
-#include "arrow/util/parallel.h"
+#include "arrow/util/thread-pool.h"
 
 #include "parquet/arrow/record_reader.h"
 #include "parquet/arrow/schema.h"
@@ -56,7 +56,6 @@ using parquet::schema::Node;
 
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
-using arrow::ParallelFor;
 using arrow::RecordBatchReader;
 
 using parquet::internal::RecordReader;
@@ -212,7 +211,7 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
 class FileReader::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-      : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+      : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}
 
   virtual ~Impl() {}
 
@@ -244,15 +243,14 @@ class FileReader::Impl {
 
   int num_columns() const { return reader_->metadata()->num_columns(); }
 
-  void set_num_threads(int num_threads) { num_threads_ = num_threads; }
+  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
 
   ParquetFileReader* reader() { return reader_.get(); }
 
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
-
-  int num_threads_;
+  bool use_threads_;
 };
 
 class ColumnReader::ColumnReaderImpl {
@@ -462,14 +460,13 @@ Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
 
 Status FileReader::Impl::ReadRowGroup(int row_group_index,
                                       const std::vector<int>& indices,
-                                      std::shared_ptr<::arrow::Table>* out) {
+                                      std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(indices, &schema));
 
   auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
 
   int num_columns = static_cast<int>(indices.size());
-  int nthreads = std::min<int>(num_threads_, num_columns);
   std::vector<std::shared_ptr<Column>> columns(num_columns);
 
   // TODO(wesm): Refactor to share more code with ReadTable
@@ -483,12 +480,24 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     return Status::OK();
   };
 
-  if (nthreads == 1) {
+  if (use_threads_) {
+    std::vector<std::future<Status>> futures;
+    auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_columns; i++) {
-      RETURN_NOT_OK(ReadColumnFunc(i));
+      futures.push_back(pool->Submit(ReadColumnFunc, i));
     }
+    Status final_status = Status::OK();
+    for (auto& fut : futures) {
+      Status st = fut.get();
+      if (!st.ok()) {
+        final_status = std::move(st);
+      }
+    }
+    RETURN_NOT_OK(final_status);
   } else {
-    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
+    for (int i = 0; i < num_columns; i++) {
+      RETURN_NOT_OK(ReadColumnFunc(i));
+    }
   }
 
   *out = Table::Make(schema, columns);
@@ -508,7 +517,9 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
     return Status::Invalid("Invalid column index");
   }
 
-  std::vector<std::shared_ptr<Column>> columns(field_indices.size());
+  int num_fields = static_cast<int>(field_indices.size());
+  std::vector<std::shared_ptr<Column>> columns(num_fields);
+
   auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
     RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -516,14 +527,24 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
     return Status::OK();
   };
 
-  int num_fields = static_cast<int>(field_indices.size());
-  int nthreads = std::min<int>(num_threads_, num_fields);
-  if (nthreads == 1) {
+  if (use_threads_) {
+    std::vector<std::future<Status>> futures;
+    auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_fields; i++) {
-      RETURN_NOT_OK(ReadColumnFunc(i));
+      futures.push_back(pool->Submit(ReadColumnFunc, i));
     }
+    Status final_status = Status::OK();
+    for (auto& fut : futures) {
+      Status st = fut.get();
+      if (!st.ok()) {
+        final_status = std::move(st);
+      }
+    }
+    RETURN_NOT_OK(final_status);
   } else {
-    RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc));
+    for (int i = 0; i < num_fields; i++) {
+      RETURN_NOT_OK(ReadColumnFunc(i));
+    }
   }
 
   std::shared_ptr<Table> table = Table::Make(schema, columns);
@@ -669,7 +690,11 @@ std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
 
 int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
 
-void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); }
+void FileReader::set_num_threads(int num_threads) {}
+
+void FileReader::set_use_threads(bool use_threads) {
+  impl_->set_use_threads(use_threads);
+}
 
 Status FileReader::ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                 int64_t* num_rows) {
@@ -1350,7 +1375,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
   const int16_t* def_levels_data;
   size_t def_levels_length;
   RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
+  RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap));
   uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
   for (size_t i = 0; i < def_levels_length; i++) {
     if (def_levels_data[i] < struct_def_level_) {
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 4d68c610..1e37d898 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -196,8 +196,13 @@ class PARQUET_EXPORT FileReader {
 
   /// Set the number of threads to use during reads of multiple columns. By
   /// default only 1 thread is used
+  /// \deprecated Use set_use_threads instead.
   void set_num_threads(int num_threads);
 
+  /// Set whether to use multiple threads during reads of multiple columns.
+  /// By default only one thread is used.
+  void set_use_threads(bool use_threads);
+
   virtual ~FileReader();
 
  private:
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index b4d8766a..781e1ba4 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -59,7 +59,7 @@ class RecordReader::RecordReaderImpl {
         num_decoded_values_(0),
         max_def_level_(descr->max_definition_level()),
         max_rep_level_(descr->max_repetition_level()),
-        at_record_start_(false),
+        at_record_start_(true),
         records_read_(0),
         values_written_(0),
         values_capacity_(0),
@@ -91,6 +91,7 @@ class RecordReader::RecordReaderImpl {
   virtual void ResetDecoders() = 0;
 
   void SetPageReader(std::unique_ptr<PageReader> reader) {
+    at_record_start_ = true;
     pager_ = std::move(reader);
     ResetDecoders();
   }
@@ -152,17 +153,26 @@ class RecordReader::RecordReaderImpl {
     // Count logical records and number of values to read
     while (levels_position_ < levels_written_) {
       if (*rep_levels++ == 0) {
-        at_record_start_ = true;
-        if (records_read == num_records) {
-          // We've found the number of records we were looking for
-          break;
-        } else {
-          // Continue
+        // If at_record_start_ is true, we are seeing the start of a record
+        // for the second time, such as after repeated calls to
+        // DelimitRecords. In this case we must continue until we find
+        // another record start or exhausting the ColumnChunk
+        if (!at_record_start_) {
+          // We've reached the end of a record; increment the record count.
           ++records_read;
+          if (records_read == num_records) {
+            // We've found the number of records we were looking for. Set
+            // at_record_start_ to true and break
+            at_record_start_ = true;
+            break;
+          }
         }
-      } else {
-        at_record_start_ = false;
       }
+
+      // We have decided to consume the level at this position; therefore we
+      // must advance until we find another record boundary
+      at_record_start_ = false;
+
       if (*def_levels++ == max_def_level_) {
         ++values_to_read;
       }
@@ -435,11 +445,6 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
       records_read += ReadRecordData(num_records);
     }
 
-    // HasNext invokes ReadNewPage
-    if (records_read == 0 && !HasNext()) {
-      return 0;
-    }
-
     int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
 
     // If we are in the middle of a record, we continue until reaching the
@@ -448,6 +453,13 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
     while (!at_record_start_ || records_read < num_records) {
       // Is there more data to read in this row group?
       if (!HasNext()) {
+        if (!at_record_start_) {
+          // We ended the row group while inside a record that we haven't seen
+          // the end of yet. So increment the record count for the last record in
+          // the row group
+          ++records_read;
+          at_record_start_ = true;
+        }
         break;
       }
 
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 2babacb8..f0f91393 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -24,7 +24,12 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
 
+#include "parquet/arrow/record_reader.h"
+
 namespace parquet {
+
+using internal::RecordReader;
+
 namespace arrow {
 
 using ::arrow::Array;
@@ -69,7 +74,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
   ::arrow::test::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
                              &values);
   ::arrow::NumericBuilder<ArrowType> builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
 }
 
@@ -83,7 +88,7 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
 }
 
@@ -99,7 +104,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullAr
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  builder.Append(values.data(), values.size());
+  builder.AppendValues(values.data(), values.size());
   return builder.Finish(out);
 }
 
@@ -167,7 +172,7 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
                                         &out_buf));
   random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
 
-  RETURN_NOT_OK(builder.Append(out_buf->data(), size));
+  RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
   return builder.Finish(out);
 }
 
@@ -177,7 +182,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullAr
   std::vector<uint8_t> values;
   ::arrow::test::randint(size, 0, 1, &values);
   ::arrow::BooleanBuilder builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
 }
 
@@ -196,7 +201,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
   }
 
   ::arrow::NumericBuilder<ArrowType> builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -219,7 +224,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Arra
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -243,7 +248,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableA
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
                                              ::arrow::default_memory_pool());
-  builder.Append(values.data(), values.size(), valid_bytes.data());
+  builder.AppendValues(values.data(), values.size(), valid_bytes.data());
   return builder.Finish(out);
 }
 
@@ -328,7 +333,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,
   random_decimals(size, seed, precision, out_buf->mutable_data());
 
   ::arrow::Decimal128Builder builder(type);
-  RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -349,7 +354,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
   }
 
   ::arrow::BooleanBuilder builder;
-  RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
   return builder.Finish(out);
 }
 
@@ -463,13 +468,51 @@ void ExpectArrayT(void* expected, Array* result) {
 template <>
 void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
   ::arrow::BooleanBuilder builder;
-  EXPECT_OK(builder.Append(reinterpret_cast<uint8_t*>(expected), result->length()));
+  EXPECT_OK(builder.AppendValues(reinterpret_cast<uint8_t*>(expected), result->length()));
 
   std::shared_ptr<Array> expected_array;
   EXPECT_OK(builder.Finish(&expected_array));
   EXPECT_TRUE(result->Equals(*expected_array));
 }
 
+template <typename ParquetType>
+void PrintBufferedLevels(const RecordReader& reader) {
+  using T = typename ::parquet::type_traits<ParquetType::type_num>::value_type;
+
+  const int16_t* def_levels = reader.def_levels();
+  const int16_t* rep_levels = reader.rep_levels();
+  const int64_t total_levels_read = reader.levels_position();
+
+  const T* values = reinterpret_cast<const T*>(reader.values());
+
+  std::cout << "def levels: ";
+  for (int64_t i = 0; i < total_levels_read; ++i) {
+    std::cout << def_levels[i] << " ";
+  }
+  std::cout << std::endl;
+
+  std::cout << "rep levels: ";
+  for (int64_t i = 0; i < total_levels_read; ++i) {
+    std::cout << rep_levels[i] << " ";
+  }
+  std::cout << std::endl;
+
+  std::cout << "values: ";
+  for (int64_t i = 0; i < reader.values_written(); ++i) {
+    std::cout << values[i] << " ";
+  }
+  std::cout << std::endl;
+}
+
+template <>
+void PrintBufferedLevels<ByteArrayType>(const RecordReader& reader) {}
+
+template <>
+void PrintBufferedLevels<FLBAType>(const RecordReader& reader) {}
+
+template <>
+void PrintBufferedLevels<Int96Type>(const RecordReader& reader) {}
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index aec99656..10789cbf 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -114,13 +114,9 @@ struct Compression {
 };
 
 struct Encryption {
-  enum type {
-    AES_GCM_V1 = 0,
-    AES_GCM_CTR_V1 = 1
-  };
+  enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 };
 };
 
-
 // parquet::PageType
 struct PageType {
   enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [C++] parquet::arrow should use thread pool, not ParallelFor
> ------------------------------------------------------------
>
>                 Key: PARQUET-1308
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1308
>             Project: Parquet
>          Issue Type: Task
>          Components: parquet-cpp
>            Reporter: Antoine Pitrou
>            Assignee: Antoine Pitrou
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cpp-1.5.0
>
>
> Arrow now has a global thread pool; parquet::arrow should use that instead of ParallelFor.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)