You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2018/08/17 06:25:57 UTC
[parquet-cpp] branch master updated: PARQUET-1308: [C++] Use Arrow
thread pool, not Arrow ParallelFor, fix deprecated APIs,
upgrade clang-format version. Fix record delimiting bug
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 337c8eb PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor, fix deprecated APIs, upgrade clang-format version. Fix record delimiting bug
337c8eb is described below
commit 337c8eb0fb73f7f44e60aeeca83607c7b1e78b4c
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Aug 17 02:25:50 2018 -0400
PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor, fix deprecated APIs, upgrade clang-format version. Fix record delimiting bug
Author: Wes McKinney <we...@apache.org>
Author: Antoine Pitrou <an...@python.org>
Author: Wes McKinney <we...@twosigma.com>
Closes #467 from pitrou/PARQUET-1308-arrow-thread-pool and squashes the following commits:
cab1aa8 [Wes McKinney] Fix deprecation warning
b23bc46 [Wes McKinney] Fix end of row group logic
8fbff2e [Wes McKinney] Fix record delimiting bug
c2bc1c8 [Wes McKinney] Print offending value
3e72282 [Wes McKinney] Find minimal repro that fails on Linux also
1f4f714 [Wes McKinney] Add temporary testing code
a2eea67 [Wes McKinney] Fix usages of deprecated APIs, upgrade to clang-format-6.0
45779a9 [Wes McKinney] Update Arrow external project version to master
2c06321 [Antoine Pitrou] Fix API name
5707f60 [Antoine Pitrou] Lint
e90a902 [Antoine Pitrou] PARQUET-1308: [C++] Use Arrow thread pool, not Arrow ParallelFor
---
benchmarks/decode_benchmark.cc | 4 +-
cmake_modules/ArrowExternalProject.cmake | 2 +-
cmake_modules/FindClangTools.cmake | 4 +-
cmake_modules/SetupCxxFlags.cmake | 1 +
src/parquet/arrow/arrow-reader-writer-benchmark.cc | 8 +--
src/parquet/arrow/arrow-reader-writer-test.cc | 61 ++++++++++----------
src/parquet/arrow/reader.cc | 63 ++++++++++++++-------
src/parquet/arrow/reader.h | 5 ++
src/parquet/arrow/record_reader.cc | 40 ++++++++-----
src/parquet/arrow/test-util.h | 65 ++++++++++++++++++----
src/parquet/types.h | 6 +-
11 files changed, 174 insertions(+), 85 deletions(-)
diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc
index 8f2dfa0..3ae32b4 100644
--- a/benchmarks/decode_benchmark.cc
+++ b/benchmarks/decode_benchmark.cc
@@ -42,8 +42,8 @@ class DeltaBitPackEncoder {
uint8_t* Encode(int* encoded_len) {
uint8_t* result = new uint8_t[10 * 1024 * 1024];
- int num_mini_blocks = static_cast<int>(arrow::BitUtil::Ceil(num_values() - 1,
- mini_block_size_));
+ int num_mini_blocks = static_cast<int>(arrow::BitUtil::CeilDiv(num_values() - 1,
+ mini_block_size_));
uint8_t* mini_block_widths = NULL;
arrow::BitWriter writer(result, 10 * 1024 * 1024);
diff --git a/cmake_modules/ArrowExternalProject.cmake b/cmake_modules/ArrowExternalProject.cmake
index 4f23661..3d1a276 100644
--- a/cmake_modules/ArrowExternalProject.cmake
+++ b/cmake_modules/ArrowExternalProject.cmake
@@ -46,7 +46,7 @@ if (MSVC AND PARQUET_USE_STATIC_CRT)
endif()
if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
- set(ARROW_VERSION "501d60e918bd4d10c429ab34e0b8e8a87dffb732")
+ set(ARROW_VERSION "3edfd7caf2746eeba37d5ac7bfd3665cc159e7ad")
else()
set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
endif()
diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake
index 215a5cd..56e2dd7 100644
--- a/cmake_modules/FindClangTools.cmake
+++ b/cmake_modules/FindClangTools.cmake
@@ -96,7 +96,9 @@ if (CLANG_FORMAT_VERSION)
endif()
else()
find_program(CLANG_FORMAT_BIN
- NAMES clang-format-4.0
+ NAMES clang-format-6.0
+ clang-format-5.0
+ clang-format-4.0
clang-format-3.9
clang-format-3.8
clang-format-3.7
diff --git a/cmake_modules/SetupCxxFlags.cmake b/cmake_modules/SetupCxxFlags.cmake
index 01ed85b..5ca3f4e 100644
--- a/cmake_modules/SetupCxxFlags.cmake
+++ b/cmake_modules/SetupCxxFlags.cmake
@@ -84,6 +84,7 @@ if ("${UPPERCASE_BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
-Wno-shadow -Wno-switch-enum -Wno-exit-time-destructors \
-Wno-global-constructors -Wno-weak-template-vtables -Wno-undefined-reinterpret-cast \
-Wno-implicit-fallthrough -Wno-unreachable-code-return \
+-Wno-documentation-deprecated-sync \
-Wno-float-equal -Wno-missing-prototypes \
-Wno-old-style-cast -Wno-covered-switch-default \
-Wno-format-nonliteral -Wno-missing-noreturn \
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 15d2cf7..51eb0c2 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -104,9 +104,9 @@ std::shared_ptr<::arrow::Table> TableFromVector(
std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
int n = {0};
std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
- EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+ EXIT_NOT_OK(builder.AppendValues(vec.data(), vec.size(), valid_bytes.data()));
} else {
- EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+ EXIT_NOT_OK(builder.AppendValues(vec.data(), vec.size(), nullptr));
}
std::shared_ptr<::arrow::Array> array;
EXIT_NOT_OK(builder.Finish(&array));
@@ -126,9 +126,9 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
int n = {0};
std::generate(valid_bytes.begin(), valid_bytes.end(),
[&n] { return (n++ % 2) != 0; });
- EXIT_NOT_OK(builder.Append(vec, valid_bytes));
+ EXIT_NOT_OK(builder.AppendValues(vec, valid_bytes));
} else {
- EXIT_NOT_OK(builder.Append(vec));
+ EXIT_NOT_OK(builder.AppendValues(vec));
}
std::shared_ptr<::arrow::Array> array;
EXIT_NOT_OK(builder.Finish(&array));
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index d4f5b00..be3e611 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -320,8 +320,7 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
-void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
- int64_t row_group_size,
+void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::shared_ptr<Buffer>* out) {
auto sink = std::make_shared<InMemoryOutputStream>();
@@ -399,21 +398,21 @@ void AssertTablesEqual(const Table& expected, const Table& actual,
}
}
-void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
int64_t row_group_size, const std::vector<int>& column_subset,
std::shared_ptr<Table>* out,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(
- WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer));
+ WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));
- reader->set_num_threads(num_threads);
+ reader->set_use_threads(use_threads);
if (column_subset.size() > 0) {
ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
@@ -427,7 +426,8 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Table> result;
- DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
+ DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
+ arrow_properties);
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result, false));
}
@@ -1270,13 +1270,14 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
// Use deprecated INT96 type
std::shared_ptr<Table> result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
- table, 1, table->num_rows(), {}, &result,
+ table, false /* use_threads */, table->num_rows(), {}, &result,
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
// Cast nanaoseconds to microseconds and use INT64 physical type
- ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result));
+ ASSERT_NO_FATAL_FAILURE(
+ DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
std::shared_ptr<Table> expected;
MakeDateTimeTypesTable(&table, true);
@@ -1339,14 +1340,14 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
std::shared_ptr<Table> milli_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
- input, 1, input->num_rows(), {}, &milli_result,
+ input, false /* use_threads */, input->num_rows(), {}, &milli_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_milli_result, *milli_result));
std::shared_ptr<Table> micro_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
- input, 1, input->num_rows(), {}, &micro_result,
+ input, false /* use_threads */, input->num_rows(), {}, &micro_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_micro_result, *micro_result));
}
@@ -1472,7 +1473,8 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
auto ex_table = Table::Make(ex_schema, ex_columns);
std::shared_ptr<Table> result;
- ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result));
+ ASSERT_NO_FATAL_FAILURE(
+ DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*ex_table, *result));
}
@@ -1515,7 +1517,8 @@ TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
->build();
std::shared_ptr<Table> result;
- DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result, arrow_writer_properties);
+ DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
+ arrow_writer_properties);
ASSERT_EQ(table->num_columns(), result->num_columns());
ASSERT_EQ(table->num_rows(), result->num_rows());
@@ -1561,17 +1564,18 @@ void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
*out = Table::Make(schema, columns);
}
-void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
+void MakeListArray(int num_rows, int max_value_length,
+ std::shared_ptr<::DataType>* out_type,
std::shared_ptr<Array>* out_array) {
std::vector<int32_t> length_draws;
- randint(num_rows, 0, 100, &length_draws);
+ randint(num_rows, 0, max_value_length, &length_draws);
std::vector<int32_t> offset_values;
// Make sure some of them are length 0
int32_t total_elements = 0;
for (size_t i = 0; i < length_draws.size(); ++i) {
- if (length_draws[i] < 10) {
+ if (length_draws[i] < max_value_length / 10) {
length_draws[i] = 0;
}
offset_values.push_back(total_elements);
@@ -1599,14 +1603,14 @@ void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
TEST(TestArrowReadWrite, MultithreadedRead) {
const int num_columns = 20;
const int num_rows = 1000;
- const int num_threads = 4;
+ const bool use_threads = true;
std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
std::shared_ptr<Table> result;
ASSERT_NO_FATAL_FAILURE(
- DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result));
+ DoSimpleRoundtrip(table, use_threads, table->num_rows(), {}, &result));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
}
@@ -1619,7 +1623,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
std::shared_ptr<Buffer> buffer;
- ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+ ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));
std::unique_ptr<FileReader> reader;
@@ -1648,7 +1652,7 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) {
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
std::shared_ptr<Buffer> buffer;
- ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+ ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));
std::unique_ptr<FileReader> reader;
@@ -1681,7 +1685,7 @@ TEST(TestArrowReadWrite, ScanContents) {
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
std::shared_ptr<Buffer> buffer;
- ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, 1, num_rows / 2,
+ ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));
std::unique_ptr<FileReader> reader;
@@ -1700,7 +1704,7 @@ TEST(TestArrowReadWrite, ScanContents) {
TEST(TestArrowReadWrite, ReadColumnSubset) {
const int num_columns = 20;
const int num_rows = 1000;
- const int num_threads = 4;
+ const bool use_threads = true;
std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
@@ -1708,7 +1712,7 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
std::shared_ptr<Table> result;
std::vector<int> column_subset = {0, 4, 8, 10};
ASSERT_NO_FATAL_FAILURE(
- DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result));
+ DoSimpleRoundtrip(table, use_threads, table->num_rows(), column_subset, &result));
std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
@@ -1723,19 +1727,21 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
}
TEST(TestArrowReadWrite, ListLargeRecords) {
- const int num_rows = 50;
+ // PARQUET-1308: This test passed on Linux when num_rows was smaller
+ const int num_rows = 2000;
std::shared_ptr<Array> list_array;
std::shared_ptr<::DataType> list_type;
- MakeListArray(num_rows, &list_type, &list_array);
+ MakeListArray(num_rows, 20, &list_type, &list_array);
auto schema = ::arrow::schema({::arrow::field("a", list_type)});
+
std::shared_ptr<Table> table = Table::Make(schema, {list_array});
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(
- WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer));
+ WriteTableToBuffer(table, 100, default_arrow_writer_properties(), &buffer));
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -1747,6 +1753,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(AssertTablesEqual(*table, *result));
+ // Read chunked
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));
@@ -1754,8 +1761,6 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
std::unique_ptr<ColumnReader> col_reader;
ASSERT_OK(reader->GetColumn(0, &col_reader));
- auto expected = table->column(0)->data()->chunk(0);
-
std::vector<std::shared_ptr<Array>> pieces;
for (int i = 0; i < num_rows; ++i) {
std::shared_ptr<Array> piece;
@@ -1809,7 +1814,7 @@ auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
std::shared_ptr<Array>* array) {
- MakeListArray(length, type, array);
+ MakeListArray(length, 100, type, array);
};
TEST(TestArrowReadWrite, TableWithChunkedColumns) {
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index c0974ca..d0b397f 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -31,7 +31,7 @@
#include "arrow/util/bit-util.h"
#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
-#include "arrow/util/parallel.h"
+#include "arrow/util/thread-pool.h"
#include "parquet/arrow/record_reader.h"
#include "parquet/arrow/schema.h"
@@ -56,7 +56,6 @@ using parquet::schema::Node;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
-using arrow::ParallelFor;
using arrow::RecordBatchReader;
using parquet::internal::RecordReader;
@@ -212,7 +211,7 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
class FileReader::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
- : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+ : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}
virtual ~Impl() {}
@@ -244,15 +243,14 @@ class FileReader::Impl {
int num_columns() const { return reader_->metadata()->num_columns(); }
- void set_num_threads(int num_threads) { num_threads_ = num_threads; }
+ void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
ParquetFileReader* reader() { return reader_.get(); }
private:
MemoryPool* pool_;
std::unique_ptr<ParquetFileReader> reader_;
-
- int num_threads_;
+ bool use_threads_;
};
class ColumnReader::ColumnReaderImpl {
@@ -462,14 +460,13 @@ Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
Status FileReader::Impl::ReadRowGroup(int row_group_index,
const std::vector<int>& indices,
- std::shared_ptr<::arrow::Table>* out) {
+ std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));
auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
int num_columns = static_cast<int>(indices.size());
- int nthreads = std::min<int>(num_threads_, num_columns);
std::vector<std::shared_ptr<Column>> columns(num_columns);
// TODO(wesm): Refactor to share more code with ReadTable
@@ -483,12 +480,24 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
return Status::OK();
};
- if (nthreads == 1) {
+ if (use_threads_) {
+ std::vector<std::future<Status>> futures;
+ auto pool = ::arrow::internal::GetCpuThreadPool();
for (int i = 0; i < num_columns; i++) {
- RETURN_NOT_OK(ReadColumnFunc(i));
+ futures.push_back(pool->Submit(ReadColumnFunc, i));
}
+ Status final_status = Status::OK();
+ for (auto& fut : futures) {
+ Status st = fut.get();
+ if (!st.ok()) {
+ final_status = std::move(st);
+ }
+ }
+ RETURN_NOT_OK(final_status);
} else {
- RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
+ for (int i = 0; i < num_columns; i++) {
+ RETURN_NOT_OK(ReadColumnFunc(i));
+ }
}
*out = Table::Make(schema, columns);
@@ -508,7 +517,9 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
return Status::Invalid("Invalid column index");
}
- std::vector<std::shared_ptr<Column>> columns(field_indices.size());
+ int num_fields = static_cast<int>(field_indices.size());
+ std::vector<std::shared_ptr<Column>> columns(num_fields);
+
auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
std::shared_ptr<Array> array;
RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -516,14 +527,24 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
return Status::OK();
};
- int num_fields = static_cast<int>(field_indices.size());
- int nthreads = std::min<int>(num_threads_, num_fields);
- if (nthreads == 1) {
+ if (use_threads_) {
+ std::vector<std::future<Status>> futures;
+ auto pool = ::arrow::internal::GetCpuThreadPool();
for (int i = 0; i < num_fields; i++) {
- RETURN_NOT_OK(ReadColumnFunc(i));
+ futures.push_back(pool->Submit(ReadColumnFunc, i));
}
+ Status final_status = Status::OK();
+ for (auto& fut : futures) {
+ Status st = fut.get();
+ if (!st.ok()) {
+ final_status = std::move(st);
+ }
+ }
+ RETURN_NOT_OK(final_status);
} else {
- RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc));
+ for (int i = 0; i < num_fields; i++) {
+ RETURN_NOT_OK(ReadColumnFunc(i));
+ }
}
std::shared_ptr<Table> table = Table::Make(schema, columns);
@@ -669,7 +690,11 @@ std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
-void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); }
+void FileReader::set_num_threads(int num_threads) {}
+
+void FileReader::set_use_threads(bool use_threads) {
+ impl_->set_use_threads(use_threads);
+}
Status FileReader::ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows) {
@@ -1350,7 +1375,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
const int16_t* def_levels_data;
size_t def_levels_length;
RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
- RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
+ RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap));
uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
for (size_t i = 0; i < def_levels_length; i++) {
if (def_levels_data[i] < struct_def_level_) {
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 4d68c61..1e37d89 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -196,8 +196,13 @@ class PARQUET_EXPORT FileReader {
/// Set the number of threads to use during reads of multiple columns. By
/// default only 1 thread is used
+ /// \deprecated Use set_use_threads instead.
void set_num_threads(int num_threads);
+ /// Set whether to use multiple threads during reads of multiple columns.
+ /// By default only one thread is used.
+ void set_use_threads(bool use_threads);
+
virtual ~FileReader();
private:
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index b4d8766..781e1ba 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -59,7 +59,7 @@ class RecordReader::RecordReaderImpl {
num_decoded_values_(0),
max_def_level_(descr->max_definition_level()),
max_rep_level_(descr->max_repetition_level()),
- at_record_start_(false),
+ at_record_start_(true),
records_read_(0),
values_written_(0),
values_capacity_(0),
@@ -91,6 +91,7 @@ class RecordReader::RecordReaderImpl {
virtual void ResetDecoders() = 0;
void SetPageReader(std::unique_ptr<PageReader> reader) {
+ at_record_start_ = true;
pager_ = std::move(reader);
ResetDecoders();
}
@@ -152,17 +153,26 @@ class RecordReader::RecordReaderImpl {
// Count logical records and number of values to read
while (levels_position_ < levels_written_) {
if (*rep_levels++ == 0) {
- at_record_start_ = true;
- if (records_read == num_records) {
- // We've found the number of records we were looking for
- break;
- } else {
- // Continue
+ // If at_record_start_ is true, we are seeing the start of a record
+ // for the second time, such as after repeated calls to
+ // DelimitRecords. In this case we must continue until we find
+ // another record start or exhausting the ColumnChunk
+ if (!at_record_start_) {
+ // We've reached the end of a record; increment the record count.
++records_read;
+ if (records_read == num_records) {
+ // We've found the number of records we were looking for. Set
+ // at_record_start_ to true and break
+ at_record_start_ = true;
+ break;
+ }
}
- } else {
- at_record_start_ = false;
}
+
+ // We have decided to consume the level at this position; therefore we
+ // must advance until we find another record boundary
+ at_record_start_ = false;
+
if (*def_levels++ == max_def_level_) {
++values_to_read;
}
@@ -435,11 +445,6 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
records_read += ReadRecordData(num_records);
}
- // HasNext invokes ReadNewPage
- if (records_read == 0 && !HasNext()) {
- return 0;
- }
-
int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
// If we are in the middle of a record, we continue until reaching the
@@ -448,6 +453,13 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
while (!at_record_start_ || records_read < num_records) {
// Is there more data to read in this row group?
if (!HasNext()) {
+ if (!at_record_start_) {
+ // We ended the row group while inside a record that we haven't seen
+ // the end of yet. So increment the record count for the last record in
+ // the row group
+ ++records_read;
+ at_record_start_ = true;
+ }
break;
}
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 2babacb..f0f9139 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -24,7 +24,12 @@
#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"
+#include "parquet/arrow/record_reader.h"
+
namespace parquet {
+
+using internal::RecordReader;
+
namespace arrow {
using ::arrow::Array;
@@ -69,7 +74,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
::arrow::test::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
&values);
::arrow::NumericBuilder<ArrowType> builder;
- RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+ RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
return builder.Finish(out);
}
@@ -83,7 +88,7 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
::arrow::default_memory_pool());
- RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+ RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
return builder.Finish(out);
}
@@ -99,7 +104,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullAr
// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
::arrow::default_memory_pool());
- builder.Append(values.data(), values.size());
+ builder.AppendValues(values.data(), values.size());
return builder.Finish(out);
}
@@ -167,7 +172,7 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
&out_buf));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
- RETURN_NOT_OK(builder.Append(out_buf->data(), size));
+ RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
}
@@ -177,7 +182,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullAr
std::vector<uint8_t> values;
::arrow::test::randint(size, 0, 1, &values);
::arrow::BooleanBuilder builder;
- RETURN_NOT_OK(builder.Append(values.data(), values.size()));
+ RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
return builder.Finish(out);
}
@@ -196,7 +201,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
}
::arrow::NumericBuilder<ArrowType> builder;
- RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+ RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
return builder.Finish(out);
}
@@ -219,7 +224,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Arra
// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
::arrow::default_memory_pool());
- RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+ RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
return builder.Finish(out);
}
@@ -243,7 +248,7 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableA
// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(std::make_shared<ArrowType>(),
::arrow::default_memory_pool());
- builder.Append(values.data(), values.size(), valid_bytes.data());
+ builder.AppendValues(values.data(), values.size(), valid_bytes.data());
return builder.Finish(out);
}
@@ -328,7 +333,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,
random_decimals(size, seed, precision, out_buf->mutable_data());
::arrow::Decimal128Builder builder(type);
- RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data()));
+ RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
return builder.Finish(out);
}
@@ -349,7 +354,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
}
::arrow::BooleanBuilder builder;
- RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
+ RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), valid_bytes.data()));
return builder.Finish(out);
}
@@ -463,13 +468,51 @@ void ExpectArrayT(void* expected, Array* result) {
template <>
void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
::arrow::BooleanBuilder builder;
- EXPECT_OK(builder.Append(reinterpret_cast<uint8_t*>(expected), result->length()));
+ EXPECT_OK(builder.AppendValues(reinterpret_cast<uint8_t*>(expected), result->length()));
std::shared_ptr<Array> expected_array;
EXPECT_OK(builder.Finish(&expected_array));
EXPECT_TRUE(result->Equals(*expected_array));
}
+template <typename ParquetType>
+void PrintBufferedLevels(const RecordReader& reader) {
+ using T = typename ::parquet::type_traits<ParquetType::type_num>::value_type;
+
+ const int16_t* def_levels = reader.def_levels();
+ const int16_t* rep_levels = reader.rep_levels();
+ const int64_t total_levels_read = reader.levels_position();
+
+ const T* values = reinterpret_cast<const T*>(reader.values());
+
+ std::cout << "def levels: ";
+ for (int64_t i = 0; i < total_levels_read; ++i) {
+ std::cout << def_levels[i] << " ";
+ }
+ std::cout << std::endl;
+
+ std::cout << "rep levels: ";
+ for (int64_t i = 0; i < total_levels_read; ++i) {
+ std::cout << rep_levels[i] << " ";
+ }
+ std::cout << std::endl;
+
+ std::cout << "values: ";
+ for (int64_t i = 0; i < reader.values_written(); ++i) {
+ std::cout << values[i] << " ";
+ }
+ std::cout << std::endl;
+}
+
+template <>
+void PrintBufferedLevels<ByteArrayType>(const RecordReader& reader) {}
+
+template <>
+void PrintBufferedLevels<FLBAType>(const RecordReader& reader) {}
+
+template <>
+void PrintBufferedLevels<Int96Type>(const RecordReader& reader) {}
+
} // namespace arrow
} // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index aec9965..10789cb 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -114,13 +114,9 @@ struct Compression {
};
struct Encryption {
- enum type {
- AES_GCM_V1 = 0,
- AES_GCM_CTR_V1 = 1
- };
+ enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 };
};
-
// parquet::PageType
struct PageType {
enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };