You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/04/06 07:26:37 UTC
parquet-cpp git commit: PARQUET-946: Add ReadRowGroup and
num_row_groups methods to arrow::FileReader
Repository: parquet-cpp
Updated Branches:
refs/heads/master f8573ebed -> d0646659c
PARQUET-946: Add ReadRowGroup and num_row_groups methods to arrow::FileReader
There's a lot of room for improvement / further refactoring here -- the assumption that an entire column in a file is being read runs very deep in the Arrow reader, so I tried to do the minimum work to decouple the row group iteration. There's some code duplication in ReadRowGroup, but we should maybe save further cleanup for a future patch.
Author: Wes McKinney <we...@twosigma.com>
Closes #291 from wesm/PARQUET-946 and squashes the following commits:
6d2b48a [Wes McKinney] Add virtual dtor
c7589f7 [Wes McKinney] Add ReadRowGroup and num_row_groups methods to arrow::FileReader
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d0646659
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d0646659
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d0646659
Branch: refs/heads/master
Commit: d0646659c64585dccd9a8f75a9509c1ae8cfa1fb
Parents: f8573eb
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Apr 6 09:26:31 2017 +0200
Committer: Uwe L. Korn <uw...@apache.org>
Committed: Thu Apr 6 09:26:31 2017 +0200
----------------------------------------------------------------------
cmake_modules/FindClangTools.cmake | 19 +-
src/parquet/arrow/arrow-reader-writer-test.cc | 146 ++++++----
src/parquet/arrow/reader.cc | 305 +++++++++++++++------
src/parquet/arrow/reader.h | 7 +
src/parquet/arrow/test-util.h | 8 +-
src/parquet/column/writer.cc | 2 +-
6 files changed, 343 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/cmake_modules/FindClangTools.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake
index c07c7d2..e4ee984 100644
--- a/cmake_modules/FindClangTools.cmake
+++ b/cmake_modules/FindClangTools.cmake
@@ -27,16 +27,16 @@
# This module defines
# CLANG_TIDY_BIN, The path to the clang tidy binary
# CLANG_TIDY_FOUND, Whether clang tidy was found
-# CLANG_FORMAT_BIN, The path to the clang format binary
+# CLANG_FORMAT_BIN, The path to the clang format binary
# CLANG_FORMAT_FOUND, Whether clang format was found
-find_program(CLANG_TIDY_BIN
- NAMES clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy
- PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
+find_program(CLANG_TIDY_BIN
+ NAMES clang-tidy-3.9 clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy
+ PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
NO_DEFAULT_PATH
)
-if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
+if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
set(CLANG_TIDY_FOUND 0)
message("clang-tidy not found")
else()
@@ -44,17 +44,16 @@ else()
message("clang-tidy found at ${CLANG_TIDY_BIN}")
endif()
-find_program(CLANG_FORMAT_BIN
- NAMES clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format
- PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
+find_program(CLANG_FORMAT_BIN
+ NAMES clang-format-3.9 clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format
+ PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
NO_DEFAULT_PATH
)
-if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
+if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
set(CLANG_FORMAT_FOUND 0)
message("clang-format not found")
else()
set(CLANG_FORMAT_FOUND 1)
message("clang-format found at ${CLANG_FORMAT_BIN}")
endif()
-
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 3b232f9..dd46893 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -197,6 +197,36 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
+ int64_t row_group_size, std::shared_ptr<Buffer>* out) {
+ auto sink = std::make_shared<InMemoryOutputStream>();
+
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+ row_group_size, default_writer_properties()));
+ *out = sink->GetBuffer();
+}
+
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+ int64_t row_group_size, const std::vector<int>& column_subset,
+ std::shared_ptr<Table>* out) {
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, num_threads, row_group_size, &buffer);
+
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(
+ OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ reader->set_num_threads(num_threads);
+
+ if (column_subset.size() > 0) {
+ ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
+ } else {
+ // Read everything
+ ASSERT_OK_NO_THROW(reader->ReadTable(out));
+ }
+}
+
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
@@ -248,19 +278,6 @@ class TestParquetIO : public ::testing::Test {
ASSERT_NE(nullptr, out->get());
}
- void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
- std::shared_ptr<::arrow::Table> out;
- std::unique_ptr<FileReader> reader;
- ReaderFromSink(&reader);
- ReadTableFromFile(std::move(reader), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(values->length(), out->num_rows());
-
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
- }
-
void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
int64_t null_count, std::shared_ptr<Table>* out) {
std::shared_ptr<Array> values;
@@ -289,13 +306,23 @@ class TestParquetIO : public ::testing::Test {
*out = MakeSimpleTable(parent_lists, nullable_parent_lists);
}
- void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) {
- std::shared_ptr<Array> values = table->column(0)->data()->chunk(0);
- this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
- values->length(), default_writer_properties()));
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ std::shared_ptr<::arrow::Table> out;
+ std::unique_ptr<FileReader> reader;
+ ReaderFromSink(&reader);
+ ReadTableFromFile(std::move(reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(values->length(), out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ }
- this->ReadAndCheckSingleColumnTable(values);
+ void CheckRoundTrip(const std::shared_ptr<Table>& table) {
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+ ASSERT_TRUE(table->Equals(*result));
}
template <typename ArrayType>
@@ -401,37 +428,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, true, true, 10, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, false, true, 10, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, true, false, 10, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, false, false, 0, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table);
- this->WriteReadAndCheckSingleColumnTable(table);
+ this->CheckRoundTrip(table);
}
TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
@@ -756,18 +783,24 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
this->CheckSingleColumnRequiredTableRead(4);
}
-void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) {
+void MakeDoubleTable(
+ int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);
- std::shared_ptr<Array> values;
for (int i = 0; i < num_columns; ++i) {
+ std::vector<std::shared_ptr<Array>> arrays;
+ std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<::arrow::DoubleType>(
num_rows, num_rows / 10, static_cast<uint32_t>(i), &values));
std::stringstream ss;
ss << "col" << i;
- column = MakeColumn(ss.str(), values, true);
+
+ for (int j = 0; j < nchunks; ++j) {
+ arrays.push_back(values);
+ }
+ column = MakeColumn(ss.str(), arrays, true);
columns[i] = column;
fields[i] = column->field();
@@ -776,41 +809,46 @@ void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out)
*out = std::make_shared<Table>(schema, columns);
}
-void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
- const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
- auto sink = std::make_shared<InMemoryOutputStream>();
-
- ASSERT_OK_NO_THROW(WriteTable(
- *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
+TEST(TestArrowReadWrite, MultithreadedRead) {
+ const int num_columns = 20;
+ const int num_rows = 1000;
+ const int num_threads = 4;
- std::shared_ptr<Buffer> buffer = sink->GetBuffer();
- std::unique_ptr<FileReader> reader;
- ASSERT_OK_NO_THROW(
- OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ std::shared_ptr<Table> table;
+ MakeDoubleTable(num_columns, num_rows, 1, &table);
- reader->set_num_threads(num_threads);
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
- if (column_subset.size() > 0) {
- ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
- } else {
- // Read everything
- ASSERT_OK_NO_THROW(reader->ReadTable(out));
- }
+ ASSERT_TRUE(table->Equals(*result));
}
-TEST(TestArrowReadWrite, MultithreadedRead) {
+TEST(TestArrowReadWrite, ReadSingleRowGroup) {
const int num_columns = 20;
const int num_rows = 1000;
- const int num_threads = 4;
std::shared_ptr<Table> table;
- MakeDoubleTable(num_columns, num_rows, &table);
+ MakeDoubleTable(num_columns, num_rows, 1, &table);
- std::shared_ptr<Table> result;
- DoTableRoundtrip(table, num_threads, {}, &result);
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, 1, num_rows / 2, &buffer);
- ASSERT_TRUE(table->Equals(*result));
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(
+ OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ ASSERT_EQ(2, reader->num_row_groups());
+
+ std::shared_ptr<Table> r1, r2;
+ // Read each row group individually
+ ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
+ ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));
+
+ std::shared_ptr<Table> concatenated;
+ ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+
+ ASSERT_TRUE(table->Equals(*concatenated));
}
TEST(TestArrowReadWrite, ReadColumnSubset) {
@@ -819,11 +857,11 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
const int num_threads = 4;
std::shared_ptr<Table> table;
- MakeDoubleTable(num_columns, num_rows, &table);
+ MakeDoubleTable(num_columns, num_rows, 1, &table);
std::shared_ptr<Table> result;
std::vector<int> column_subset = {0, 4, 8, 10};
- DoTableRoundtrip(table, num_threads, column_subset, &result);
+ DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result);
std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index a26c3ea..823aea9 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -60,19 +60,139 @@ static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timest
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+// ----------------------------------------------------------------------
+// Helper for parallel for-loop
+
+template <class FUNCTION>
+Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
+ std::vector<std::thread> thread_pool;
+ thread_pool.reserve(nthreads);
+ std::atomic<int> task_counter(0);
+
+ std::mutex error_mtx;
+ bool error_occurred = false;
+ Status error;
+
+ for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+ thread_pool.emplace_back(
+ [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
+ int task_id;
+ while (!error_occurred) {
+ task_id = task_counter.fetch_add(1);
+ if (task_id >= num_tasks) { break; }
+ Status s = func(task_id);
+ if (!s.ok()) {
+ std::lock_guard<std::mutex> lock(error_mtx);
+ error_occurred = true;
+ error = s;
+ break;
+ }
+ }
+ });
+ }
+ for (auto&& thread : thread_pool) {
+ thread.join();
+ }
+ if (error_occurred) { return error; }
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Iteration utilities
+
+// Abstraction to decouple row group iteration details from the ColumnReader,
+// so we can read only a single row group if we want
+class FileColumnIterator {
+ public:
+ explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
+ : column_index_(column_index),
+ reader_(reader),
+ schema_(reader->metadata()->schema()) {}
+
+ virtual ~FileColumnIterator() {}
+
+ virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+
+ const SchemaDescriptor* schema() const { return schema_; }
+
+ const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
+
+ int column_index() const { return column_index_; }
+
+ protected:
+ int column_index_;
+ ParquetFileReader* reader_;
+ const SchemaDescriptor* schema_;
+};
+
+class AllRowGroupsIterator : public FileColumnIterator {
+ public:
+ explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
+ : FileColumnIterator(column_index, reader), next_row_group_(0) {}
+
+ std::shared_ptr<::parquet::ColumnReader> Next() override {
+ std::shared_ptr<::parquet::ColumnReader> result;
+ if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+ result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+ next_row_group_++;
+ } else {
+ result = nullptr;
+ }
+ return result;
+ };
+
+ private:
+ int next_row_group_;
+};
+
+class SingleRowGroupIterator : public FileColumnIterator {
+ public:
+ explicit SingleRowGroupIterator(
+ int column_index, int row_group_number, ParquetFileReader* reader)
+ : FileColumnIterator(column_index, reader),
+ row_group_number_(row_group_number),
+ done_(false) {}
+
+ std::shared_ptr<::parquet::ColumnReader> Next() override {
+ if (done_) { return nullptr; }
+
+ auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+ done_ = true;
+ return result;
+ };
+
+ private:
+ int row_group_number_;
+ bool done_;
+};
+
+// ----------------------------------------------------------------------
+// File reader implementation
+
class FileReader::Impl {
public:
- Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+ : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+
virtual ~Impl() {}
- bool CheckForFlatColumn(const ColumnDescriptor* descr);
- bool CheckForFlatListColumn(const ColumnDescriptor* descr);
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
- Status ReadTable(std::shared_ptr<Table>* out);
- Status ReadTable(const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
+ Status GetSchema(
+ const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out);
+ Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Table>* out);
+ Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
+ Status ReadTable(std::shared_ptr<Table>* table);
+ Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
+
+ bool CheckForFlatColumn(const ColumnDescriptor* descr);
+ bool CheckForFlatListColumn(const ColumnDescriptor* descr);
+
const ParquetFileReader* parquet_reader() const { return reader_.get(); }
+ int num_row_groups() const { return reader_->metadata()->num_row_groups(); }
+
void set_num_threads(int num_threads) { num_threads_ = num_threads; }
private:
@@ -84,8 +204,17 @@ class FileReader::Impl {
class ColumnReader::Impl {
public:
- Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader,
- int column_index);
+ Impl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+ : pool_(pool),
+ input_(std::move(input)),
+ descr_(input_->descr()),
+ values_buffer_(pool),
+ def_levels_buffer_(pool),
+ rep_levels_buffer_(pool) {
+ NodeToField(input_->descr()->schema_node(), &field_);
+ NextRowGroup();
+ }
+
virtual ~Impl() {}
Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
@@ -121,10 +250,9 @@ class ColumnReader::Impl {
};
MemoryPool* pool_;
+ std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
- ParquetFileReader* reader_;
- int column_index_;
- int next_row_group_;
+
std::shared_ptr<::parquet::ColumnReader> column_reader_;
std::shared_ptr<Field> field_;
@@ -139,14 +267,16 @@ class ColumnReader::Impl {
int64_t null_count_;
};
-FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
- : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+ : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
- const SchemaDescriptor* schema = reader_->metadata()->schema();
+ std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
std::unique_ptr<ColumnReader::Impl> impl(
- new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+ new ColumnReader::Impl(pool_, std::move(input)));
*out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
return Status::OK();
}
@@ -163,55 +293,59 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
return flat_column_reader->NextBatch(batch_size, out);
}
-Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
- std::vector<int> column_indices(reader_->metadata()->num_columns());
-
- for (size_t i = 0; i < column_indices.size(); ++i) {
- column_indices[i] = i;
- }
- return ReadTable(column_indices, table);
+Status FileReader::Impl::GetSchema(
+ const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out) {
+ auto descr = reader_->metadata()->schema();
+ return FromParquetSchema(descr, indices, out);
}
-template <class FUNCTION>
-Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
- std::vector<std::thread> thread_pool;
- thread_pool.reserve(nthreads);
- std::atomic<int> task_counter(0);
+Status FileReader::Impl::ReadRowGroup(int row_group_index,
+ const std::vector<int>& indices, std::shared_ptr<::arrow::Table>* out) {
+ std::shared_ptr<::arrow::Schema> schema;
+ RETURN_NOT_OK(GetSchema(indices, &schema));
- std::mutex error_mtx;
- bool error_occurred = false;
- Status error;
+ auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
- for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
- thread_pool.emplace_back(
- [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
- int task_id;
- while (!error_occurred) {
- task_id = task_counter.fetch_add(1);
- if (task_id >= num_tasks) { break; }
- Status s = func(task_id);
- if (!s.ok()) {
- std::lock_guard<std::mutex> lock(error_mtx);
- error_occurred = true;
- error = s;
- break;
- }
- }
- });
- }
- for (auto&& thread : thread_pool) {
- thread.join();
+ int num_columns = static_cast<int>(indices.size());
+ int nthreads = std::min<int>(num_threads_, num_columns);
+ std::vector<std::shared_ptr<Column>> columns(num_columns);
+
+ // TODO(wesm): Refactor to share more code with ReadTable
+
+ auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
+ this](int i) {
+ int column_index = indices[i];
+ int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+
+ std::unique_ptr<FileColumnIterator> input(
+ new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
+
+ std::unique_ptr<ColumnReader::Impl> impl(
+ new ColumnReader::Impl(pool_, std::move(input)));
+ ColumnReader flat_column_reader(std::move(impl));
+
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(flat_column_reader.NextBatch(batch_size, &array));
+ columns[i] = std::make_shared<Column>(schema->field(i), array);
+ return Status::OK();
+ };
+
+ if (nthreads == 1) {
+ for (int i = 0; i < num_columns; i++) {
+ RETURN_NOT_OK(ReadColumnFunc(i));
+ }
+ } else {
+ RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
}
- if (error_occurred) { return error; }
+
+ *out = std::make_shared<Table>(schema, columns);
return Status::OK();
}
Status FileReader::Impl::ReadTable(
const std::vector<int>& indices, std::shared_ptr<Table>* table) {
- auto descr = reader_->metadata()->schema();
-
std::shared_ptr<::arrow::Schema> schema;
- RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema));
+ RETURN_NOT_OK(GetSchema(indices, &schema));
int num_columns = static_cast<int>(indices.size());
int nthreads = std::min<int>(num_threads_, num_columns);
@@ -236,10 +370,23 @@ Status FileReader::Impl::ReadTable(
return Status::OK();
}
-FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
- : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
-FileReader::~FileReader() {}
+ for (size_t i = 0; i < indices.size(); ++i) {
+ indices[i] = i;
+ }
+ return ReadTable(indices, table);
+}
+
+Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
+
+ for (size_t i = 0; i < indices.size(); ++i) {
+ indices[i] = i;
+ }
+ return ReadRowGroup(i, indices, table);
+}
// Static ctor
Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
@@ -280,14 +427,35 @@ Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
}
Status FileReader::ReadTable(
- const std::vector<int>& column_indices, std::shared_ptr<Table>* out) {
+ const std::vector<int>& indices, std::shared_ptr<Table>* out) {
+ try {
+ return impl_->ReadTable(indices, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
+Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
+ try {
+ return impl_->ReadRowGroup(i, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
+Status FileReader::ReadRowGroup(
+ int i, const std::vector<int>& indices, std::shared_ptr<Table>* out) {
try {
- return impl_->ReadTable(column_indices, out);
+ return impl_->ReadRowGroup(i, indices, out);
} catch (const ::parquet::ParquetException& e) {
return ::arrow::Status::IOError(e.what());
}
}
+int FileReader::num_row_groups() const {
+ return impl_->num_row_groups();
+}
+
void FileReader::set_num_threads(int num_threads) {
impl_->set_num_threads(num_threads);
}
@@ -296,20 +464,6 @@ const ParquetFileReader* FileReader::parquet_reader() const {
return impl_->parquet_reader();
}
-ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
- ParquetFileReader* reader, int column_index)
- : pool_(pool),
- descr_(descr),
- reader_(reader),
- column_index_(column_index),
- next_row_group_(0),
- values_buffer_(pool),
- def_levels_buffer_(pool),
- rep_levels_buffer_(pool) {
- NodeToField(descr_->schema_node(), &field_);
- NextRowGroup();
-}
-
template <typename ArrowType, typename ParquetType>
Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
int64_t values_to_read, int64_t* levels_read) {
@@ -563,7 +717,7 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
if (descr_->max_repetition_level() > 0) {
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(
- FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema));
+ FromParquetSchema(input_->schema(), {input_->column_index()}, &arrow_schema));
// Walk downwards to extract nullability
std::shared_ptr<Field> current_field = arrow_schema->field(0);
@@ -912,12 +1066,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
}
void ColumnReader::Impl::NextRowGroup() {
- if (next_row_group_ < reader_->metadata()->num_row_groups()) {
- column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
- next_row_group_++;
- } else {
- column_reader_ = nullptr;
- }
+ column_reader_ = input_->Next();
}
ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 1aa9c3e..f12acaf 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -107,6 +107,13 @@ class PARQUET_EXPORT FileReader {
::arrow::Status ReadTable(
const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
+ ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out);
+
+ ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
+
+ int num_row_groups() const;
+
const ParquetFileReader* parquet_reader() const;
/// Set the number of threads to use during reads of multiple columns. By
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 2cfc60a..bff952b 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -260,12 +260,18 @@ Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size,
return Status::OK();
}
-std::shared_ptr<::arrow::Column> MakeColumn(
+static std::shared_ptr<::arrow::Column> MakeColumn(
const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
return std::make_shared<::arrow::Column>(field, array);
}
+static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+ const std::vector<std::shared_ptr<Array>>& arrays, bool nullable) {
+ auto field = std::make_shared<::arrow::Field>(name, arrays[0]->type(), nullable);
+ return std::make_shared<::arrow::Column>(field, arrays);
+}
+
std::shared_ptr<::arrow::Table> MakeSimpleTable(
const std::shared_ptr<Array>& values, bool nullable) {
std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 2ba4162..eb74147 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -213,7 +213,7 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
const WriterProperties* properties)
: ColumnWriter(metadata, std::move(pager), expected_rows,
(encoding == Encoding::PLAIN_DICTIONARY ||
- encoding == Encoding::RLE_DICTIONARY),
+ encoding == Encoding::RLE_DICTIONARY),
encoding, properties) {
switch (encoding) {
case Encoding::PLAIN: