You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/22 16:14:41 UTC
parquet-cpp git commit: PARQUET-911: [C++] Support nested structs in
parquet_arrow
Repository: parquet-cpp
Updated Branches:
refs/heads/master 99759a38b -> 29ed01ea7
PARQUET-911: [C++] Support nested structs in parquet_arrow
Adds support for reading simple StructArrays.
Structs combined with lists are not supported yet.
Author: Itai Incze <it...@gmail.com>
Closes #312 from itaiin/PARQUET-911 and squashes the following commits:
beb16f5 [Itai Incze] document ReadSchemaField API and fix Appveyor errors
9fe3038 [Itai Incze] review changes + BooleanArray bugfix
420bb76 [Itai Incze] fix struct field type bug + minor changes
c23f3fb [Itai Incze] Refactor per code reviews
11a12c3 [Itai Incze] fix msvc compiler errors
283f08d [Itai Incze] fix schema field ordering on partial read
aeb1384 [Itai Incze] fix per code review
8bd47e8 [Itai Incze] Fix column index bug and extend tests
01d69ee [Itai Incze] Fix osx compiler c-array init error
14cbef8 [Itai Incze] support for simple StructArray reads. Not supported in conjunction with lists yet.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/29ed01ea
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/29ed01ea
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/29ed01ea
Branch: refs/heads/master
Commit: 29ed01ea70fb5eba22b6df547c33fca047e08031
Parents: 99759a3
Author: Itai Incze <it...@gmail.com>
Authored: Thu Jun 22 18:14:35 2017 +0200
Committer: Uwe L. Korn <uw...@apache.org>
Committed: Thu Jun 22 18:14:35 2017 +0200
----------------------------------------------------------------------
src/parquet/arrow/arrow-reader-writer-test.cc | 327 +++++++++++++++++--
src/parquet/arrow/reader.cc | 350 ++++++++++++++++++---
src/parquet/arrow/reader.h | 40 ++-
src/parquet/arrow/schema.cc | 9 +-
src/parquet/arrow/test-util.h | 2 +-
src/parquet/column/column-reader-test.cc | 13 +-
src/parquet/column/reader.h | 52 ++-
src/parquet/schema-test.cc | 26 ++
src/parquet/schema.cc | 80 ++++-
src/parquet/schema.h | 22 ++
src/parquet/util/schema-util.h | 84 +++++
11 files changed, 904 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index b9c77f1..16dddb0 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -50,6 +50,7 @@ using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;
+using arrow::ArrayVisitor;
using ArrowId = ::arrow::Type;
using ParquetType = parquet::Type;
@@ -387,7 +388,7 @@ class TestParquetIO : public ::testing::Test {
// Also test that slice offsets are respected
values = values->Slice(5, values->length() - 5);
std::shared_ptr<ListArray> lists;
- ASSERT_OK(MakeListArary(
+ ASSERT_OK(MakeListArray(
values, size, nullable_lists ? null_count : 0, nullable_elements, &lists));
*out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists);
}
@@ -399,10 +400,10 @@ class TestParquetIO : public ::testing::Test {
ASSERT_OK(NullableArray<TestType>(
size * 6, nullable_elements ? null_count : 0, kDefaultSeed, &values));
std::shared_ptr<ListArray> lists;
- ASSERT_OK(MakeListArary(
+ ASSERT_OK(MakeListArray(
values, size * 3, nullable_lists ? null_count : 0, nullable_elements, &lists));
std::shared_ptr<ListArray> parent_lists;
- ASSERT_OK(MakeListArary(lists, size, nullable_parent_lists ? null_count : 0,
+ ASSERT_OK(MakeListArray(lists, size, nullable_parent_lists ? null_count : 0,
nullable_lists, &parent_lists));
*out = MakeSimpleTable(parent_lists, nullable_parent_lists);
}
@@ -1080,22 +1081,17 @@ TEST(TestArrowWrite, CheckChunkSize) {
Invalid, WriteTable(*table, ::arrow::default_memory_pool(), sink, chunk_size));
}
-class TestNestedSchemaRead : public ::testing::Test {
+class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
protected:
- virtual void SetUp() {
- // We are using parquet low-level file api to create the nested parquet
- CreateNestedParquet();
- InitReader(&reader_);
- }
+ // Multiply by 3 so the row count divides evenly by 3
+ const int NUM_SIMPLE_TEST_ROWS = SMALL_SIZE * 3;
+ std::shared_ptr<::arrow::Int32Array> values_array_ = nullptr;
- void InitReader(std::shared_ptr<FileReader>* out) {
+ void InitReader() {
std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
- std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
-
- *out = std::move(reader);
+ ::parquet::default_reader_properties(), nullptr, &reader_));
}
void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
@@ -1110,61 +1106,330 @@ class TestNestedSchemaRead : public ::testing::Test {
writer_->Close();
}
- void CreateNestedParquet() {
+ void MakeValues(int num_rows) {
+ std::shared_ptr<Array> arr;
+ ASSERT_OK(NullableArray<::arrow::Int32Type>(num_rows, 0, kDefaultSeed, &arr));
+ values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr);
+ }
+
+ void WriteColumnData(size_t num_rows, int16_t* def_levels,
+ int16_t* rep_levels, int32_t* values) {
+ auto typed_writer = static_cast<TypedColumnWriter<Int32Type>*>(
+ row_group_writer_->NextColumn());
+ typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values);
+ }
+
+ void ValidateArray(const Array& array, size_t expected_nulls) {
+ ASSERT_EQ(array.length(), values_array_->length());
+ ASSERT_EQ(array.null_count(), expected_nulls);
+ // Also independently count the nulls
+ auto local_null_count = 0;
+ for (int i = 0; i < array.length(); i++) {
+ if (array.IsNull(i)) {
+ local_null_count++;
+ }
+ }
+ ASSERT_EQ(local_null_count, expected_nulls);
+ }
+
+ void ValidateColumnArray(const ::arrow::Int32Array& array,
+ size_t expected_nulls) {
+ ValidateArray(array, expected_nulls);
+
+ int j = 0;
+ for (int i = 0; i < values_array_->length(); i++) {
+ if (array.IsNull(i)) {
+ continue;
+ }
+ ASSERT_EQ(array.Value(i), values_array_->Value(j));
+ j++;
+ }
+ }
+
+ void ValidateTableArrayTypes(const Table& table) {
+ for (int i = 0; i < table.num_columns(); i++) {
+ const std::shared_ptr<::arrow::Field> schema_field = table.schema()->field(i);
+ const std::shared_ptr<Column> column = table.column(i);
+ // Compare with the column field
+ ASSERT_TRUE(schema_field->Equals(column->field()));
+ // Compare with the array type
+ ASSERT_TRUE(schema_field->type()->Equals(column->data()->chunk(0)->type()));
+ }
+ }
+
+ // A parquet with a simple nested schema
+ void CreateSimpleNestedParquet(Repetition::type struct_repetition) {
std::vector<NodePtr> parquet_fields;
- std::shared_ptr<Array> values;
+ // TODO(itaiin): We are using the parquet low-level file API to create the nested parquet;
+ // this needs to change once nested writes are implemented
// create the schema:
- // required group group1 {
+ // <struct_repetition> group group1 {
// required int32 leaf1;
- // required int32 leaf2;
+ // optional int32 leaf2;
// }
// required int32 leaf3;
- parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+ parquet_fields.push_back(GroupNode::Make("group1", struct_repetition,
{PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
- PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
+ PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)}));
parquet_fields.push_back(
PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));
- const int num_columns = 3;
auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
- InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), 0);
+ // Create definition levels for the different columns that contain interleaved
+ // nulls and values at all nesting levels
+
+ // definition levels for optional fields
+ std::vector<int16_t> leaf1_def_levels(NUM_SIMPLE_TEST_ROWS);
+ std::vector<int16_t> leaf2_def_levels(NUM_SIMPLE_TEST_ROWS);
+ std::vector<int16_t> leaf3_def_levels(NUM_SIMPLE_TEST_ROWS);
+ for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) {
+ // leaf1 is required within the optional group1, so it is only null
+ // when the group is null
+ leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1;
+ // leaf2 is optional, can be null in the primitive (def-level 1) or
+ // struct level (def-level 0)
+ leaf2_def_levels[i] = i % 3;
+ // leaf3 is required
+ leaf3_def_levels[i] = 0;
+ }
- for (int i = 0; i < num_columns; i++) {
- auto column_writer = row_group_writer_->NextColumn();
- auto typed_writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
- typed_writer->WriteBatch(0, nullptr, nullptr, nullptr);
+ std::vector<int16_t> rep_levels(NUM_SIMPLE_TEST_ROWS, 0);
+
+ // Produce values for the columns
+ MakeValues(NUM_SIMPLE_TEST_ROWS);
+ int32_t* values = reinterpret_cast<int32_t*>(values_array_->data()->mutable_data());
+
+ // Create the actual parquet file
+ InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node),
+ NUM_SIMPLE_TEST_ROWS);
+
+ // leaf1 column
+ WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(),
+ rep_levels.data(), values);
+ // leaf2 column
+ WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(),
+ rep_levels.data(), values);
+ // leaf3 column
+ WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(),
+ rep_levels.data(), values);
+
+ FinalizeParquetFile();
+ InitReader();
+ }
+
+ NodePtr CreateSingleTypedNestedGroup(int index, int depth, int num_children,
+ Repetition::type node_repetition, ParquetType::type leaf_type) {
+ std::vector<NodePtr> children;
+
+ for (int i = 0; i < num_children; i++) {
+ if (depth <= 1) {
+ children.push_back(PrimitiveNode::Make("leaf",
+ node_repetition, leaf_type));
+ } else {
+ children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children,
+ node_repetition, leaf_type));
+ }
}
+ std::stringstream ss;
+ ss << "group-" << depth << "-" << index;
+ return NodePtr(GroupNode::Make(ss.str(), node_repetition, children));
+ }
+
+ // A deeply nested schema
+ void CreateMultiLevelNestedParquet(int num_trees, int tree_depth,
+ int num_children, int num_rows, Repetition::type node_repetition) {
+ // Create the schema
+ std::vector<NodePtr> parquet_fields;
+ for (int i = 0; i < num_trees; i++) {
+ parquet_fields.push_back(CreateSingleTypedNestedGroup(i, tree_depth, num_children,
+ node_repetition, ParquetType::INT32));
+ }
+ auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
+
+ int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
+
+ std::vector<int16_t> def_levels(num_rows);
+ std::vector<int16_t> rep_levels(num_rows);
+ for (int i = 0; i < num_rows; i++) {
+ if (node_repetition == Repetition::REQUIRED) {
+ def_levels[i] = 0; // all is required
+ } else {
+ def_levels[i] = i % tree_depth; // all is optional
+ }
+ rep_levels[i] = 0; // none is repeated
+ }
+
+ // Produce values for the columns
+ MakeValues(num_rows);
+ int32_t* values = reinterpret_cast<int32_t*>(values_array_->data()->mutable_data());
+
+ // Create the actual parquet file
+ InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), num_rows);
+
+ for (int i = 0; i < num_columns; i++) {
+ WriteColumnData(num_rows, def_levels.data(), rep_levels.data(), values);
+ }
FinalizeParquetFile();
+ InitReader();
}
+ class DeepParquetTestVisitor : public ArrayVisitor {
+ public:
+ DeepParquetTestVisitor(Repetition::type node_repetition,
+ std::shared_ptr<::arrow::Int32Array> expected) :
+ node_repetition_(node_repetition), expected_(expected) {}
+
+ Status Validate(std::shared_ptr<Array> tree) {
+ return tree->Accept(this);
+ }
+
+ virtual Status Visit(const ::arrow::Int32Array& array) {
+ if (node_repetition_ == Repetition::REQUIRED) {
+ if (!array.Equals(expected_)) {
+ return Status::Invalid("leaf array data mismatch");
+ }
+ } else if (node_repetition_ == Repetition::OPTIONAL) {
+ if (array.length() != expected_->length()) {
+ return Status::Invalid("Bad leaf array length");
+ }
+ // expect only 1 value every `depth` row
+ if (array.null_count() != SMALL_SIZE) {
+ return Status::Invalid("Unexpected null count");
+ }
+ } else {
+ return Status::NotImplemented("Unsupported repetition");
+ }
+ return Status::OK();
+ }
+
+ virtual Status Visit(const ::arrow::StructArray& array) {
+ for (auto& child : array.fields()) {
+ if (node_repetition_ == Repetition::REQUIRED) {
+ RETURN_NOT_OK(child->Accept(this));
+ } else if (node_repetition_ == Repetition::OPTIONAL) {
+ // Null count must be a multiple of SMALL_SIZE
+ if (array.null_count() % SMALL_SIZE != 0) {
+ return Status::Invalid("Unexpected struct null count");
+ }
+ } else {
+ return Status::NotImplemented("Unsupported repetition");
+ }
+ }
+ return Status::OK();
+ }
+
+ private:
+ Repetition::type node_repetition_;
+ std::shared_ptr<::arrow::Int32Array> expected_;
+ };
+
std::shared_ptr<InMemoryOutputStream> nested_parquet_;
- std::shared_ptr<FileReader> reader_;
+ std::unique_ptr<FileReader> reader_;
std::unique_ptr<ParquetFileWriter> writer_;
RowGroupWriter* row_group_writer_;
};
TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
+ CreateSimpleNestedParquet(Repetition::OPTIONAL);
+
std::shared_ptr<Table> table;
- ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
+ ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
+ ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+ ASSERT_EQ(table->num_columns(), 2);
+ ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
+ ValidateTableArrayTypes(*table);
+
+ auto struct_field_array = std::static_pointer_cast<::arrow::StructArray>(
+ table->column(0)->data()->chunk(0));
+ auto leaf1_array = std::static_pointer_cast<::arrow::Int32Array>(
+ struct_field_array->field(0));
+ auto leaf2_array = std::static_pointer_cast<::arrow::Int32Array>(
+ struct_field_array->field(1));
+ auto leaf3_array = std::static_pointer_cast<::arrow::Int32Array>(
+ table->column(1)->data()->chunk(0));
+
+ // validate struct and leaf arrays
+
+ // validate struct array
+ ValidateArray(*struct_field_array, NUM_SIMPLE_TEST_ROWS / 3);
+ // validate leaf1
+ ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3);
+ // validate leaf2
+ ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2/ 3);
+ // validate leaf3
+ ValidateColumnArray(*leaf3_array, 0);
}
TEST_F(TestNestedSchemaRead, ReadTablePartial) {
+ CreateSimpleNestedParquet(Repetition::OPTIONAL);
std::shared_ptr<Table> table;
- ASSERT_RAISES(NotImplemented, reader_->ReadTable({0, 2}, &table));
- ASSERT_RAISES(NotImplemented, reader_->ReadTable({0, 1}, &table));
+ // columns: {group1.leaf1, leaf3}
+ ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table));
+ ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+ ASSERT_EQ(table->num_columns(), 2);
+ ASSERT_EQ(table->schema()->field(0)->name(), "group1");
+ ASSERT_EQ(table->schema()->field(1)->name(), "leaf3");
+ ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
+ ValidateTableArrayTypes(*table);
+
+ // columns: {group1.leaf1, group1.leaf2}
+ ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
+ ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+ ASSERT_EQ(table->num_columns(), 1);
+ ASSERT_EQ(table->schema()->field(0)->name(), "group1");
+ ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
+ ValidateTableArrayTypes(*table);
// columns: {leaf3}
ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table));
- ASSERT_EQ(table->num_rows(), 0);
+ ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 1);
+ ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0);
+ ValidateTableArrayTypes(*table);
+
+ // Test with different ordering
+ ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table));
+ ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+ ASSERT_EQ(table->num_columns(), 2);
+ ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
+ ASSERT_EQ(table->schema()->field(1)->name(), "group1");
+ ASSERT_EQ(table->schema()->field(1)->type()->num_children(), 1);
+ ValidateTableArrayTypes(*table);
+}
+
+TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) {
+ CreateSimpleNestedParquet(Repetition::REPEATED);
+ std::shared_ptr<Table> table;
+ ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
+}
+
+TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
+ const int num_trees = 10;
+ const int depth = 5;
+ const int num_children = 3;
+ int num_rows = SMALL_SIZE * depth;
+ CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam());
+ std::shared_ptr<Table> table;
+ ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
+ ASSERT_EQ(table->num_columns(), num_trees);
+ ASSERT_EQ(table->num_rows(), num_rows);
+
+ DeepParquetTestVisitor visitor(GetParam(), values_array_);
+ for (int i = 0; i < table->num_columns(); i++) {
+ auto tree = table->column(i)->data()->chunk(0);
+ ASSERT_OK_NO_THROW(visitor.Validate(tree));
+ }
}
+INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
+ ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
+
TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
// PARQUET-995
const char* data_dir = std::getenv("PARQUET_TEST_DATA");
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index a531454..7c1b381 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -28,6 +28,7 @@
#include "parquet/arrow/schema.h"
#include "parquet/util/bit-util.h"
+#include "parquet/util/schema-util.h"
#include "arrow/api.h"
@@ -37,11 +38,14 @@ using arrow::Column;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
+using arrow::StructArray;
using arrow::MemoryPool;
using arrow::PoolBuffer;
using arrow::Status;
using arrow::Table;
+using parquet::schema::NodePtr;
+
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
@@ -179,7 +183,13 @@ class FileReader::Impl {
virtual ~Impl() {}
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+ Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
+ Status ReadSchemaField(int i, const std::vector<int>& indices,
+ std::shared_ptr<Array>* out);
+ Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
+ int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
+ Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
Status GetSchema(
const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out);
Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
@@ -204,9 +214,21 @@ class FileReader::Impl {
int num_threads_;
};
+typedef const int16_t* ValueLevelsPtr;
+
class ColumnReader::Impl {
public:
- Impl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+ virtual ~Impl() {}
+ virtual Status NextBatch(int batch_size, std::shared_ptr<Array>* out) = 0;
+ virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0;
+ virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0;
+ virtual const std::shared_ptr<Field> field() = 0;
+};
+
+// Reader implementation for primitive arrays
+class PrimitiveImpl: public ColumnReader::Impl {
+ public:
+ PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
: pool_(pool),
input_(std::move(input)),
descr_(input_->descr()),
@@ -217,9 +239,9 @@ class ColumnReader::Impl {
NextRowGroup();
}
- virtual ~Impl() {}
+ virtual ~PrimitiveImpl() {}
- Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+ Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
template <typename ArrowType, typename ParquetType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
@@ -243,6 +265,11 @@ class ColumnReader::Impl {
Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
int64_t total_values_read, std::shared_ptr<Array>* array);
+ Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
+ Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+
+ const std::shared_ptr<Field> field() override { return field_; }
+
private:
void NextRowGroup();
@@ -272,6 +299,36 @@ class ColumnReader::Impl {
int64_t null_count_;
};
+// Reader implementation for struct array
+class StructImpl: public ColumnReader::Impl {
+ public:
+ explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
+ int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+ : children_(children), struct_def_level_(struct_def_level), pool_(pool),
+ def_levels_buffer_(pool) {
+ InitField(node, children);
+ }
+
+ virtual ~StructImpl() {}
+
+ Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
+ Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
+ Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+ const std::shared_ptr<Field> field() override { return field_; }
+
+ private:
+ std::vector<std::shared_ptr<Impl>> children_;
+ int16_t struct_def_level_;
+ MemoryPool* pool_;
+ std::shared_ptr<Field> field_;
+ PoolBuffer def_levels_buffer_;
+
+ Status DefLevelsToNullArray(std::shared_ptr<MutableBuffer>* null_bitmap,
+ int64_t* null_count);
+ void InitField(const NodePtr& node,
+ const std::vector<std::shared_ptr<Impl>>& children);
+};
+
FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
: impl_(new FileReader::Impl(pool, std::move(reader))) {}
@@ -281,11 +338,96 @@ Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
std::unique_ptr<ColumnReader::Impl> impl(
- new ColumnReader::Impl(pool_, std::move(input)));
+ new PrimitiveImpl(pool_, std::move(input)));
*out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
return Status::OK();
}
+Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
+ const std::vector<int>& indices, int16_t def_level,
+ std::unique_ptr<ColumnReader::Impl>* out) {
+
+ *out = nullptr;
+
+ if (IsSimpleStruct(node)) {
+ const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node.get());
+ std::vector<std::shared_ptr<ColumnReader::Impl>> children;
+ for (int i = 0; i < group->field_count(); i++) {
+ std::unique_ptr<ColumnReader::Impl> child_reader;
+ // TODO(itaiin): Remove the -1 index hack when all types of nested reads
+ // are supported. This currently just signals the lower level reader resolution
+ // to abort
+ RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices,
+ def_level + 1, &child_reader));
+ if (child_reader != nullptr) {
+ children.push_back(std::move(child_reader));
+ }
+ }
+
+ if (children.size() > 0) {
+ *out = std::unique_ptr<ColumnReader::Impl>(
+ new StructImpl(children, def_level, pool_, node));
+ }
+ } else {
+ // This should be a flat field case - translate the field index to
+ // the correct column index by walking down to the leaf node
+ NodePtr walker = node;
+ while (!walker->is_primitive()) {
+ DCHECK(walker->is_group());
+ auto group = static_cast<GroupNode*>(walker.get());
+ if (group->field_count() != 1) {
+ return Status::NotImplemented(
+ "lists with structs are not supported.");
+ }
+ walker = group->field(0);
+ }
+ auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get());
+
+ // If the index of the column is found then a reader for the column is needed.
+ // Otherwise *out keeps the nullptr value.
+ if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
+ std::unique_ptr<ColumnReader> reader;
+ RETURN_NOT_OK(GetColumn(column_index, &reader));
+ *out = std::move(reader->impl_);
+ }
+ }
+
+ return Status::OK();
+}
+
+Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+ std::vector<int> indices(reader_->metadata()->num_columns());
+
+ for (size_t j = 0; j < indices.size(); ++j) {
+ indices[j] = static_cast<int>(j);
+ }
+
+ return ReadSchemaField(i, indices, out);
+}
+
+Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
+ std::shared_ptr<Array>* out) {
+ auto parquet_schema = reader_->metadata()->schema();
+
+ auto node = parquet_schema->group_node()->field(i);
+ std::unique_ptr<ColumnReader::Impl> reader_impl;
+
+ RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
+ if (reader_impl == nullptr) {
+ *out = nullptr;
+ return Status::OK();
+ }
+
+ std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
+
+ int64_t batch_size = 0;
+ for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
+ batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+ }
+
+ return reader->NextBatch(static_cast<int>(batch_size), out);
+}
+
Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
@@ -327,7 +469,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
std::unique_ptr<ColumnReader::Impl> impl(
- new ColumnReader::Impl(pool_, std::move(input)));
+ new PrimitiveImpl(pool_, std::move(input)));
ColumnReader flat_column_reader(std::move(impl));
std::shared_ptr<Array> array;
@@ -357,9 +499,17 @@ Status FileReader::Impl::ReadTable(
int nthreads = std::min<int>(num_threads_, num_fields);
std::vector<std::shared_ptr<Column>> columns(num_fields);
- auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) {
+ // We only need to read schema fields which have columns indicated
+ // in the indices vector
+ std::vector<int> field_indices;
+ if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(),
+ indices, &field_indices)) {
+ return Status::Invalid("Invalid column index");
+ }
+
+ auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
std::shared_ptr<Array> array;
- RETURN_NOT_OK(ReadColumn(indices[i], &array));
+ RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};
@@ -424,6 +574,14 @@ Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
}
}
+Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+ try {
+ return impl_->ReadSchemaField(i, out);
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+}
+
Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
try {
return impl_->ReadTable(out);
@@ -471,7 +629,8 @@ const ParquetFileReader* FileReader::parquet_reader() const {
}
template <typename ArrowType, typename ParquetType>
-Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
+Status PrimitiveImpl::ReadNonNullableBatch(
+ TypedColumnReader<ParquetType>* reader,
int64_t values_to_read, int64_t* levels_read) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
@@ -491,7 +650,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>*
#define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \
template <> \
- Status ColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>( \
+ Status PrimitiveImpl::ReadNonNullableBatch<ArrowType, ParquetType>( \
TypedColumnReader<ParquetType> * reader, int64_t values_to_read, \
int64_t * levels_read) { \
int64_t values_read; \
@@ -515,7 +674,7 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
template <>
-Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
@@ -533,7 +692,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
}
template <>
-Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
@@ -551,8 +710,8 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
}
template <>
-Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
- TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType,
+ BooleanType>(TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
int64_t* levels_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
@@ -569,7 +728,8 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanTyp
}
template <typename ArrowType, typename ParquetType>
-Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
+Status PrimitiveImpl::ReadNullableBatch(
+ TypedColumnReader<ParquetType>* reader,
int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read,
int64_t* levels_read, int64_t* values_read) {
using ArrowCType = typename ArrowType::c_type;
@@ -599,7 +759,7 @@ Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader<ParquetType>* rea
#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \
template <> \
- Status ColumnReader::Impl::ReadNullableBatch<ArrowType, ParquetType>( \
+ Status PrimitiveImpl::ReadNullableBatch<ArrowType, ParquetType>( \
TypedColumnReader<ParquetType> * reader, int16_t * def_levels, \
int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read, \
int64_t * values_read) { \
@@ -625,7 +785,7 @@ NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
template <>
-Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
+Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
@@ -650,7 +810,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
}
template <>
-Status ColumnReader::Impl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
+Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
@@ -675,7 +835,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
}
template <>
-Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
+Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
@@ -699,7 +859,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
}
template <typename ArrowType>
-Status ColumnReader::Impl::InitDataBuffer(int batch_size) {
+Status PrimitiveImpl::InitDataBuffer(int batch_size) {
using ArrowCType = typename ArrowType::c_type;
data_buffer_ = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
@@ -709,7 +869,7 @@ Status ColumnReader::Impl::InitDataBuffer(int batch_size) {
}
template <>
-Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
+Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
data_buffer_ = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
data_buffer_ptr_ = data_buffer_->mutable_data();
@@ -718,7 +878,7 @@ Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size)
return Status::OK();
}
-Status ColumnReader::Impl::InitValidBits(int batch_size) {
+Status PrimitiveImpl::InitValidBits(int batch_size) {
valid_bits_idx_ = 0;
if (descr_->max_definition_level() > 0) {
int valid_bits_size =
@@ -732,17 +892,13 @@ Status ColumnReader::Impl::InitValidBits(int batch_size) {
return Status::OK();
}
-Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
+Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr<Array>* array) {
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
input_->metadata()->key_value_metadata(), &arrow_schema));
std::shared_ptr<Field> current_field = arrow_schema->field(0);
- if (current_field->type()->id() == ::arrow::Type::STRUCT) {
- return Status::NotImplemented("Structs are not yet supported.");
- }
-
if (descr_->max_repetition_level() > 0) {
// Walk downwards to extract nullability
std::vector<bool> nullable;
@@ -843,7 +999,8 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
}
template <typename ArrowType, typename ParquetType>
-Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::TypedReadBatch(
+ int batch_size, std::shared_ptr<Array>* out) {
using ArrowCType = typename ArrowType::c_type;
int values_to_read = batch_size;
@@ -901,7 +1058,7 @@ Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>
}
template <>
-Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
+Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
int batch_size, std::shared_ptr<Array>* out) {
int values_to_read = batch_size;
int total_levels_read = 0;
@@ -917,9 +1074,6 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
while ((values_to_read > 0) && column_reader_) {
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
- }
auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
int64_t values_read;
int64_t levels_read;
@@ -974,7 +1128,7 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
}
template <typename ArrowType>
-Status ColumnReader::Impl::ReadByteArrayBatch(
+Status PrimitiveImpl::ReadByteArrayBatch(
int batch_size, std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
@@ -1031,7 +1185,7 @@ Status ColumnReader::Impl::ReadByteArrayBatch(
}
template <typename ArrowType>
-Status ColumnReader::Impl::ReadFLBABatch(
+Status PrimitiveImpl::ReadFLBABatch(
int batch_size, int byte_width, std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
int total_levels_read = 0;
@@ -1083,13 +1237,13 @@ Status ColumnReader::Impl::ReadFLBABatch(
}
template <>
-Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
+Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
}
template <>
-Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
}
@@ -1099,7 +1253,8 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
break;
-Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::NextBatch(
+ int batch_size, std::shared_ptr<Array>* out) {
if (!column_reader_) {
// Exhausted all row groups.
*out = nullptr;
@@ -1155,10 +1310,22 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
}
}
-void ColumnReader::Impl::NextRowGroup() {
+void PrimitiveImpl::NextRowGroup() {
column_reader_ = input_->Next();
}
+// Exposes the contents of def_levels_buffer_ as a run of int16 definition
+// levels. NOTE(review): the count is derived from the buffer's byte size, not
+// from the number of levels actually read - confirm the two always agree.
+Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+  const auto num_levels = def_levels_buffer_.size() / sizeof(int16_t);
+  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+  *length = num_levels;
+  return Status::OK();
+}
+
+// Exposes the contents of rep_levels_buffer_ as a run of int16 repetition
+// levels; the count is derived from the buffer's byte size.
+Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+  const auto num_levels = rep_levels_buffer_.size() / sizeof(int16_t);
+  *data = reinterpret_cast<ValueLevelsPtr>(rep_levels_buffer_.data());
+  *length = num_levels;
+  return Status::OK();
+}
+
ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
ColumnReader::~ColumnReader() {}
@@ -1167,5 +1334,114 @@ Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
return impl_->NextBatch(batch_size, out);
}
+// StructImpl methods
+
+// Builds the struct-level validity bitmap from this struct's definition levels.
+//
+// A slot is valid when its definition level equals struct_def_level_ and null
+// when it is below it. On success *null_count_out holds the number of null
+// slots; *null_bitmap_out is set to nullptr when there are none.
+Status StructImpl::DefLevelsToNullArray(
+    std::shared_ptr<MutableBuffer>* null_bitmap_out,
+    int64_t* null_count_out) {
+  std::shared_ptr<MutableBuffer> null_bitmap;
+  // Explicitly 64-bit to match *null_count_out (`auto x = 0` deduces int).
+  int64_t null_count = 0;
+  ValueLevelsPtr def_levels_data;
+  size_t def_levels_length;
+  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
+  // Bits are only ever set below; null slots rely on the bitmap starting
+  // cleared (assumed from GetEmptyBitmap's contract).
+  RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
+  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
+  for (size_t i = 0; i < def_levels_length; i++) {
+    if (def_levels_data[i] < struct_def_level_) {
+      // The struct itself is undefined at this slot: mark null
+      null_count += 1;
+    } else {
+      DCHECK_EQ(def_levels_data[i], struct_def_level_);
+      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
+    }
+  }
+
+  *null_count_out = null_count;
+  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
+  return Status::OK();
+}
+
+// Derives this struct's own definition levels from its children's levels.
+//
+// The levels are written into def_levels_buffer_ and exposed through *data and
+// *length. A struct without children yields *data == nullptr and *length == 0.
+// Returns an error if the level buffer cannot be resized.
+// TODO(itaiin): Consider caching the results of this calculation -
+// note that this is only used once for each read for now
+Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+  *data = nullptr;
+  if (children_.empty()) {
+    // Empty struct
+    *length = 0;
+    return Status::OK();
+  }
+
+  // We have at least one child
+  ValueLevelsPtr child_def_levels;
+  size_t child_length;
+  RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
+  auto size = child_length * sizeof(int16_t);
+  // Propagate allocation failure instead of writing past an unresized buffer
+  RETURN_NOT_OK(def_levels_buffer_.Resize(size));
+  // Initialize every level to -1 (all bytes 0xFF), i.e. "not yet set"
+  std::memset(def_levels_buffer_.mutable_data(), -1, size);
+  auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+
+  // When a struct is defined, all of its children def levels are at least at
+  // nesting level, and def level equals nesting level.
+  // When a struct is not defined, all of its children def levels are less than
+  // the nesting level, and the def level equals max(children def levels)
+  // All other possibilities are malformed definition data.
+  for (auto& child : children_) {
+    size_t current_child_length;
+    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, &current_child_length));
+    DCHECK_EQ(child_length, current_child_length);
+    for (size_t i = 0; i < child_length; i++) {
+      // Check that value is either uninitialized, or current
+      // and previous children def levels agree on the struct level
+      DCHECK((result_levels[i] == -1) ||
+             ((result_levels[i] >= struct_def_level_) ==
+              (child_def_levels[i] >= struct_def_level_)));
+      // Clamp to the struct's own level, keeping the deepest level seen so far
+      result_levels[i] = std::max(result_levels[i],
+                                  std::min(child_def_levels[i], struct_def_level_));
+    }
+  }
+  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+  *length = child_length;
+  return Status::OK();
+}
+
+// Sets field_ to a struct-typed field named after `node`, whose member fields
+// are taken from `children` in order (a shallow conversion: the children's
+// fields are reused as-is).
+void StructImpl::InitField(const NodePtr& node,
+                           const std::vector<std::shared_ptr<Impl>>& children) {
+  std::vector<std::shared_ptr<::arrow::Field>> member_fields;
+  member_fields.reserve(children.size());
+  for (const auto& child : children) {
+    member_fields.push_back(child->field());
+  }
+  field_ = std::make_shared<Field>(
+      node->name(), std::make_shared<::arrow::StructType>(member_fields));
+}
+
+// Repetition levels are not supported for struct readers; this always
+// returns a NotImplemented status and leaves *data / *length untouched.
+Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+  return Status::NotImplemented(
+      "GetRepLevels is not implemented for struct");
+}
+
+// Reads the next batch of up to `batch_size` rows of this struct.
+//
+// Every child reads its own batch, and the struct's validity bitmap is then
+// derived from the children's definition levels. If any child reports
+// exhaustion (a null array, as PrimitiveImpl::NextBatch does once all row
+// groups are consumed), *out is set to nullptr.
+Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  std::vector<std::shared_ptr<Array>> children_arrays;
+  children_arrays.reserve(children_.size());
+  std::shared_ptr<MutableBuffer> null_bitmap;
+  int64_t null_count;
+
+  // Gather children arrays
+  for (auto& child : children_) {
+    std::shared_ptr<Array> child_array;
+
+    RETURN_NOT_OK(child->NextBatch(batch_size, &child_array));
+    if (!child_array) {
+      // A child has no more data: there is no struct batch to assemble.
+      *out = nullptr;
+      return Status::OK();
+    }
+
+    children_arrays.push_back(child_array);
+  }
+
+  // Children may return fewer than batch_size values once their column runs
+  // out, so the struct length comes from the children, not from batch_size.
+  int64_t struct_length = batch_size;
+  if (!children_arrays.empty()) {
+    struct_length = children_arrays[0]->length();
+    for (const auto& child_array : children_arrays) {
+      DCHECK_EQ(struct_length, child_array->length());
+    }
+  }
+
+  RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
+
+  *out = std::make_shared<StructArray>(field()->type(), struct_length,
+                                       children_arrays, null_bitmap, null_count);
+
+  return Status::OK();
+}
+
} // namespace arrow
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 24601b8..3a97c83 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -99,10 +99,44 @@ class PARQUET_EXPORT FileReader {
// Read column as a whole into an Array.
::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out);
- // Read a table of flat columns into a Table.
+ // NOTE: Experimental API
+ // Reads a specific top level schema field into an Array
+ // The index i refers to the index of the top level schema field, which may
+ // be nested or flat - e.g.
+ //
+ // 0 foo.bar
+ // foo.bar.baz
+ // foo.qux
+ // 1 foo2
+ // 2 foo3
+ //
+ // i=0 will read the entire foo struct, i=1 the foo2 primitive column etc
+ ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::Array>* out);
+
+ // NOTE: Experimental API
+ // Reads a specific top level schema field into an Array, while keeping only chosen
+ // leaf columns.
+ // The index i refers to the index of the top level schema field, which may
+ // be nested or flat, and indices vector refers to the leaf column indices - e.g.
+ //
+ // i indices
+ // 0 0 foo.bar
+ // 0 1 foo.bar.baz
+ // 0 2 foo.qux
+ // 1 3 foo2
+ // 2 4 foo3
+ //
+ // i=0 indices={0,2} will read a partial struct with foo.bar and foo.qux columns
+ // i=1 indices={3} will read foo2 column
+ // i=1 indices={2} will result in out=nullptr
+ // leaf indices which are unrelated to the schema field are ignored
+ ::arrow::Status ReadSchemaField(int i, const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Array>* out);
+
+ // Read a table of columns into a Table
::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
- // Read a table of flat columns into a Table. Read only the indicated column
+ // Read a table of columns into a Table. Read only the indicated column
// indices (relative to the schema)
::arrow::Status ReadTable(
const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
@@ -154,6 +188,8 @@ class PARQUET_EXPORT ColumnReader {
explicit ColumnReader(std::unique_ptr<Impl> impl);
friend class FileReader;
+ friend class PrimitiveImpl;
+ friend class StructImpl;
};
// Helper function to create a file reader from an implementation of an Arrow
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 83968bc..a78a23b 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -22,6 +22,7 @@
#include <vector>
#include "parquet/api/schema.h"
+#include "parquet/util/schema-util.h"
#include "arrow/api.h"
@@ -224,11 +225,6 @@ Status StructFromGroup(const GroupNode* group,
return Status::OK();
}
-bool str_endswith_tuple(const std::string& str) {
- if (str.size() >= 6) { return str.substr(str.size() - 6, 6) == "_tuple"; }
- return false;
-}
-
Status NodeToList(const GroupNode* group,
const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
*out = nullptr;
@@ -240,8 +236,7 @@ Status NodeToList(const GroupNode* group,
// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
- if (list_group->field_count() == 1 && list_node->name() != "array" &&
- !str_endswith_tuple(list_node->name())) {
+ if (list_group->field_count() == 1 && !HasStructListName(*list_group)) {
// List of primitive type
std::shared_ptr<Field> item_field;
RETURN_NOT_OK(
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index a5337cf..e44fcb6 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -271,7 +271,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
/// Wrap an Array into a ListArray by splitting it up into size lists.
///
/// This helper function only supports (size/2) nulls.
-Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size,
+Status MakeListArray(const std::shared_ptr<Array>& values, int64_t size,
int64_t null_count, bool nullable_values, std::shared_ptr<::arrow::ListArray>* out) {
// We always include an empty list
int64_t non_null_entries = size - null_count - 1;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index a31c817..e34ac4c 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -44,7 +44,8 @@ namespace test {
template <typename T>
static inline bool vector_equal_with_def_levels(const vector<T>& left,
- const vector<int16_t> def_levels, int16_t max_def_levels, const vector<T>& right) {
+ const vector<int16_t>& def_levels, int16_t max_def_levels, int16_t max_rep_levels,
+ const vector<T>& right) {
size_t i_left = 0;
size_t i_right = 0;
for (size_t i = 0; i < def_levels.size(); i++) {
@@ -57,9 +58,14 @@ static inline bool vector_equal_with_def_levels(const vector<T>& left,
}
i_left++;
i_right++;
- } else if (def_levels[i] == (max_def_levels - 1)) {
+ } else if (def_levels[i] == (max_def_levels -1)) {
// Null entry on the lowest nested level
i_right++;
+ } else if (def_levels[i] < (max_def_levels - 1)) {
+ // Null entry on a higher nesting level, only supported for non-repeating data
+ if (max_rep_levels == 0) {
+ i_right++;
+ }
}
}
@@ -142,7 +148,8 @@ class TestPrimitiveReader : public ::testing::Test {
if (max_def_level_ > 0) {
ASSERT_TRUE(vector_equal(def_levels_, dresult));
ASSERT_TRUE(
- vector_equal_with_def_levels(values_, dresult, max_def_level_, vresult));
+ vector_equal_with_def_levels(values_, dresult, max_def_level_,
+ max_rep_level_, vresult));
} else {
ASSERT_TRUE(vector_equal(values_, vresult));
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 9749e56..724773d 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -24,6 +24,7 @@
#include <memory>
#include <unordered_map>
#include <vector>
+#include <iostream>
#include <arrow/util/bit-util.h>
@@ -263,20 +264,35 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
}
inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
- int16_t max_definition_level, int64_t* values_read, int64_t* null_count,
+ int16_t max_definition_level, int16_t max_repetition_level,
+ int64_t* values_read, int64_t* null_count,
uint8_t* valid_bits, int64_t valid_bits_offset) {
int byte_offset = static_cast<int>(valid_bits_offset) / 8;
int bit_offset = static_cast<int>(valid_bits_offset) % 8;
uint8_t bitset = valid_bits[byte_offset];
+ // TODO(itaiin): As an interim solution we are splitting the code path here
+ // between repeated+flat column reads, and non-repeated+nested reads.
+ // Those paths need to be merged in the future
for (int i = 0; i < num_def_levels; ++i) {
if (def_levels[i] == max_definition_level) {
bitset |= (1 << bit_offset);
- } else if (def_levels[i] == (max_definition_level - 1)) {
- bitset &= ~(1 << bit_offset);
- *null_count += 1;
+ } else if (max_repetition_level > 0) {
+ // repetition+flat case
+ if (def_levels[i] == (max_definition_level - 1)) {
+ bitset &= ~(1 << bit_offset);
+ *null_count += 1;
+ } else {
+ continue;
+ }
} else {
- continue;
+ // non-repeated+nested case
+ if (def_levels[i] < max_definition_level) {
+ bitset &= ~(1 << bit_offset);
+ *null_count += 1;
+ } else {
+ throw ParquetException("definition level exceeds maximum");
+ }
}
bit_offset++;
@@ -322,9 +338,28 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
}
}
+ // TODO(itaiin): another code path split to merge when the general case is done
+ bool has_spaced_values;
+ if (descr_->max_repetition_level() > 0) {
+ // repeated+flat case
+ has_spaced_values = !descr_->schema_node()->is_required();
+ } else {
+ // non-repeated+nested case
+ // Find if a node forces nulls in the lowest level along the hierarchy
+ const schema::Node* node = descr_->schema_node().get();
+ has_spaced_values = false;
+ while (node) {
+ auto parent = node->parent();
+ if (node->is_optional()) {
+ has_spaced_values = true;
+ break;
+ }
+ node = parent;
+ }
+ }
+
int64_t null_count = 0;
- if (descr_->schema_node()->is_required()) {
- // Node is required so there are no null entries on the lowest nesting level.
+ if (!has_spaced_values) {
int values_to_read = 0;
for (int64_t i = 0; i < num_def_levels; ++i) {
if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
@@ -336,8 +371,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
*values_read = total_values;
} else {
int16_t max_definition_level = descr_->max_definition_level();
+ int16_t max_repetition_level = descr_->max_repetition_level();
DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
- values_read, &null_count, valid_bits, valid_bits_offset);
+ max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset);
total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
valid_bits, valid_bits_offset);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index f8a7205..58f23df 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -333,6 +333,21 @@ TEST_F(TestGroupNode, Equals) {
ASSERT_FALSE(group5.Equals(&group4));
}
+// FieldIndex(node) must return each field's own position and a negative value
+// for nodes that are not fields of the group.
+TEST_F(TestGroupNode, FieldIndex) {
+  NodeVector fields = Fields1();
+  GroupNode group("group", Repetition::REQUIRED, fields);
+  for (size_t i = 0; i < fields.size(); i++) {
+    auto field = group.field(static_cast<int>(i));
+    // Compare as int: FieldIndex returns int, and comparing it against size_t
+    // in ASSERT_EQ triggers signed/unsigned comparison warnings.
+    ASSERT_EQ(static_cast<int>(i), group.FieldIndex(*field));
+  }
+
+  // Test a non field node
+  auto non_field_alien = Int32("alien", Repetition::REQUIRED);  // other name
+  auto non_field_familiar = Int32("one", Repetition::REPEATED);  // other node
+  ASSERT_LT(group.FieldIndex(*non_field_alien), 0);
+  ASSERT_LT(group.FieldIndex(*non_field_familiar), 0);
+}
+
// ----------------------------------------------------------------------
// Test convert group
@@ -648,6 +663,17 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
ASSERT_EQ(descr_.Column(4)->path()->ToDotString(), "bag.records.item2");
ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3");
+ for (int i = 0; i < nleaves; ++i) {
+ auto col = descr_.Column(i);
+ ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
+ }
+
+ // Test non-column nodes find
+ NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED); // other path
+ NodePtr non_column_familiar = Int32("a", Repetition::REPEATED); // other node
+ ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
+ ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
+
ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get());
ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get());
ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index a3debd0..1209ad1 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -44,6 +44,21 @@ std::shared_ptr<ColumnPath> ColumnPath::FromDotString(const std::string& dotstri
return std::shared_ptr<ColumnPath>(new ColumnPath(std::move(path)));
}
+// Builds the ColumnPath of `node`: the names of `node` and its ancestors,
+// ordered from the outermost field down to `node`. The parentless root
+// (schema) node is not part of the path.
+std::shared_ptr<ColumnPath> ColumnPath::FromNode(const Node& node) {
+  // Names are gathered walking upwards, i.e. innermost first
+  std::vector<std::string> innermost_first;
+  for (const Node* cursor = &node; cursor->parent() != nullptr;
+       cursor = cursor->parent()) {
+    innermost_first.push_back(cursor->name());
+  }
+
+  // Flip into outermost-first order for the ColumnPath
+  return std::make_shared<ColumnPath>(std::vector<std::string>(
+      innermost_first.rbegin(), innermost_first.rend()));
+}
+
std::shared_ptr<ColumnPath> ColumnPath::extend(const std::string& node_name) const {
std::vector<std::string> path;
path.reserve(path_.size() + 1);
@@ -70,6 +85,12 @@ const std::vector<std::string>& ColumnPath::ToDotVector() const {
// ----------------------------------------------------------------------
// Base node
+// Returns this node's ColumnPath, recomputed on every call via
+// ColumnPath::FromNode.
+// TODO(itaiin): Cache the result, or more precisely, cache ->ToDotString()
+// since it is being used to access the leaf nodes
+const std::shared_ptr<ColumnPath> Node::path() const {
+  auto node_path = ColumnPath::FromNode(*this);
+  return node_path;
+}
+
bool Node::EqualsInternal(const Node* other) const {
return type_ == other->type_ && name_ == other->name_ &&
repetition_ == other->repetition_ && logical_type_ == other->logical_type_;
@@ -229,6 +250,28 @@ bool GroupNode::Equals(const Node* other) const {
return EqualsInternal(static_cast<const GroupNode*>(other));
}
+// Returns the index of the field called `name`, or -1 if there is none.
+int GroupNode::FieldIndex(const std::string& name) const {
+  const auto entry = field_name_to_idx_.find(name);
+  return entry == field_name_to_idx_.end() ? -1 : entry->second;
+}
+
+// Returns the index of `node` among this group's fields, or -1 when either no
+// field has the same name or the same-named field is not Equals() to `node`.
+int GroupNode::FieldIndex(const Node& node) const {
+  const int candidate = FieldIndex(node.name());
+  if (candidate >= 0) {
+    DCHECK(candidate < field_count());
+    if (node.Equals(field(candidate).get())) {
+      return candidate;
+    }
+  }
+  return -1;
+}
+
void GroupNode::Visit(Node::Visitor* visitor) {
visitor->Visit(this);
}
@@ -595,6 +638,8 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
// Primitive node, append to leaves
leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this));
leaf_to_base_.emplace(static_cast<int>(leaves_.size()) - 1, base);
+ leaf_to_idx_.emplace(
+ node->path()->ToDotString(), static_cast<int>(leaves_.size()) - 1);
}
}
@@ -620,6 +665,28 @@ const ColumnDescriptor* SchemaDescriptor::Column(int i) const {
return &leaves_[i];
}
+// Returns the leaf index registered for the dot-string path `node_path`, or
+// -1 if no leaf has that path.
+int SchemaDescriptor::ColumnIndex(const std::string& node_path) const {
+  const auto entry = leaf_to_idx_.find(node_path);
+  return entry == leaf_to_idx_.end() ? -1 : entry->second;
+}
+
+// Returns the leaf index of `node`, or -1 when either no leaf has the same
+// path or the leaf at that path is not Equals() to `node`.
+int SchemaDescriptor::ColumnIndex(const Node& node) const {
+  const int candidate = ColumnIndex(node.path()->ToDotString());
+  if (candidate >= 0) {
+    DCHECK(candidate < num_columns());
+    if (node.Equals(Column(candidate)->schema_node().get())) {
+      return candidate;
+    }
+  }
+  return -1;
+}
+
const schema::NodePtr& SchemaDescriptor::GetColumnRoot(int i) const {
DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
return leaf_to_base_.find(i)->second;
@@ -638,18 +705,7 @@ int ColumnDescriptor::type_length() const {
}
const std::shared_ptr<ColumnPath> ColumnDescriptor::path() const {
- // Build the path in reverse order as we traverse the nodes to the top
- std::vector<std::string> rpath_;
- const Node* node = primitive_node_;
- // The schema node is not part of the ColumnPath
- while (node->parent()) {
- rpath_.push_back(node->name());
- node = node->parent();
- }
-
- // Build ColumnPath in correct order
- std::vector<std::string> path_(rpath_.crbegin(), rpath_.crend());
- return std::make_shared<ColumnPath>(std::move(path_));
+ return primitive_node_->path();
}
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index 1615798..856f72d 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -38,6 +38,8 @@ class SchemaDescriptor;
namespace schema {
+class Node;
+
// List encodings: using the terminology from Impala to define different styles
// of representing logical lists (a.k.a. ARRAY types) in Parquet schemas. Since
// the converted type named in the Parquet metadata is ConvertedType::LIST we
@@ -87,6 +89,7 @@ class PARQUET_EXPORT ColumnPath {
explicit ColumnPath(std::vector<std::string>&& path) : path_(path) {}
static std::shared_ptr<ColumnPath> FromDotString(const std::string& dotstring);
+ static std::shared_ptr<ColumnPath> FromNode(const Node& node);
std::shared_ptr<ColumnPath> extend(const std::string& node_name) const;
std::string ToDotString() const;
@@ -139,6 +142,8 @@ class PARQUET_EXPORT Node {
const Node* parent() const { return parent_; }
+ const std::shared_ptr<ColumnPath> path() const;
+
// ToParquet returns an opaque void* to avoid exporting
// parquet::SchemaElement into the public API
virtual void ToParquet(void* opaque_element) const = 0;
@@ -249,6 +254,8 @@ class PARQUET_EXPORT GroupNode : public Node {
bool Equals(const Node* other) const override;
const NodePtr& field(int i) const { return fields_[i]; }
+ int FieldIndex(const std::string& name) const;
+ int FieldIndex(const Node& node) const;
int field_count() const { return static_cast<int>(fields_.size()); }
@@ -261,16 +268,23 @@ class PARQUET_EXPORT GroupNode : public Node {
const NodeVector& fields, LogicalType::type logical_type = LogicalType::NONE,
int id = -1)
: Node(Node::GROUP, name, repetition, logical_type, id), fields_(fields) {
+ field_name_to_idx_.clear();
+ auto field_idx = 0;
for (NodePtr& field : fields_) {
field->SetParent(this);
+ field_name_to_idx_[field->name()] = field_idx++;
}
}
NodeVector fields_;
bool EqualsInternal(const GroupNode* other) const;
+ // Mapping between field name to the field index
+ std::unordered_map<std::string, int> field_name_to_idx_;
+
FRIEND_TEST(TestGroupNode, Attrs);
FRIEND_TEST(TestGroupNode, Equals);
+ FRIEND_TEST(TestGroupNode, FieldIndex);
};
// ----------------------------------------------------------------------
@@ -362,6 +376,11 @@ class PARQUET_EXPORT SchemaDescriptor {
const ColumnDescriptor* Column(int i) const;
+ // Get the index of a column by its dotstring path, or negative value if not found
+ int ColumnIndex(const std::string& node_path) const;
+ // Get the index of a column by its node, or negative value if not found
+ int ColumnIndex(const schema::Node& node) const;
+
bool Equals(const SchemaDescriptor& other) const;
// The number of physical columns appearing in the file
@@ -398,6 +417,9 @@ class PARQUET_EXPORT SchemaDescriptor {
// -- -- -- c |
// -- -- -- -- d
std::unordered_map<int, const schema::NodePtr> leaf_to_base_;
+
+ // Mapping between ColumnPath DotString to the leaf index
+ std::unordered_map<std::string, int> leaf_to_idx_;
};
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/util/schema-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
new file mode 100644
index 0000000..618d21e
--- /dev/null
+++ b/src/parquet/util/schema-util.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_SCHEMA_UTIL_H
+#define PARQUET_SCHEMA_UTIL_H
+
+#include <string>
+#include <vector>
+#include <unordered_set>
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+
+using parquet::ParquetException;
+using parquet::SchemaDescriptor;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::Node;
+using parquet::LogicalType;
+
+// Returns true if `str` ends with the suffix "_tuple".
+// Compares in place with std::string::compare, so no temporary substring is
+// allocated on each call.
+inline bool str_endswith_tuple(const std::string& str) {
+  static constexpr char kSuffix[] = "_tuple";
+  static constexpr std::string::size_type kSuffixLen = sizeof(kSuffix) - 1;
+  return str.size() >= kSuffixLen &&
+         str.compare(str.size() - kSuffixLen, kSuffixLen, kSuffix) == 0;
+}
+
+// Returns true if the group's name marks it as a list of structs under the
+// special case in the format spec: a group named "array", or one whose name
+// ends in "_tuple", is a list of struct even when it has a single child.
+inline bool HasStructListName(const GroupNode& node) {
+  const std::string& group_name = node.name();
+  if (group_name == "array") { return true; }
+  return str_endswith_tuple(group_name);
+}
+
+// TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
+//
+// Returns true if `node` should be read as a plain (non-list) struct. That
+// means it is a group, is not repeated, and is not annotated as a LIST. It
+// must also not be a single-child group whose name marks it as a list of
+// structs (see HasStructListName).
+inline bool IsSimpleStruct(const NodePtr& node) {
+  const bool is_plain_group = node->is_group() && !node->is_repeated() &&
+                              node->logical_type() != LogicalType::LIST;
+  if (!is_plain_group) { return false; }
+
+  // Per the format spec, a single-child group named "array" or "*_tuple"
+  // denotes a list of structs rather than a struct.
+  const auto* group = static_cast<const GroupNode*>(node.get());
+  return !(group->field_count() == 1 && HasStructListName(*group));
+}
+
+// Maps the leaf column indices in `column_indices` to the indices of the
+// top-level schema fields that are the roots of those columns.
+//
+// `out` is cleared first. Each root field index is appended at most once, in
+// order of its first occurrence. Returns false in two cases: a column index
+// is out of range, or a column's root field cannot be found among the
+// schema's top-level fields. In either case `out` may hold a partial result.
+inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
+    const std::vector<int>& column_indices, std::vector<int>* out) {
+  const GroupNode* group = descr.group_node();
+  std::unordered_set<int> already_added;
+  out->clear();
+  for (const int column_idx : column_indices) {
+    // Reject invalid indices before calling GetColumnRoot.
+    // NOTE(review): GetColumnRoot's own bounds handling is not visible here.
+    // It resolves the index through leaf_to_base_, so it must not be trusted
+    // with out-of-range input.
+    if (column_idx < 0 || column_idx >= descr.num_columns()) { return false; }
+    // Bind by reference to avoid a per-column NodePtr copy.
+    const auto& field_node = descr.GetColumnRoot(column_idx);
+    const int field_idx = group->FieldIndex(field_node->name());
+    if (field_idx < 0) { return false; }
+    // Several columns can share a root field; emit each root only once.
+    if (already_added.insert(field_idx).second) { out->push_back(field_idx); }
+  }
+  return true;
+}
+
+#endif // PARQUET_SCHEMA_UTIL_H