Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/02 01:53:31 UTC
[arrow] branch master updated: ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 06fd2da ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation
06fd2da is described below
commit 06fd2da5e8e71b660e6eea4b7702ca175e31f3f5
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Aug 1 20:53:17 2019 -0500
ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation
Introduces auxiliary internal `SchemaManifest` and `SchemaField` data structures.
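As a rough orientation, here is a minimal sketch of the shape of these structures, inferred from how they are used in the diff below; the real definitions live in `cpp/src/parquet/arrow/reader_internal.h` and carry more members than shown, so treat this as an illustration rather than the actual header:

#include <memory>
#include <vector>

#include "arrow/type.h"      // ::arrow::Field
#include "parquet/schema.h"  // ::parquet::SchemaDescriptor

// Each SchemaField mirrors one node of the Arrow schema; nested types
// (structs, lists) recurse through `children`, and only leaf nodes map to a
// physical Parquet column.
struct SchemaField {
  std::shared_ptr<::arrow::Field> field;  // resolved Arrow field, including
                                          // any dictionary type
  std::vector<SchemaField> children;      // empty for primitive (leaf) nodes
  int column_index = -1;                  // Parquet leaf column index, or -1
  bool is_leaf() const { return column_index != -1; }
};

// The manifest is the root of the tree: one SchemaField per top-level field,
// plus helpers such as mapping leaf column indices back to top-level field
// indices (used via GetFieldIndices in the reader changes below).
struct SchemaManifest {
  const ::parquet::SchemaDescriptor* descr = nullptr;
  std::vector<SchemaField> schema_fields;
  bool GetFieldIndices(const std::vector<int>& column_indices,
                       std::vector<int>* out);
};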
This also permits dictionary-encoded subfields in a slightly more principled way: the dictionary type creation is resolved once, which removes the `FixSchema` hacks that were there before. I rewrote the nested schema conversion logic to hopefully be slightly easier to follow, though it could still use some work. I added comments within to explain the three different styles of list encoding, summarized next.
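For reference, the three list encodings those comments cover, written as Parquet schema fragments like the ones quoted in the test changes below (the three-level form is the one the format specification prescribes; the others are legacy forms the reader must still accept):

// 1. Three-level encoding (standard): outer LIST group, repeated middle
//    group, then the element field. List and element nullability are carried
//    by the outer and innermost repetitions.
optional group my_list (LIST) {
  repeated group list {
    optional binary element (UTF8);
  }
}

// 2. Two-level encoding (legacy): the repeated group itself is the element,
//    so elements are always non-null.
optional group my_list (LIST) {
  repeated group element {
    required binary str (UTF8);
  }
}

// 3. One-level encoding (legacy): a bare repeated field, interpreted as a
//    required list of required elements.
repeated int32 name;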
There are a couple of API changes:
* The `FileReader::GetSchema(indices, &schema)` method has been removed. The way that "projected" schemas were being constructed was pretty hacky, and this function is non-essential to the operation of the class. I had to remove bindings in the GLib and R libraries for this function, but as far as I can tell these bindings were non-essential to operation, and were added only because the function was there to wrap.
* Added a `FileWriter::Make` factory method and made the constructor private (a usage sketch follows this list).
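To make the second change concrete, a minimal before/after sketch following the call pattern used in the updated tests below; `MakeWriter(schema)` stands in for whatever creates the underlying `ParquetFileWriter`, and error propagation is reduced to `RETURN_NOT_OK`:

// Before this patch: public constructor, no properties argument.
//   FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema),
//                     arrow_schema);
//   writer.NewRowGroup(values->length());

// After this patch: construct through the factory, which returns a Status.
std::unique_ptr<FileWriter> writer;
RETURN_NOT_OK(FileWriter::Make(::arrow::default_memory_pool(), MakeWriter(schema),
                               arrow_schema, default_arrow_writer_properties(),
                               &writer));
RETURN_NOT_OK(writer->NewRowGroup(values->length()));
RETURN_NOT_OK(writer->WriteColumnChunk(*values));
RETURN_NOT_OK(writer->Close());  // Close() remains idempotent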
This patch was pretty unpleasant to do -- it removes some hacky functions used to create Arrow fields with leaf nodes trimmed. There is little functional change; it is an attempt to provide a cleaner structure for full-fledged nested data reading.
Next, I'm going to get on with seeing the user-facing dictionary-encoding functionality in Python through to completion.
Closes #4971 from wesm/parquet-arrow-schema-tree and squashes the following commits:
e1f19c06b <Wes McKinney> Code review feedback
e2c117ad1 <Wes McKinney> Factor out list nesting into helper function
Authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
c_glib/parquet-glib/arrow-file-reader.cpp | 45 +-
c_glib/parquet-glib/arrow-file-reader.h | 5 -
c_glib/test/parquet/test-arrow-file-reader.rb | 13 -
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 52 +-
cpp/src/parquet/arrow/arrow-schema-test.cc | 355 ++++------
cpp/src/parquet/arrow/reader.cc | 794 +++++++++-------------
cpp/src/parquet/arrow/reader.h | 18 +-
cpp/src/parquet/arrow/reader_internal.cc | 633 ++++++++++++++++-
cpp/src/parquet/arrow/reader_internal.h | 161 ++++-
cpp/src/parquet/arrow/schema.cc | 442 +-----------
cpp/src/parquet/arrow/schema.h | 41 --
cpp/src/parquet/arrow/writer.cc | 98 ++-
cpp/src/parquet/arrow/writer.h | 13 +-
cpp/src/parquet/file_writer.cc | 8 +-
cpp/src/parquet/metadata.cc | 18 +-
cpp/src/parquet/schema-internal.h | 51 --
cpp/src/parquet/schema.cc | 11 +
cpp/src/parquet/schema.h | 6 +
python/pyarrow/_parquet.pxd | 4 +-
r/R/arrowExports.R | 8 +-
r/R/parquet.R | 11 +-
r/src/arrowExports.cpp | 29 +-
r/src/parquet.cpp | 21 +-
23 files changed, 1408 insertions(+), 1429 deletions(-)
diff --git a/c_glib/parquet-glib/arrow-file-reader.cpp b/c_glib/parquet-glib/arrow-file-reader.cpp
index 217bd19..db59436 100644
--- a/c_glib/parquet-glib/arrow-file-reader.cpp
+++ b/c_glib/parquet-glib/arrow-file-reader.cpp
@@ -231,15 +231,8 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
{
auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
- const auto n_columns =
- parquet_arrow_file_reader->parquet_reader()->metadata()->num_columns();
- std::vector<int> indices(n_columns);
- for (int i = 0; i < n_columns; ++i) {
- indices[i] = i;
- }
-
std::shared_ptr<arrow::Schema> arrow_schema;
- auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
+ auto status = parquet_arrow_file_reader->GetSchema(&arrow_schema);
if (garrow_error_check(error,
status,
"[parquet][arrow][file-reader][get-schema]")) {
@@ -250,42 +243,6 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
}
/**
- * gparquet_arrow_file_reader_select_schema:
- * @reader: A #GParquetArrowFileReader.
- * @column_indexes: (array length=n_column_indexes):
- * The array of column indexes to be selected.
- * @n_column_indexes: The length of `column_indexes`.
- * @error: (nullable): Return location for a #GError or %NULL.
- *
- * Returns: (transfer full) (nullable): A selected #GArrowSchema.
- *
- * Since: 0.12.0
- */
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
- gint *column_indexes,
- gsize n_column_indexes,
- GError **error)
-{
- auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
-
- std::vector<int> indices(n_column_indexes);
- for (gsize i = 0; i < n_column_indexes; ++i) {
- indices[i] = column_indexes[i];
- }
-
- std::shared_ptr<arrow::Schema> arrow_schema;
- auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
- if (garrow_error_check(error,
- status,
- "[parquet][arrow][file-reader][select-schema]")) {
- return garrow_schema_new_raw(&arrow_schema);
- } else {
- return NULL;
- }
-}
-
-/**
* gparquet_arrow_file_reader_read_column_data:
* @reader: A #GParquetArrowFileReader.
* @i: The index of the column to be read. If it's negative, index is
diff --git a/c_glib/parquet-glib/arrow-file-reader.h b/c_glib/parquet-glib/arrow-file-reader.h
index a0d1a8e..5a6ec96 100644
--- a/c_glib/parquet-glib/arrow-file-reader.h
+++ b/c_glib/parquet-glib/arrow-file-reader.h
@@ -48,11 +48,6 @@ gparquet_arrow_file_reader_read_table(GParquetArrowFileReader *reader,
GArrowSchema *
gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
GError **error);
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
- gint *column_indexes,
- gsize n_column_indexes,
- GError **error);
GARROW_AVAILABLE_IN_1_0
GArrowChunkedArray *
diff --git a/c_glib/test/parquet/test-arrow-file-reader.rb b/c_glib/test/parquet/test-arrow-file-reader.rb
index 7ff17c2..d30c8e9 100644
--- a/c_glib/test/parquet/test-arrow-file-reader.rb
+++ b/c_glib/test/parquet/test-arrow-file-reader.rb
@@ -39,19 +39,6 @@ b: int32
SCHEMA
end
- def test_select_schema
- assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0]).to_s)
-a: string
- SCHEMA
- assert_equal(<<-SCHEMA.chomp, @reader.select_schema([1]).to_s)
-b: int32
- SCHEMA
- assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0, 1]).to_s)
-a: string
-b: int32
- SCHEMA
- end
-
def test_read_column
assert_equal([
Arrow::ChunkedArray.new([@a_array]),
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index d0d2536..2e57ab8 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -40,6 +40,7 @@
#include "parquet/api/writer.h"
#include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/test-util.h"
#include "parquet/arrow/writer.h"
@@ -597,12 +598,16 @@ class TestParquetIO : public ::testing::Test {
std::shared_ptr<::arrow::Schema> arrow_schema;
ArrowReaderProperties props;
ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
- FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema), arrow_schema);
- ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
- ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
- ASSERT_OK_NO_THROW(writer.Close());
- // writer.Close() should be idempotent
- ASSERT_OK_NO_THROW(writer.Close());
+
+ std::unique_ptr<FileWriter> writer;
+ ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+ MakeWriter(schema), arrow_schema,
+ default_arrow_writer_properties(), &writer));
+ ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
+ ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
+ ASSERT_OK_NO_THROW(writer->Close());
+ // writer->Close() should be idempotent
+ ASSERT_OK_NO_THROW(writer->Close());
}
void ResetSink() { sink_ = CreateOutputStream(); }
@@ -789,13 +794,17 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
std::shared_ptr<::arrow::Schema> arrow_schema;
ArrowReaderProperties props;
ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
- FileWriter writer(default_memory_pool(), this->MakeWriter(schema), arrow_schema);
+
+ std::unique_ptr<FileWriter> writer;
+ ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+ this->MakeWriter(schema), arrow_schema,
+ default_arrow_writer_properties(), &writer));
for (int i = 0; i < 4; i++) {
- ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
- ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+ ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
}
- ASSERT_OK_NO_THROW(writer.Close());
+ ASSERT_OK_NO_THROW(writer->Close());
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
}
@@ -859,14 +868,17 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
std::shared_ptr<::arrow::Schema> arrow_schema;
ArrowReaderProperties props;
ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
- FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema),
- arrow_schema);
+
+ std::unique_ptr<FileWriter> writer;
+ ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+ this->MakeWriter(schema), arrow_schema,
+ default_arrow_writer_properties(), &writer));
for (int i = 0; i < 4; i++) {
- ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
- ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+ ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
}
- ASSERT_OK_NO_THROW(writer.Close());
+ ASSERT_OK_NO_THROW(writer->Close());
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
}
@@ -2624,11 +2636,15 @@ TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));
auto writer = ParquetFileWriter::Open(sink, schm_node);
- FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
+
+ std::unique_ptr<FileWriter> arrow_writer;
+ ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), std::move(writer),
+ table->schema(), default_arrow_writer_properties(),
+ &arrow_writer));
for (int i : {0, 1}) {
- ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
+ ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, table->num_rows())) << i;
}
- ASSERT_OK_NO_THROW(arrow_writer.Close());
+ ASSERT_OK_NO_THROW(arrow_writer->Close());
std::shared_ptr<Buffer> tables_buffer;
ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer));
diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index 06646b4..b9a8e81 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -21,6 +21,7 @@
#include "gtest/gtest.h"
#include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/file_reader.h"
#include "parquet/schema.h"
@@ -65,35 +66,21 @@ class TestConvertParquetSchema : public ::testing::Test {
void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) {
ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields());
for (int i = 0; i < expected_schema->num_fields(); ++i) {
- auto lhs = result_schema_->field(i);
- auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs))
- << i << " " << lhs->ToString() << " != " << rhs->ToString();
+ auto result_field = result_schema_->field(i);
+ auto expected_field = expected_schema->field(i);
+ EXPECT_TRUE(result_field->Equals(expected_field))
+ << "Field " << i << "\n result: " << result_field->ToString()
+ << "\n expected: " << expected_field->ToString();
}
}
- ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
- NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
- descr_.Init(schema);
- ArrowReaderProperties props;
- return FromParquetSchema(&descr_, props, &result_schema_);
- }
-
- ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes,
- const std::vector<int>& column_indices) {
- NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
- descr_.Init(schema);
- ArrowReaderProperties props;
- return FromParquetSchema(&descr_, column_indices, props, &result_schema_);
- }
-
::arrow::Status ConvertSchema(
const std::vector<NodePtr>& nodes,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr) {
NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
descr_.Init(schema);
ArrowReaderProperties props;
- return FromParquetSchema(&descr_, {}, props, key_value_metadata, &result_schema_);
+ return FromParquetSchema(&descr_, props, key_value_metadata, &result_schema_);
}
protected:
@@ -107,69 +94,68 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(
PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
- arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+ arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
parquet_fields.push_back(
PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
- arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+ arrow_fields.push_back(::arrow::field("int32", INT32, false));
parquet_fields.push_back(
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
- arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+ arrow_fields.push_back(::arrow::field("int64", INT64, false));
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
ConvertedType::TIMESTAMP_MILLIS));
arrow_fields.push_back(
- std::make_shared<Field>("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false));
+ ::arrow::field("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false));
parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
ConvertedType::TIMESTAMP_MICROS));
- arrow_fields.push_back(std::make_shared<Field>(
- "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false));
+ arrow_fields.push_back(
+ ::arrow::field("timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false));
parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
ParquetType::INT32, ConvertedType::DATE));
- arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+ arrow_fields.push_back(::arrow::field("date", ::arrow::date32(), false));
parquet_fields.push_back(PrimitiveNode::Make(
"time32", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::TIME_MILLIS));
arrow_fields.push_back(
- std::make_shared<Field>("time32", ::arrow::time32(TimeUnit::MILLI), false));
+ ::arrow::field("time32", ::arrow::time32(TimeUnit::MILLI), false));
parquet_fields.push_back(PrimitiveNode::Make(
"time64", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIME_MICROS));
arrow_fields.push_back(
- std::make_shared<Field>("time64", ::arrow::time64(TimeUnit::MICRO), false));
+ ::arrow::field("time64", ::arrow::time64(TimeUnit::MICRO), false));
parquet_fields.push_back(
PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
- arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
+ arrow_fields.push_back(::arrow::field("timestamp96", TIMESTAMP_NS, false));
parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
- arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+ arrow_fields.push_back(::arrow::field("float", FLOAT));
parquet_fields.push_back(
PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
- arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+ arrow_fields.push_back(::arrow::field("double", DOUBLE));
parquet_fields.push_back(
PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY));
- arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+ arrow_fields.push_back(::arrow::field("binary", BINARY));
parquet_fields.push_back(PrimitiveNode::Make(
"string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
- arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+ arrow_fields.push_back(::arrow::field("string", UTF8));
parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
ParquetType::FIXED_LEN_BYTE_ARRAY,
ConvertedType::NONE, 12));
- arrow_fields.push_back(
- std::make_shared<Field>("flba-binary", ::arrow::fixed_size_binary(12)));
+ arrow_fields.push_back(::arrow::field("flba-binary", ::arrow::fixed_size_binary(12)));
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -261,11 +247,11 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::OPTIONAL,
c.logical_type, c.physical_type,
c.physical_length));
- arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype));
+ arrow_fields.push_back(::arrow::field(c.name, c.datatype));
}
ASSERT_OK(ConvertSchema(parquet_fields));
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}
@@ -275,26 +261,15 @@ TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
parquet_fields.push_back(
PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::BOOLEAN));
- auto arrow_field1 = std::make_shared<Field>("xxx", BOOL, false);
+ auto arrow_field1 = ::arrow::field("xxx", BOOL, false);
parquet_fields.push_back(
PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::INT32));
- auto arrow_field2 = std::make_shared<Field>("xxx", INT32, false);
+ auto arrow_field2 = ::arrow::field("xxx", INT32, false);
ASSERT_OK(ConvertSchema(parquet_fields));
arrow_fields = {arrow_field1, arrow_field2};
- ASSERT_NO_FATAL_FAILURE(
- CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
-
- ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({0, 1})));
- arrow_fields = {arrow_field1, arrow_field2};
- ASSERT_NO_FATAL_FAILURE(
- CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
-
- ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({1, 0})));
- arrow_fields = {arrow_field2, arrow_field1};
- ASSERT_NO_FATAL_FAILURE(
- CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
+ ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(::arrow::schema(arrow_fields)));
}
TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
@@ -303,11 +278,11 @@ TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
parquet_fields.push_back(
PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
- arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+ arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
parquet_fields.push_back(
PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
- arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+ arrow_fields.push_back(::arrow::field("int32", INT32, false));
auto key_value_metadata = std::make_shared<KeyValueMetadata>();
key_value_metadata->Append("foo", "bar");
@@ -327,7 +302,7 @@ TEST_F(TestConvertParquetSchema, ParquetEmptyKeyValueMetadata) {
parquet_fields.push_back(
PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
- arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+ arrow_fields.push_back(::arrow::field("int32", INT32, false));
std::shared_ptr<KeyValueMetadata> key_value_metadata = nullptr;
ASSERT_OK(ConvertSchema(parquet_fields, key_value_metadata));
@@ -343,24 +318,24 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
ParquetType::FIXED_LEN_BYTE_ARRAY,
ConvertedType::DECIMAL, 4, 8, 4));
- arrow_fields.push_back(std::make_shared<Field>("flba-decimal", DECIMAL_8_4));
+ arrow_fields.push_back(::arrow::field("flba-decimal", DECIMAL_8_4));
parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
ParquetType::BYTE_ARRAY,
ConvertedType::DECIMAL, -1, 8, 4));
- arrow_fields.push_back(std::make_shared<Field>("binary-decimal", DECIMAL_8_4));
+ arrow_fields.push_back(::arrow::field("binary-decimal", DECIMAL_8_4));
parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
ParquetType::INT32, ConvertedType::DECIMAL,
-1, 8, 4));
- arrow_fields.push_back(std::make_shared<Field>("int32-decimal", DECIMAL_8_4));
+ arrow_fields.push_back(::arrow::field("int32-decimal", DECIMAL_8_4));
parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
ParquetType::INT64, ConvertedType::DECIMAL,
-1, 8, 4));
- arrow_fields.push_back(std::make_shared<Field>("int64-decimal", DECIMAL_8_4));
+ arrow_fields.push_back(::arrow::field("int64-decimal", DECIMAL_8_4));
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -384,9 +359,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::REQUIRED, {list}, ConvertedType::LIST));
- auto arrow_element = std::make_shared<Field>("string", UTF8, true);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+ auto arrow_element = ::arrow::field("string", UTF8, true);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, false));
}
// // List<String> (list nullable, elements non-null)
@@ -401,9 +376,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
- auto arrow_element = std::make_shared<Field>("string", UTF8, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_element = ::arrow::field("string", UTF8, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
// Element types can be nested structures. For example, a list of lists:
@@ -427,11 +402,11 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(GroupNode::Make("array_of_arrays", Repetition::OPTIONAL,
{list}, ConvertedType::LIST));
- auto arrow_inner_element = std::make_shared<Field>("int32", INT32, false);
- auto arrow_inner_list = std::make_shared<::arrow::ListType>(arrow_inner_element);
- auto arrow_element = std::make_shared<Field>("element", arrow_inner_list, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("array_of_arrays", arrow_list, true));
+ auto arrow_inner_element = ::arrow::field("int32", INT32, false);
+ auto arrow_inner_list = ::arrow::list(arrow_inner_element);
+ auto arrow_element = ::arrow::field("element", arrow_inner_list, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("array_of_arrays", arrow_list, true));
}
// // List<String> (list nullable, elements non-null)
@@ -446,9 +421,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
auto list = GroupNode::Make("element", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
- auto arrow_element = std::make_shared<Field>("str", UTF8, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_element = ::arrow::field("str", UTF8, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
// // List<Integer> (nullable list, non-null elements)
@@ -460,9 +435,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
PrimitiveNode::Make("element", Repetition::REPEATED, ParquetType::INT32);
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, ConvertedType::LIST));
- auto arrow_element = std::make_shared<Field>("element", INT32, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_element = ::arrow::field("element", INT32, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
// // List<Tuple<String, Integer>> (nullable list, non-null elements)
@@ -481,13 +456,13 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
GroupNode::Make("element", Repetition::REPEATED, {str_element, num_element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, ConvertedType::LIST));
- auto arrow_str = std::make_shared<Field>("str", UTF8, false);
- auto arrow_num = std::make_shared<Field>("num", INT32, false);
+ auto arrow_str = ::arrow::field("str", UTF8, false);
+ auto arrow_num = ::arrow::field("num", INT32, false);
std::vector<std::shared_ptr<Field>> fields({arrow_str, arrow_num});
- auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
- auto arrow_element = std::make_shared<Field>("element", arrow_struct, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_struct = ::arrow::struct_(fields);
+ auto arrow_element = ::arrow::field("element", arrow_struct, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
// // List<OneTuple<String>> (nullable list, non-null elements)
@@ -503,12 +478,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
auto array = GroupNode::Make("array", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));
- auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+ auto arrow_str = ::arrow::field("str", UTF8, false);
std::vector<std::shared_ptr<Field>> fields({arrow_str});
- auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
- auto arrow_element = std::make_shared<Field>("array", arrow_struct, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_struct = ::arrow::struct_(fields);
+ auto arrow_element = ::arrow::field("array", arrow_struct, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
// // List<OneTuple<String>> (nullable list, non-null elements)
@@ -524,12 +499,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
auto array = GroupNode::Make("my_list_tuple", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));
- auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+ auto arrow_str = ::arrow::field("str", UTF8, false);
std::vector<std::shared_ptr<Field>> fields({arrow_str});
- auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
- auto arrow_element = std::make_shared<Field>("my_list_tuple", arrow_struct, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_struct = ::arrow::struct_(fields);
+ auto arrow_element = ::arrow::field("my_list_tuple", arrow_struct, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
// One-level encoding: Only allows required lists with required cells
@@ -537,12 +512,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
{
parquet_fields.push_back(
PrimitiveNode::Make("name", Repetition::REPEATED, ParquetType::INT32));
- auto arrow_element = std::make_shared<Field>("name", INT32, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("name", arrow_list, false));
+ auto arrow_element = ::arrow::field("name", INT32, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("name", arrow_list, false));
}
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -573,20 +548,20 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchema) {
parquet_fields.push_back(
PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64));
- auto group1_fields = {std::make_shared<Field>("leaf1", BOOL, false),
- std::make_shared<Field>("leaf2", INT32, false)};
- auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
- arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
- arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false));
+ auto group1_fields = {::arrow::field("leaf1", BOOL, false),
+ ::arrow::field("leaf2", INT32, false)};
+ auto arrow_group1_type = ::arrow::struct_(group1_fields);
+ arrow_fields.push_back(::arrow::field("group1", arrow_group1_type, false));
+ arrow_fields.push_back(::arrow::field("leaf3", INT64, false));
}
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}
-TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
+TEST_F(TestConvertParquetSchema, ParquetNestedSchema2) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -600,15 +575,6 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
// required int64 leaf4;
// }
// required int64 leaf5;
- //
- // Expected partial arrow schema (columns 0, 3, 4):
- // required group group1 {
- // required int64 leaf1;
- // }
- // required group group2 {
- // required int64 leaf4;
- // }
- // required int64 leaf5;
{
parquet_fields.push_back(GroupNode::Make(
"group1", Repetition::REQUIRED,
@@ -621,70 +587,19 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
parquet_fields.push_back(
PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
- auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
- auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
- auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
- auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
-
- arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
- arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
- arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
+ auto group1_fields = {::arrow::field("leaf1", INT64, false),
+ ::arrow::field("leaf2", INT64, false)};
+ auto arrow_group1_type = ::arrow::struct_(group1_fields);
+ auto group2_fields = {::arrow::field("leaf3", INT64, false),
+ ::arrow::field("leaf4", INT64, false)};
+ auto arrow_group2_type = ::arrow::struct_(group2_fields);
+ arrow_fields.push_back(::arrow::field("group1", arrow_group1_type, false));
+ arrow_fields.push_back(::arrow::field("group2", arrow_group2_type, false));
+ arrow_fields.push_back(::arrow::field("leaf5", INT64, false));
}
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
- ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{0, 3, 4}));
-
- ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
-}
-
-TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {
- std::vector<NodePtr> parquet_fields;
- std::vector<std::shared_ptr<Field>> arrow_fields;
-
- // Full Parquet Schema:
- // required group group1 {
- // required int64 leaf1;
- // required int64 leaf2;
- // }
- // required group group2 {
- // required int64 leaf3;
- // required int64 leaf4;
- // }
- // required int64 leaf5;
- //
- // Expected partial arrow schema (columns 3, 4, 0):
- // required group group2 {
- // required int64 leaf4;
- // }
- // required int64 leaf5;
- // required group group1 {
- // required int64 leaf1;
- // }
- {
- parquet_fields.push_back(GroupNode::Make(
- "group1", Repetition::REQUIRED,
- {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64),
- PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
- parquet_fields.push_back(GroupNode::Make(
- "group2", Repetition::REQUIRED,
- {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64),
- PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
- parquet_fields.push_back(
- PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
-
- auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
- auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
- auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
- auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
-
- arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
- arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
- arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
- }
-
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
- ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{3, 4, 0}));
-
+ auto arrow_schema = ::arrow::schema(arrow_fields);
+ ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}
@@ -708,23 +623,21 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
"innerGroup", Repetition::REPEATED,
{PrimitiveNode::Make("leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
- auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)};
- auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
+ auto inner_group_fields = {::arrow::field("leaf3", INT32, true)};
+ auto inner_group_type = ::arrow::struct_(inner_group_fields);
auto outer_group_fields = {
- std::make_shared<Field>("leaf2", INT32, true),
- std::make_shared<Field>(
+ ::arrow::field("leaf2", INT32, true),
+ ::arrow::field(
"innerGroup",
- ::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
- false)};
- auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);
+ ::arrow::list(::arrow::field("innerGroup", inner_group_type, false)), false)};
+ auto outer_group_type = ::arrow::struct_(outer_group_fields);
- arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
- arrow_fields.push_back(std::make_shared<Field>(
+ arrow_fields.push_back(::arrow::field("leaf1", INT32, true));
+ arrow_fields.push_back(::arrow::field(
"outerGroup",
- ::arrow::list(std::make_shared<Field>("outerGroup", outer_group_type, false)),
- false));
+ ::arrow::list(::arrow::field("outerGroup", outer_group_type, false)), false));
}
- auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -750,7 +663,7 @@ class TestConvertArrowSchema : public ::testing::Test {
}
::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
- arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+ arrow_schema_ = ::arrow::schema(fields);
std::shared_ptr<::parquet::WriterProperties> properties =
::parquet::default_writer_properties();
return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
@@ -767,51 +680,51 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(
PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
- arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+ arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
parquet_fields.push_back(
PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
- arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+ arrow_fields.push_back(::arrow::field("int32", INT32, false));
parquet_fields.push_back(
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
- arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+ arrow_fields.push_back(::arrow::field("int64", INT64, false));
parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
ParquetType::INT32, ConvertedType::DATE));
- arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+ arrow_fields.push_back(::arrow::field("date", ::arrow::date32(), false));
parquet_fields.push_back(PrimitiveNode::Make("date64", Repetition::REQUIRED,
ParquetType::INT32, ConvertedType::DATE));
- arrow_fields.push_back(std::make_shared<Field>("date64", ::arrow::date64(), false));
+ arrow_fields.push_back(::arrow::field("date64", ::arrow::date64(), false));
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64,
ConvertedType::TIMESTAMP_MILLIS));
- arrow_fields.push_back(std::make_shared<Field>(
- "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
+ arrow_fields.push_back(
+ ::arrow::field("timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
ParquetType::INT64,
ConvertedType::TIMESTAMP_MICROS));
- arrow_fields.push_back(std::make_shared<Field>(
- "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
+ arrow_fields.push_back(
+ ::arrow::field("timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
- arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+ arrow_fields.push_back(::arrow::field("float", FLOAT));
parquet_fields.push_back(
PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
- arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+ arrow_fields.push_back(::arrow::field("double", DOUBLE));
parquet_fields.push_back(PrimitiveNode::Make(
"string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
- arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+ arrow_fields.push_back(::arrow::field("string", UTF8));
parquet_fields.push_back(PrimitiveNode::Make(
"binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
- arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+ arrow_fields.push_back(::arrow::field("binary", BINARY));
ASSERT_OK(ConvertSchema(arrow_fields));
@@ -920,7 +833,7 @@ TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
};
for (const FieldConstructionArguments& c : cases) {
- auto field = std::make_shared<Field>(c.name, c.datatype);
+ auto field = ::arrow::field(c.name, c.datatype);
ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
}
}
@@ -937,38 +850,38 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
parquet_fields.push_back(
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
- arrow_fields.push_back(std::make_shared<Field>(
+ arrow_fields.push_back(::arrow::field(
"int64", ::arrow::dictionary(::arrow::int8(), ::arrow::int64()), false));
parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
ParquetType::INT32, ConvertedType::DATE));
- arrow_fields.push_back(std::make_shared<Field>(
+ arrow_fields.push_back(::arrow::field(
"date", ::arrow::dictionary(::arrow::int8(), ::arrow::date32()), false));
parquet_fields.push_back(PrimitiveNode::Make("date64", Repetition::REQUIRED,
ParquetType::INT32, ConvertedType::DATE));
- arrow_fields.push_back(std::make_shared<Field>(
+ arrow_fields.push_back(::arrow::field(
"date64", ::arrow::dictionary(::arrow::int8(), ::arrow::date64()), false));
parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
- arrow_fields.push_back(std::make_shared<Field>(
- "float", ::arrow::dictionary(::arrow::int8(), ::arrow::float32())));
+ arrow_fields.push_back(
+ ::arrow::field("float", ::arrow::dictionary(::arrow::int8(), ::arrow::float32())));
parquet_fields.push_back(
PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
- arrow_fields.push_back(std::make_shared<Field>(
- "double", ::arrow::dictionary(::arrow::int8(), ::arrow::float64())));
+ arrow_fields.push_back(
+ ::arrow::field("double", ::arrow::dictionary(::arrow::int8(), ::arrow::float64())));
parquet_fields.push_back(PrimitiveNode::Make(
"string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
- arrow_fields.push_back(std::make_shared<Field>(
- "string", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())));
+ arrow_fields.push_back(
+ ::arrow::field("string", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())));
parquet_fields.push_back(PrimitiveNode::Make(
"binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
- arrow_fields.push_back(std::make_shared<Field>(
- "binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));
+ arrow_fields.push_back(
+ ::arrow::field("binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));
ASSERT_OK(ConvertSchema(arrow_fields));
@@ -993,9 +906,9 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::REQUIRED, {list}, ConvertedType::LIST));
- auto arrow_element = std::make_shared<Field>("string", UTF8, true);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+ auto arrow_element = ::arrow::field("string", UTF8, true);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, false));
}
// // List<String> (list nullable, elements non-null)
@@ -1010,9 +923,9 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
- auto arrow_element = std::make_shared<Field>("string", UTF8, false);
- auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
- arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+ auto arrow_element = ::arrow::field("string", UTF8, false);
+ auto arrow_list = ::arrow::list(arrow_element);
+ arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}
ASSERT_OK(ConvertSchema(arrow_fields));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index e710e3b..11be571 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,6 @@
#include <algorithm>
#include <cstring>
-#include <deque>
#include <functional>
#include <future>
#include <numeric>
@@ -27,7 +26,6 @@
#include <vector>
#include "arrow/array.h"
-#include "arrow/builder.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
@@ -58,7 +56,9 @@ using arrow::StructArray;
using arrow::Table;
using arrow::TimestampArray;
+using parquet::schema::GroupNode;
using parquet::schema::Node;
+using parquet::schema::PrimitiveNode;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
@@ -77,114 +77,37 @@ namespace parquet {
namespace arrow {
class ColumnChunkReaderImpl;
-class ColumnReaderImpl;
-ArrowReaderProperties default_arrow_reader_properties() {
- static ArrowReaderProperties default_reader_props;
- return default_reader_props;
-}
-
-// ----------------------------------------------------------------------
-// Iteration utilities
-
-// Abstraction to decouple row group iteration details from the ColumnReader,
-// so we can read only a single row group if we want
-class FileColumnIterator {
+class ColumnReaderImpl : public ColumnReader {
public:
- explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
- std::vector<int> row_groups)
- : column_index_(column_index),
- reader_(reader),
- schema_(reader->metadata()->schema()),
- row_groups_(row_groups.begin(), row_groups.end()) {}
-
- virtual ~FileColumnIterator() {}
-
- std::unique_ptr<::parquet::PageReader> NextChunk() {
- if (row_groups_.empty()) {
- return nullptr;
- }
-
- auto row_group_reader = reader_->RowGroup(row_groups_.front());
- row_groups_.pop_front();
- return row_group_reader->GetColumnPageReader(column_index_);
- }
-
- const SchemaDescriptor* schema() const { return schema_; }
+ enum ReaderType { PRIMITIVE, LIST, STRUCT };
- const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
-
- std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
+ virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
+ virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
+ virtual const std::shared_ptr<Field> field() = 0;
- int column_index() const { return column_index_; }
+ virtual const ColumnDescriptor* descr() const = 0;
- protected:
- int column_index_;
- ParquetFileReader* reader_;
- const SchemaDescriptor* schema_;
- std::deque<int> row_groups_;
+ virtual ReaderType type() const = 0;
};
-using FileColumnIteratorFactory =
- std::function<FileColumnIterator*(int, ParquetFileReader*)>;
-
-class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
- public:
- explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
- const std::vector<int>& column_indices,
- std::shared_ptr<::arrow::Schema> schema,
- FileReader* reader, int64_t batch_size)
- : column_readers_(),
- row_group_indices_(row_group_indices),
- column_indices_(column_indices),
- schema_(schema),
- file_reader_(reader),
- batch_size_(batch_size) {}
-
- ~RowGroupRecordBatchReader() override {}
-
- std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
-
- Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
- if (column_readers_.empty()) {
- // Initialize the column readers
- column_readers_.reserve(column_indices_.size());
-
- for (size_t i = 0; i < column_indices_.size(); ++i) {
- ColumnReaderPtr tmp;
- RETURN_NOT_OK(file_reader_->GetColumn(column_indices_[i], &tmp));
- column_readers_.emplace_back(std::move(tmp));
- }
- }
-
- // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
- // does not currently honor the use_threads option.
- std::vector<std::shared_ptr<ChunkedArray>> columns(column_indices_.size());
+ArrowReaderProperties default_arrow_reader_properties() {
+ static ArrowReaderProperties default_reader_props;
+ return default_reader_props;
+}
- for (size_t i = 0; i < column_indices_.size(); ++i) {
- RETURN_NOT_OK(column_readers_[i]->NextBatch(batch_size_, &columns[i]));
- }
+// ----------------------------------------------------------------------
+// FileReaderImpl forward declaration
- // Create an intermediate table and use TableBatchReader as an adaptor to a
- // RecordBatch
- std::shared_ptr<Table> table = Table::Make(schema_, columns);
- RETURN_NOT_OK(table->Validate());
- ::arrow::TableBatchReader table_batch_reader(*table);
- return table_batch_reader.ReadNext(out);
- }
+namespace {
- private:
- using ColumnReaderPtr = std::unique_ptr<ColumnReader>;
- std::vector<ColumnReaderPtr> column_readers_;
- std::vector<int> row_group_indices_;
- std::vector<int> column_indices_;
- std::shared_ptr<::arrow::Schema> schema_;
- FileReader* file_reader_;
- int64_t batch_size_;
-};
+std::vector<int> Arange(int length) {
+ std::vector<int> result(length);
+ std::iota(result.begin(), result.end(), 0);
+ return result;
+}
-// ----------------------------------------------------------------------
-// FileReaderImpl forward declaration
+} // namespace
class FileReaderImpl : public FileReader {
public:
@@ -193,8 +116,16 @@ class FileReaderImpl : public FileReader {
: pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
Status Init() {
- // TODO(wesm): Smarter schema/column-reader initialization for nested data
- return Status::OK();
+ return BuildSchemaManifest(reader_->metadata()->schema(), reader_properties_,
+ &manifest_);
+ }
+
+ std::vector<int> AllRowGroups() {
+ return Arange(reader_->metadata()->num_row_groups());
+ }
+
+ std::vector<int> AllColumnIndices() {
+ return Arange(reader_->metadata()->num_columns());
}
FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
@@ -203,28 +134,36 @@ class FileReaderImpl : public FileReader {
};
}
- std::vector<int> AllRowGroups() {
- std::vector<int> row_groups(reader_->metadata()->num_row_groups());
- std::iota(row_groups.begin(), row_groups.end(), 0);
- return row_groups;
+ FileColumnIteratorFactory AllRowGroupsFactory() {
+ return SomeRowGroupsFactory(AllRowGroups());
}
- std::vector<int> AllColumnIndices() {
- std::vector<int> indices(reader_->metadata()->num_columns());
- std::iota(indices.begin(), indices.end(), 0);
- return indices;
+ Status BoundsCheckColumn(int column) {
+ if (column < 0 || column >= this->num_columns()) {
+ return Status::Invalid("Column index out of bounds (got ", column,
+ ", should be "
+ "between 0 and ",
+ this->num_columns() - 1, ")");
+ }
+ return Status::OK();
}
- FileColumnIteratorFactory AllRowGroupsFactory() {
- return SomeRowGroupsFactory(AllRowGroups());
+ Status BoundsCheckRowGroup(int row_group) {
+ // row group indices check
+ if (row_group < 0 || row_group >= num_row_groups()) {
+ return Status::Invalid("Some index in row_group_indices is ", row_group,
+ ", which is either < 0 or >= num_row_groups(",
+ num_row_groups(), ")");
+ }
+ return Status::OK();
}
int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
// Can throw exception
int64_t records = 0;
- for (int j = 0; j < static_cast<int>(row_groups.size()); j++) {
+ for (auto row_group : row_groups) {
records += reader_->metadata()
- ->RowGroup(row_groups[j])
+ ->RowGroup(row_group)
->ColumnChunk(column_chunk)
->num_values();
}
@@ -233,18 +172,43 @@ class FileReaderImpl : public FileReader {
std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
- Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
- int16_t def_level, FileColumnIteratorFactory iterator_factory,
- std::unique_ptr<ColumnReaderImpl>* out);
-
Status ReadTable(const std::vector<int>& indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups(AllRowGroups(), indices, out);
}
+ Status GetFieldReader(int i, const std::vector<int>& indices,
+ const std::vector<int>& row_groups,
+ std::unique_ptr<ColumnReaderImpl>* out) {
+ ReaderContext ctx;
+ ctx.reader = reader_.get();
+ ctx.pool = pool_;
+ ctx.iterator_factory = SomeRowGroupsFactory(row_groups);
+ ctx.filter_leaves = true;
+ ctx.included_leaves.insert(indices.begin(), indices.end());
+ return manifest_.schema_fields[i].GetReader(ctx, out);
+ }
+
Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);
+ Status ReadSchemaField(int i, const std::vector<int>& indices,
+ const std::vector<int>& row_groups,
+ std::shared_ptr<Field>* out_field,
+ std::shared_ptr<ChunkedArray>* out) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ std::unique_ptr<ColumnReaderImpl> reader;
+ RETURN_NOT_OK(GetFieldReader(i, indices, row_groups, &reader));
+
+ *out_field = reader->field();
+
+ // TODO(wesm): This calculation doesn't make much sense when we have repeated
+ // schema nodes
+ int64_t records_to_read = GetTotalRecords(row_groups, i);
+ return reader->NextBatch(records_to_read, out);
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
return GetColumn(i, AllRowGroupsFactory(), out);
}
@@ -254,15 +218,12 @@ class FileReaderImpl : public FileReader {
reader_->metadata()->key_value_metadata(), out);
}
- Status GetSchema(const std::vector<int>& indices,
- std::shared_ptr<::arrow::Schema>* out) override {
- return FromParquetSchema(reader_->metadata()->schema(), indices, reader_properties_,
- reader_->metadata()->key_value_metadata(), out);
- }
-
Status ReadSchemaField(int i, const std::vector<int>& indices,
const std::vector<int>& row_groups,
- std::shared_ptr<ChunkedArray>* out);
+ std::shared_ptr<ChunkedArray>* out) {
+ std::shared_ptr<Field> unused;
+ return ReadSchemaField(i, indices, row_groups, &unused, out);
+ }
Status ReadSchemaField(int i, const std::vector<int>& indices,
std::shared_ptr<ChunkedArray>* out) {
@@ -288,11 +249,7 @@ class FileReaderImpl : public FileReader {
}
Status ReadTable(std::shared_ptr<Table>* table) override {
- std::vector<int> indices(reader_->metadata()->num_columns());
- for (size_t i = 0; i < indices.size(); ++i) {
- indices[i] = static_cast<int>(i);
- }
- return ReadTable(indices, table);
+ return ReadTable(AllColumnIndices(), table);
}
Status ReadRowGroups(const std::vector<int>& row_groups,
@@ -313,54 +270,13 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, AllColumnIndices(), table);
}
- std::vector<int> GetDictionaryIndices(const std::vector<int>& indices) {
- // Select the column indices that were read as DictionaryArray
- std::vector<int> dict_indices(indices);
- auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); };
- auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func);
- dict_indices.erase(it, dict_indices.end());
- return dict_indices;
- }
-
- std::shared_ptr<::arrow::Schema> FixSchema(
- const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
- const std::vector<std::shared_ptr<::arrow::ChunkedArray>>& columns) {
- // Fix the schema with the actual DictionaryType that was read
- auto fields = old_schema.fields();
-
- for (int idx : dict_indices) {
- fields[idx] = old_schema.field(idx)->WithType(columns[idx]->type());
- }
- return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
- }
-
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
- std::shared_ptr<RecordBatchReader>* out) override {
- return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
- }
-
- Status BoundsCheckRowGroup(int row_group) {
- // row group indices check
- if (row_group < 0 || row_group >= num_row_groups()) {
- return Status::Invalid("Some index in row_group_indices is ", row_group,
- ", which is either < 0 or >= num_row_groups(",
- num_row_groups(), ")");
- }
- return Status::OK();
- }
+ const std::vector<int>& column_indices,
+ std::shared_ptr<RecordBatchReader>* out) override;
Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
- const std::vector<int>& column_indices,
std::shared_ptr<RecordBatchReader>* out) override {
- // column indices check
- std::shared_ptr<::arrow::Schema> schema;
- RETURN_NOT_OK(GetSchema(column_indices, &schema));
- for (auto row_group_index : row_group_indices) {
- RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
- }
- *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
- schema, this, batch_size());
- return Status::OK();
+ return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
}
int num_columns() const { return reader_->metadata()->num_columns(); }
@@ -373,8 +289,6 @@ class FileReaderImpl : public FileReader {
reader_properties_.set_use_threads(use_threads);
}
- int64_t batch_size() const { return reader_properties_.batch_size(); }
-
Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows) override {
BEGIN_PARQUET_CATCH_EXCEPTIONS
@@ -383,10 +297,68 @@ class FileReaderImpl : public FileReader {
END_PARQUET_CATCH_EXCEPTIONS
}
- private:
MemoryPool* pool_;
std::unique_ptr<ParquetFileReader> reader_;
ArrowReaderProperties reader_properties_;
+
+ SchemaManifest manifest_;
+};
+
+class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
+ public:
+ RowGroupRecordBatchReader(std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers,
+ std::shared_ptr<::arrow::Schema> schema, int64_t batch_size)
+ : field_readers_(std::move(field_readers)),
+ schema_(schema),
+ batch_size_(batch_size) {}
+
+ ~RowGroupRecordBatchReader() override {}
+
+ std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
+
+ static Status Make(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices, FileReaderImpl* reader,
+ int64_t batch_size,
+ std::shared_ptr<::arrow::RecordBatchReader>* out) {
+ std::vector<int> field_indices;
+ if (!reader->manifest_.GetFieldIndices(column_indices, &field_indices)) {
+ return Status::Invalid("Invalid column index");
+ }
+ std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers(field_indices.size());
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < field_indices.size(); ++i) {
+ RETURN_NOT_OK(reader->GetFieldReader(field_indices[i], column_indices, row_groups,
+ &field_readers[i]));
+ fields.push_back(field_readers[i]->field());
+ }
+ *out = std::make_shared<RowGroupRecordBatchReader>(
+ std::move(field_readers), ::arrow::schema(fields), batch_size);
+ return Status::OK();
+ }
+
+ Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
+ // TODO(hatemhelal): Consider refactoring this to share logic with ReadTable as this
+ // does not currently honor the use_threads option.
+ std::vector<std::shared_ptr<ChunkedArray>> columns(field_readers_.size());
+ for (size_t i = 0; i < field_readers_.size(); ++i) {
+ RETURN_NOT_OK(field_readers_[i]->NextBatch(batch_size_, &columns[i]));
+ if (columns[i]->num_chunks() > 1) {
+ return Status::NotImplemented("This class cannot yet iterate chunked arrays");
+ }
+ }
+
+ // Create an intermediate table and use TableBatchReader as an adaptor to a
+ // RecordBatch
+ std::shared_ptr<Table> table = Table::Make(schema_, columns);
+ RETURN_NOT_OK(table->Validate());
+ ::arrow::TableBatchReader table_batch_reader(*table);
+ return table_batch_reader.ReadNext(out);
+ }
+
+ private:
+ std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers_;
+ std::shared_ptr<::arrow::Schema> schema_;
+ int64_t batch_size_;
};
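ReadNext above materializes an intermediate Table and adapts it back into record batches via TableBatchReader. A minimal standalone sketch of that adaptor pattern (the helper name BatchesFromTable is illustrative, not part of this patch):

    // Drain a fully materialized, validated Table into RecordBatches.
    ::arrow::Status BatchesFromTable(
        const ::arrow::Table& table,
        std::vector<std::shared_ptr<::arrow::RecordBatch>>* out) {
      ::arrow::TableBatchReader batch_reader(table);
      std::shared_ptr<::arrow::RecordBatch> batch;
      while (true) {
        RETURN_NOT_OK(batch_reader.ReadNext(&batch));
        if (batch == nullptr) break;  // end of stream
        out->push_back(batch);
      }
      return ::arrow::Status::OK();
    }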
class ColumnChunkReaderImpl : public ColumnChunkReader {
@@ -428,34 +400,26 @@ class RowGroupReaderImpl : public RowGroupReader {
int row_group_index_;
};
-class ColumnReaderImpl : public ColumnReader {
+// Leaf reader is for primitive arrays and primitive children of nested arrays
+class LeafReader : public ColumnReaderImpl {
public:
- virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
- virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
- virtual const std::shared_ptr<Field> field() = 0;
-};
-
-// Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
- public:
- PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
- bool read_dictionary)
- : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
- record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
- Status s = NodeToField(*input_->descr()->schema_node(), &field_);
- DCHECK_OK(s);
+ LeafReader(const ReaderContext& ctx, const std::shared_ptr<Field>& field,
+ std::unique_ptr<FileColumnIterator> input)
+ : ctx_(ctx), field_(field), input_(std::move(input)), descr_(input_->descr()) {
+ record_reader_ = RecordReader::Make(descr_, ctx_.pool,
+ field->type()->id() == ::arrow::Type::DICTIONARY);
NextRowGroup();
}
- Status GetDefLevels(const int16_t** data, size_t* length) override {
+ Status GetDefLevels(const int16_t** data, int64_t* length) override {
*data = record_reader_->def_levels();
- *length = record_reader_->levels_written();
+ *length = record_reader_->levels_position();
return Status::OK();
}
- Status GetRepLevels(const int16_t** data, size_t* length) override {
+ Status GetRepLevels(const int16_t** data, int64_t* length) override {
*data = record_reader_->rep_levels();
- *length = record_reader_->levels_written();
+ *length = record_reader_->levels_position();
return Status::OK();
}
@@ -477,17 +441,15 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
}
}
RETURN_NOT_OK(
- TransferColumnData(record_reader_.get(), field_->type(), descr_, pool_, out));
-
- // Nest nested types, if not nested returns unmodifed
- RETURN_NOT_OK(WrapIntoListArray(out));
+ TransferColumnData(record_reader_.get(), field_->type(), descr_, ctx_.pool, out));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
- Status WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array);
-
const std::shared_ptr<Field> field() override { return field_; }
+ const ColumnDescriptor* descr() const override { return descr_; }
+
+ ReaderType type() const override { return PRIMITIVE; }
private:
void NextRowGroup() {
@@ -495,188 +457,111 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
record_reader_->SetPageReader(std::move(page_reader));
}
- MemoryPool* pool_;
+ ReaderContext ctx_;
+ std::shared_ptr<Field> field_;
std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
-
std::shared_ptr<RecordReader> record_reader_;
- std::shared_ptr<Field> field_;
};
-Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array) {
- if (descr_->max_repetition_level() == 0) {
- // Flat, no action
- return Status::OK();
- }
-
- std::shared_ptr<Array> flat_array;
+class NestedListReader : public ColumnReaderImpl {
+ public:
+ NestedListReader(const ReaderContext& ctx, std::shared_ptr<Field> field,
+ int16_t max_definition_level, int16_t max_repetition_level,
+ std::unique_ptr<ColumnReaderImpl> item_reader)
+ : ctx_(ctx),
+ field_(field),
+ max_definition_level_(max_definition_level),
+ max_repetition_level_(max_repetition_level),
+ item_reader_(std::move(item_reader)) {}
- // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
- // not yet implemented
- if ((*inout_array)->num_chunks() > 1) {
- return Status::NotImplemented(
- "Nested data conversions not implemented for "
- "chunked array outputs");
+ Status GetDefLevels(const int16_t** data, int64_t* length) override {
+ return item_reader_->GetDefLevels(data, length);
}
- flat_array = (*inout_array)->chunk(0);
-
- const int16_t* def_levels = record_reader_->def_levels();
- const int16_t* rep_levels = record_reader_->rep_levels();
- const int64_t total_levels_read = record_reader_->levels_position();
- std::shared_ptr<::arrow::Schema> arrow_schema;
- RETURN_NOT_OK(FromParquetSchema(
- input_->schema(), {input_->column_index()}, default_arrow_reader_properties(),
- input_->metadata()->key_value_metadata(), &arrow_schema));
- std::shared_ptr<Field> current_field = arrow_schema->field(0);
-
- if (current_field->type()->num_children() > 0 &&
- flat_array->type_id() == ::arrow::Type::DICTIONARY) {
- // XXX(wesm): Handling of nested types and dictionary encoding needs to be
- // significantly refactored
- return Status::Invalid("Cannot have nested types containing dictionary arrays yet");
+ Status GetRepLevels(const int16_t** data, int64_t* length) override {
+ return item_reader_->GetRepLevels(data, length);
}
- // Walk downwards to extract nullability
- std::vector<bool> nullable;
- std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
- std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
- nullable.push_back(current_field->nullable());
- while (current_field->type()->num_children() > 0) {
- if (current_field->type()->num_children() > 1) {
- return Status::NotImplemented("Fields with more than one child are not supported.");
- } else {
- if (current_field->type()->id() != ::arrow::Type::LIST) {
- return Status::NotImplemented("Currently only nesting with Lists is supported.");
- }
- current_field = current_field->type()->child(0);
- }
- offset_builders.emplace_back(
- std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
- valid_bits_builders.emplace_back(
- std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
- nullable.push_back(current_field->nullable());
- }
-
- int64_t list_depth = offset_builders.size();
- // This describes the minimal definition that describes a level that
- // reflects a value in the primitive values array.
- int16_t values_def_level = descr_->max_definition_level();
- if (nullable[nullable.size() - 1]) {
- values_def_level--;
- }
-
- // The definition levels that are needed so that a list is declared
- // as empty and not null.
- std::vector<int16_t> empty_def_level(list_depth);
- int def_level = 0;
- for (int i = 0; i < list_depth; i++) {
- if (nullable[i]) {
- def_level++;
- }
- empty_def_level[i] = static_cast<int16_t>(def_level);
- def_level++;
- }
-
- int32_t values_offset = 0;
- std::vector<int64_t> null_counts(list_depth, 0);
- for (int64_t i = 0; i < total_levels_read; i++) {
- int16_t rep_level = rep_levels[i];
- if (rep_level < descr_->max_repetition_level()) {
- for (int64_t j = rep_level; j < list_depth; j++) {
- if (j == (list_depth - 1)) {
- RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
- } else {
- RETURN_NOT_OK(offset_builders[j]->Append(
- static_cast<int32_t>(offset_builders[j + 1]->length())));
- }
-
- if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
- RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
- null_counts[j]++;
- break;
- } else {
- RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
- if (empty_def_level[j] == def_levels[i]) {
- break;
- }
- }
- }
- }
- if (def_levels[i] >= values_def_level) {
- values_offset++;
+ Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override {
+ if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
+ return Status::Invalid("Mix of struct and list types not yet supported");
}
- }
- // Add the final offset to all lists
- for (int64_t j = 0; j < list_depth; j++) {
- if (j == (list_depth - 1)) {
- RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
- } else {
- RETURN_NOT_OK(offset_builders[j]->Append(
- static_cast<int32_t>(offset_builders[j + 1]->length())));
+
+ RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+
+ // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
+ // this is not yet implemented
+ if ((*out)->num_chunks() > 1) {
+ return Status::NotImplemented(
+ "Nested data conversions not implemented for chunked array outputs");
}
- }
- std::vector<std::shared_ptr<Buffer>> offsets;
- std::vector<std::shared_ptr<Buffer>> valid_bits;
- std::vector<int64_t> list_lengths;
- for (int64_t j = 0; j < list_depth; j++) {
- list_lengths.push_back(offset_builders[j]->length() - 1);
- std::shared_ptr<Array> array;
- RETURN_NOT_OK(offset_builders[j]->Finish(&array));
- offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
- RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
- valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
+ const int16_t* def_levels;
+ const int16_t* rep_levels;
+ int64_t num_levels;
+ RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
+ RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
+ std::shared_ptr<Array> result;
+ RETURN_NOT_OK(ReconstructNestedList((*out)->chunk(0), field_, max_definition_level_,
+ max_repetition_level_, def_levels, rep_levels,
+ num_levels, ctx_.pool, &result));
+ *out = std::make_shared<ChunkedArray>(result);
+ return Status::OK();
}
- std::shared_ptr<Array> output = flat_array;
- for (int64_t j = list_depth - 1; j >= 0; j--) {
- auto list_type =
- ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
- output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
- output, valid_bits[j], null_counts[j]);
- }
- *inout_array = std::make_shared<ChunkedArray>(output);
- return Status::OK();
-}
+ const std::shared_ptr<Field> field() override { return field_; }
+
+ const ColumnDescriptor* descr() const override { return nullptr; }
-// Reader implementation for struct array
+ ReaderType type() const override { return LIST; }
+
+ private:
+ ReaderContext ctx_;
+ std::shared_ptr<Field> field_;
+ int16_t max_definition_level_;
+ int16_t max_repetition_level_;
+ std::unique_ptr<ColumnReaderImpl> item_reader_;
+};
-class PARQUET_NO_EXPORT StructImpl : public ColumnReaderImpl {
+class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
public:
- explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
- int16_t struct_def_level, MemoryPool* pool, const Node* node)
- : children_(children), struct_def_level_(struct_def_level), pool_(pool) {
- InitField(node, children);
- }
+ explicit StructReader(const ReaderContext& ctx, const SchemaField& schema_field,
+ std::shared_ptr<Field> filtered_field,
+ std::vector<std::unique_ptr<ColumnReaderImpl>>&& children)
+ : ctx_(ctx),
+ schema_field_(schema_field),
+ filtered_field_(filtered_field),
+ struct_def_level_(schema_field.max_definition_level),
+ children_(std::move(children)) {}
Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
- Status GetDefLevels(const int16_t** data, size_t* length) override;
- Status GetRepLevels(const int16_t** data, size_t* length) override;
- const std::shared_ptr<Field> field() override { return field_; }
+ Status GetDefLevels(const int16_t** data, int64_t* length) override;
+ Status GetRepLevels(const int16_t** data, int64_t* length) override;
+ const std::shared_ptr<Field> field() override { return filtered_field_; }
+ const ColumnDescriptor* descr() const override { return nullptr; }
+ ReaderType type() const override { return STRUCT; }
private:
- std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
+ ReaderContext ctx_;
+ SchemaField schema_field_;
+ std::shared_ptr<Field> filtered_field_;
int16_t struct_def_level_;
- MemoryPool* pool_;
- std::shared_ptr<Field> field_;
+ std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-
Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
- void InitField(const Node* node,
- const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
};
-Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
- int64_t* null_count_out) {
+Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
+ int64_t* null_count_out) {
std::shared_ptr<Buffer> null_bitmap;
auto null_count = 0;
const int16_t* def_levels_data;
- size_t def_levels_length;
+ int64_t def_levels_length;
RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
- RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap));
+ RETURN_NOT_OK(AllocateEmptyBitmap(ctx_.pool, def_levels_length, &null_bitmap));
uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
- for (size_t i = 0; i < def_levels_length; i++) {
+ for (int64_t i = 0; i < def_levels_length; i++) {
if (def_levels_data[i] < struct_def_level_) {
// Mark null
null_count += 1;
@@ -693,7 +578,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
// TODO(itaiin): Consider caching the results of this calculation -
// note that this is only used once for each read for now
-Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
+Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
*data = nullptr;
if (children_.size() == 0) {
// Empty struct
@@ -703,10 +588,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
// We have at least one child
const int16_t* child_def_levels;
- size_t child_length;
+ int64_t child_length;
RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
auto size = child_length * sizeof(int16_t);
- RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_));
+ RETURN_NOT_OK(AllocateResizableBuffer(ctx_.pool, size, &def_levels_buffer_));
// Initialize with the minimal def level
std::memset(def_levels_buffer_->mutable_data(), -1, size);
auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
@@ -717,10 +602,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
// the nesting level, and the def level equals max(children def levels)
// All other possibilities are malformed definition data.
for (auto& child : children_) {
- size_t current_child_length;
+ int64_t current_child_length;
RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, &current_child_length));
DCHECK_EQ(child_length, current_child_length);
- for (size_t i = 0; i < child_length; i++) {
+ for (int64_t i = 0; i < child_length; i++) {
// Check that value is either uninitialized, or current
// and previous children def levels agree on the struct level
DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) ==
@@ -730,34 +615,26 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
}
}
*data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
- *length = child_length;
+ *length = static_cast<int64_t>(child_length);
return Status::OK();
}
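As an illustration of the reconstruction above (hypothetical data, not from this patch): for an optional struct<a: int32, b: int32> with two optional children, struct_def_level == 1 and the children's max definition level is 2:

    // rows:           {a: 1, b: null}    null    {a: null, b: 2}
    // def levels (a):        2             0            1
    // def levels (b):        1             0            2
    // result (max):          2             0            2
    //
    // Positions where the result is below struct_def_level (the middle row)
    // are the ones DefLevelsToNullArray marks as null.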
-void StructImpl::InitField(
- const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
- // Make a shallow node to field conversion from the children fields
- std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
- for (size_t i = 0; i < children.size(); i++) {
- fields[i] = children[i]->field();
- }
-
- auto type = ::arrow::struct_(fields);
- field_ = ::arrow::field(node->name(), type, node->is_optional());
-}
-
-Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
return Status::NotImplemented("GetRepLevels is not implemented for struct");
}
-Status StructImpl::NextBatch(int64_t records_to_read,
- std::shared_ptr<ChunkedArray>* out) {
+Status StructReader::NextBatch(int64_t records_to_read,
+ std::shared_ptr<ChunkedArray>* out) {
std::vector<std::shared_ptr<Array>> children_arrays;
std::shared_ptr<Buffer> null_bitmap;
int64_t null_count;
// Gather children arrays and def levels
for (auto& child : children_) {
+ if (child->type() == ColumnReaderImpl::LIST) {
+ return Status::Invalid("Mix of struct and list types not yet supported");
+ }
+
std::shared_ptr<ChunkedArray> field;
RETURN_NOT_OK(child->NextBatch(records_to_read, &field));
@@ -787,57 +664,85 @@ Status StructImpl::NextBatch(int64_t records_to_read,
// ----------------------------------------------------------------------
// File reader implementation
-Status FileReaderImpl::GetReaderForNode(int index, const Node* node,
- const std::vector<int>& indices,
- int16_t def_level,
- FileColumnIteratorFactory iterator_factory,
- std::unique_ptr<ColumnReaderImpl>* out) {
- *out = nullptr;
-
- if (schema::IsSimpleStruct(node)) {
- const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
- std::vector<std::shared_ptr<ColumnReaderImpl>> children;
- for (int i = 0; i < group->field_count(); i++) {
- std::unique_ptr<ColumnReaderImpl> child_reader;
- // TODO(itaiin): Remove the -1 index hack when all types of nested reads
- // are supported. This currently just signals the lower level reader resolution
- // to abort
- RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
- static_cast<int16_t>(def_level + 1),
- iterator_factory, &child_reader));
- if (child_reader != nullptr) {
- children.push_back(std::move(child_reader));
- }
+Status SchemaField::GetReader(const ReaderContext& ctx,
+ std::unique_ptr<ColumnReaderImpl>* out) const {
+ auto type_id = this->field->type()->id();
+ if (this->children.size() == 0) {
+ std::unique_ptr<FileColumnIterator> input(
+ ctx.iterator_factory(this->column_index, ctx.reader));
+ out->reset(new LeafReader(ctx, this->field, std::move(input)));
+ } else if (type_id == ::arrow::Type::LIST) {
+ // We can only read lists of primitives and lists-of-lists at the moment;
+ // lists of structs are rejected below
+ auto list_field = this->field;
+ auto child = &this->children[0];
+ while (child->field->type()->id() == ::arrow::Type::LIST) {
+ child = &child->children[0];
}
-
- if (children.size() > 0) {
- *out = std::unique_ptr<ColumnReaderImpl>(
- new StructImpl(children, def_level, pool_, node));
+ if (child->field->type()->id() == ::arrow::Type::STRUCT) {
+ return Status::NotImplemented("Lists of structs not yet supported");
}
- } else {
- // This should be a flat field case - translate the field index to
- // the correct column index by walking down to the leaf node
- const Node* walker = node;
- while (!walker->is_primitive()) {
- DCHECK(walker->is_group());
- auto group = static_cast<const schema::GroupNode*>(walker);
- if (group->field_count() != 1) {
- return Status::NotImplemented("lists with structs are not supported.");
+ if (!ctx.IncludesLeaf(child->column_index)) {
+ *out = nullptr;
+ return Status::OK();
+ }
+ std::unique_ptr<ColumnReaderImpl> child_reader;
+ RETURN_NOT_OK(child->GetReader(ctx, &child_reader));
+ // Use the max definition/repetition level of the leaf here
+ out->reset(new NestedListReader(ctx, list_field, child->max_definition_level,
+ child->max_repetition_level,
+ std::move(child_reader)));
+ } else if (type_id == ::arrow::Type::STRUCT) {
+ std::vector<std::shared_ptr<Field>> child_fields;
+ std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
+ for (const auto& child : this->children) {
+ if (child.is_leaf() && !ctx.IncludesLeaf(child.column_index)) {
+ // Excluded leaf
+ continue;
+ }
+ std::unique_ptr<ColumnReaderImpl> child_reader;
+ RETURN_NOT_OK(child.GetReader(ctx, &child_reader));
+ if (!child_reader) {
+ // If all children were pruned, then we do not try to read this field
+ continue;
}
- walker = group->field(0).get();
+ child_fields.push_back(child.field);
+ child_readers.emplace_back(std::move(child_reader));
}
- auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
-
- // If the index of the column is found then a reader for the column is needed.
- // Otherwise *out keeps the nullptr value.
- if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
- std::unique_ptr<ColumnReader> reader;
- RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader));
- *out = std::unique_ptr<ColumnReaderImpl>(
- static_cast<ColumnReaderImpl*>(reader.release()));
+ if (child_fields.size() == 0) {
+ *out = nullptr;
+ return Status::OK();
}
+ auto filtered_field = ::arrow::field(
+ this->field->name(), ::arrow::struct_(child_fields), this->field->nullable());
+ out->reset(new StructReader(ctx, *this, filtered_field, std::move(child_readers)));
+ } else {
+ return Status::Invalid("Unsupported nested type: ", this->field->ToString());
}
+ return Status::OK();
+}
+Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_group_indices,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<RecordBatchReader>* out) {
+ // Row group bounds check (column indices are validated in
+ // RowGroupRecordBatchReader::Make)
+ for (auto row_group_index : row_group_indices) {
+ RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
+ }
+ return RowGroupRecordBatchReader::Make(row_group_indices, column_indices, this,
+ reader_properties_.batch_size(), out);
+}
+
+Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+ std::unique_ptr<ColumnReader>* out) {
+ RETURN_NOT_OK(BoundsCheckColumn(i));
+ ReaderContext ctx;
+ ctx.reader = reader_.get();
+ ctx.pool = pool_;
+ ctx.iterator_factory = AllRowGroupsFactory();
+ ctx.filter_leaves = false;
+ std::unique_ptr<ColumnReaderImpl> result;
+ RETURN_NOT_OK(manifest_.schema_fields[i].GetReader(ctx, &result));
+ out->reset(result.release());
return Status::OK();
}
@@ -846,21 +751,20 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
- std::shared_ptr<::arrow::Schema> schema;
- RETURN_NOT_OK(GetSchema(indices, &schema));
-
// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
- if (!schema::ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
- &field_indices)) {
+ if (!manifest_.GetFieldIndices(indices, &field_indices)) {
return Status::Invalid("Invalid column index");
}
+
int num_fields = static_cast<int>(field_indices.size());
+ std::vector<std::shared_ptr<Field>> fields(num_fields);
std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
auto ReadColumnFunc = [&](int i) {
- return ReadSchemaField(field_indices[i], indices, row_groups, &columns[i]);
+ return ReadSchemaField(field_indices[i], indices, row_groups, &fields[i],
+ &columns[i]);
};
if (reader_properties_.use_threads()) {
@@ -883,49 +787,9 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
}
}
- auto dict_indices = GetDictionaryIndices(indices);
- if (!dict_indices.empty()) {
- schema = FixSchema(*schema, dict_indices, columns);
- }
- std::shared_ptr<Table> table = Table::Make(schema, columns);
- RETURN_NOT_OK(table->Validate());
- *out = table;
- return Status::OK();
- END_PARQUET_CATCH_EXCEPTIONS
-}
-
-Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
- std::unique_ptr<ColumnReader>* out) {
- if (i < 0 || i >= this->num_columns()) {
- return Status::Invalid("Column index out of bounds (got ", i,
- ", should be "
- "between 0 and ",
- this->num_columns() - 1, ")");
- }
-
- std::unique_ptr<FileColumnIterator> input(iterator_factory(i, reader_.get()));
- *out = std::unique_ptr<ColumnReader>(
- new PrimitiveImpl(pool_, std::move(input), reader_properties_.read_dictionary(i)));
- return Status::OK();
-}
-
-Status FileReaderImpl::ReadSchemaField(int i, const std::vector<int>& indices,
- const std::vector<int>& row_groups,
- std::shared_ptr<ChunkedArray>* out) {
- BEGIN_PARQUET_CATCH_EXCEPTIONS
- auto parquet_schema = reader_->metadata()->schema();
- auto node = parquet_schema->group_node()->field(i).get();
- std::unique_ptr<ColumnReaderImpl> reader_impl;
- RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, SomeRowGroupsFactory(row_groups),
- &reader_impl));
- if (reader_impl == nullptr) {
- *out = nullptr;
- return Status::OK();
- }
- // TODO(wesm): This calculation doesn't make much sense when we have repeated
- // schema nodes
- int64_t records_to_read = GetTotalRecords(row_groups, i);
- return reader_impl->NextBatch(records_to_read, out);
+ auto result_schema = ::arrow::schema(fields, reader_->metadata()->key_value_metadata());
+ *out = Table::Make(result_schema, columns);
+ return (*out)->Validate();
END_PARQUET_CATCH_EXCEPTIONS
}
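For orientation, a minimal usage sketch of the public entry points touched here (infile stands in for whatever ::arrow::io::RandomAccessFile the caller opened; error handling abbreviated):

    std::unique_ptr<FileReader> reader;
    RETURN_NOT_OK(OpenFile(infile, ::arrow::default_memory_pool(), &reader));

    // Read row groups 0 and 1, restricted to leaf columns 0 and 2
    std::shared_ptr<Table> table;
    RETURN_NOT_OK(reader->ReadRowGroups({0, 1}, {0, 2}, &table));

    // Or stream the same row groups as record batches
    std::shared_ptr<::arrow::RecordBatchReader> batches;
    RETURN_NOT_OK(reader->GetRecordBatchReader({0, 1}, &batches));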
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 5b2f2c7..e73fcc0 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -28,6 +28,7 @@
namespace arrow {
class ChunkedArray;
+class KeyValueMetadata;
class RecordBatchReader;
class Schema;
class Table;
@@ -39,6 +40,7 @@ namespace parquet {
class FileMetaData;
class ParquetFileReader;
class ReaderProperties;
+class SchemaDescriptor;
namespace arrow {
@@ -171,11 +173,6 @@ class PARQUET_EXPORT FileReader {
/// \brief Return arrow schema for all the columns.
virtual ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out) = 0;
- /// \brief Return arrow schema by apply selection of column indices.
- /// \returns error status if passed wrong indices.
- virtual ::arrow::Status GetSchema(const std::vector<int>& indices,
- std::shared_ptr<::arrow::Schema>* out) = 0;
-
// Read column as a whole into an Array.
virtual ::arrow::Status ReadColumn(int i,
std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
@@ -311,6 +308,17 @@ PARQUET_EXPORT
const ArrowReaderProperties& properties,
std::unique_ptr<FileReader>* reader);
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
+ const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
+ const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata,
+ std::shared_ptr<::arrow::Schema>* out);
+
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+ const ArrowReaderProperties& properties,
+ std::shared_ptr<::arrow::Schema>* out);
+
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index de00173..f41e875 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -22,18 +22,24 @@
#include <cstdint>
#include <cstring>
#include <memory>
+#include <string>
#include <type_traits>
+#include <utility>
#include <vector>
+#include <boost/algorithm/string/predicate.hpp>
+
#include "arrow/array.h"
+#include "arrow/builder.h"
#include "arrow/compute/kernel.h"
#include "arrow/table.h"
#include "arrow/type.h"
-#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
#include "arrow/util/int-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"
+#include "parquet/arrow/reader.h"
#include "parquet/column_reader.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
@@ -55,10 +61,15 @@ using arrow::TimestampArray;
using arrow::compute::Datum;
using ::arrow::BitUtil::FromBigEndian;
+using ::arrow::internal::checked_cast;
using ::arrow::internal::SafeLeftShift;
using ::arrow::util::SafeLoadAs;
using parquet::internal::RecordReader;
+using parquet::schema::GroupNode;
+using parquet::schema::Node;
+using parquet::schema::PrimitiveNode;
+using ParquetType = parquet::Type;
namespace parquet {
namespace arrow {
@@ -67,6 +78,497 @@ template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
// ----------------------------------------------------------------------
+// Schema logic
+
+static Status MakeArrowDecimal(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
+ *out = ::arrow::decimal(decimal.precision(), decimal.scale());
+ return Status::OK();
+}
+
+static Status MakeArrowInt(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
+ switch (integer.bit_width()) {
+ case 8:
+ *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
+ break;
+ case 16:
+ *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
+ break;
+ case 32:
+ *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
+ break;
+ default:
+ return Status::TypeError(logical_type.ToString(),
+ " can not annotate physical type Int32");
+ }
+ return Status::OK();
+}
+
+static Status MakeArrowInt64(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
+ switch (integer.bit_width()) {
+ case 64:
+ *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
+ break;
+ default:
+ return Status::TypeError(logical_type.ToString(),
+ " can not annotate physical type Int64");
+ }
+ return Status::OK();
+}
+
+static Status MakeArrowTime32(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
+ switch (time.time_unit()) {
+ case LogicalType::TimeUnit::MILLIS:
+ *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
+ break;
+ default:
+ return Status::TypeError(logical_type.ToString(),
+ " can not annotate physical type Time32");
+ }
+ return Status::OK();
+}
+
+static Status MakeArrowTime64(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
+ switch (time.time_unit()) {
+ case LogicalType::TimeUnit::MICROS:
+ *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+ break;
+ case LogicalType::TimeUnit::NANOS:
+ *out = ::arrow::time64(::arrow::TimeUnit::NANO);
+ break;
+ default:
+ return Status::TypeError(logical_type.ToString(),
+ " can not annotate physical type Time64");
+ }
+ return Status::OK();
+}
+
+static Status MakeArrowTimestamp(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
+ const bool utc_normalized =
+ timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
+ static const char* utc_timezone = "UTC";
+ switch (timestamp.time_unit()) {
+ case LogicalType::TimeUnit::MILLIS:
+ *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
+ : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
+ break;
+ case LogicalType::TimeUnit::MICROS:
+ *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
+ : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+ break;
+ case LogicalType::TimeUnit::NANOS:
+ *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
+ : ::arrow::timestamp(::arrow::TimeUnit::NANO));
+ break;
+ default:
+ return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
+ logical_type.ToString());
+ }
+ return Status::OK();
+}
+
+static Status FromByteArray(const LogicalType& logical_type,
+ std::shared_ptr<DataType>* out) {
+ switch (logical_type.type()) {
+ case LogicalType::Type::STRING:
+ *out = ::arrow::utf8();
+ break;
+ case LogicalType::Type::DECIMAL:
+ RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+ break;
+ case LogicalType::Type::NONE:
+ case LogicalType::Type::ENUM:
+ case LogicalType::Type::JSON:
+ case LogicalType::Type::BSON:
+ *out = ::arrow::binary();
+ break;
+ default:
+ return Status::NotImplemented("Unhandled logical type ",
+ logical_type.ToString(), " for binary array");
+ }
+ return Status::OK();
+}
+
+static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
+ std::shared_ptr<DataType>* out) {
+ switch (logical_type.type()) {
+ case LogicalType::Type::DECIMAL:
+ RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+ break;
+ case LogicalType::Type::NONE:
+ case LogicalType::Type::INTERVAL:
+ case LogicalType::Type::UUID:
+ *out = ::arrow::fixed_size_binary(physical_length);
+ break;
+ default:
+ return Status::NotImplemented("Unhandled logical type ",
+ logical_type.ToString(),
+ " for fixed-length binary array");
+ }
+
+ return Status::OK();
+}
+
+static Status FromInt32(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
+ switch (logical_type.type()) {
+ case LogicalType::Type::INT:
+ RETURN_NOT_OK(MakeArrowInt(logical_type, out));
+ break;
+ case LogicalType::Type::DATE:
+ *out = ::arrow::date32();
+ break;
+ case LogicalType::Type::TIME:
+ RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
+ break;
+ case LogicalType::Type::DECIMAL:
+ RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+ break;
+ case LogicalType::Type::NONE:
+ *out = ::arrow::int32();
+ break;
+ default:
+ return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+ " for INT32");
+ }
+ return Status::OK();
+}
+
+static Status FromInt64(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
+ switch (logical_type.type()) {
+ case LogicalType::Type::INT:
+ RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
+ break;
+ case LogicalType::Type::DECIMAL:
+ RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+ break;
+ case LogicalType::Type::TIMESTAMP:
+ RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
+ break;
+ case LogicalType::Type::TIME:
+ RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
+ break;
+ case LogicalType::Type::NONE:
+ *out = ::arrow::int64();
+ break;
+ default:
+ return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+ " for INT64");
+ }
+ return Status::OK();
+}
+
+Status GetPrimitiveType(const schema::PrimitiveNode& primitive,
+ std::shared_ptr<DataType>* out) {
+ const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
+ if (logical_type->is_invalid() || logical_type->is_null()) {
+ *out = ::arrow::null();
+ return Status::OK();
+ }
+
+ switch (primitive.physical_type()) {
+ case ParquetType::BOOLEAN:
+ *out = ::arrow::boolean();
+ break;
+ case ParquetType::INT32:
+ RETURN_NOT_OK(FromInt32(*logical_type, out));
+ break;
+ case ParquetType::INT64:
+ RETURN_NOT_OK(FromInt64(*logical_type, out));
+ break;
+ case ParquetType::INT96:
+ *out = ::arrow::timestamp(::arrow::TimeUnit::NANO);
+ break;
+ case ParquetType::FLOAT:
+ *out = ::arrow::float32();
+ break;
+ case ParquetType::DOUBLE:
+ *out = ::arrow::float64();
+ break;
+ case ParquetType::BYTE_ARRAY:
+ RETURN_NOT_OK(FromByteArray(*logical_type, out));
+ break;
+ case ParquetType::FIXED_LEN_BYTE_ARRAY:
+ RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
+ break;
+ default: {
+ // PARQUET-1565: This can occur if the file is corrupt
+ return Status::IOError("Invalid physical column type: ",
+ TypeToString(primitive.physical_type()));
+ }
+ }
+ return Status::OK();
+}
+
+struct SchemaTreeContext {
+ SchemaManifest* manifest;
+ ArrowReaderProperties properties;
+ const SchemaDescriptor* schema;
+
+ void LinkParent(const SchemaField* child, const SchemaField* parent) {
+ manifest->child_to_parent[child] = parent;
+ }
+
+ void RecordLeaf(const SchemaField* leaf) {
+ manifest->column_index_to_field[leaf->column_index] = leaf;
+ }
+};
+
+Status GetTypeForNode(int column_index, const schema::PrimitiveNode& primitive_node,
+ SchemaTreeContext* ctx, std::shared_ptr<DataType>* out) {
+ std::shared_ptr<DataType> storage_type;
+ RETURN_NOT_OK(GetPrimitiveType(primitive_node, &storage_type));
+ if (ctx->properties.read_dictionary(column_index)) {
+ *out = ::arrow::dictionary(::arrow::int32(), storage_type);
+ } else {
+ *out = storage_type;
+ }
+ return Status::OK();
+}
+
+Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
+ SchemaTreeContext* ctx, const SchemaField* parent,
+ SchemaField* out);
+
+Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
+ int16_t max_rep_level, SchemaTreeContext* ctx,
+ const SchemaField* parent, SchemaField* out);
+
+Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
+ int16_t max_def_level, int16_t max_rep_level, SchemaTreeContext* ctx,
+ const SchemaField* parent, SchemaField* out) {
+ out->field = field;
+ out->column_index = column_index;
+ out->max_definition_level = max_def_level;
+ out->max_repetition_level = max_rep_level;
+ ctx->RecordLeaf(out);
+ ctx->LinkParent(out, parent);
+ return Status::OK();
+}
+
+// Special case mentioned in the format spec:
+// If the name is array or ends in _tuple, this should be a list of struct
+// even for single child elements.
+bool HasStructListName(const GroupNode& node) {
+ return node.name() == "array" || boost::algorithm::ends_with(node.name(), "_tuple");
+}
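For example (hypothetical field name f), under this rule an inner node such as

    // repeated group name=array {       <-- or name=something_tuple
    //   required int32 f;
    // }

resolves to list<struct<f: int32>> rather than list<int32>, even though the repeated group has only a single child.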
+
+Status GroupToStruct(const GroupNode& node, int16_t max_def_level, int16_t max_rep_level,
+ SchemaTreeContext* ctx, const SchemaField* parent,
+ SchemaField* out) {
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+ out->children.resize(node.field_count());
+ for (int i = 0; i < node.field_count(); i++) {
+ RETURN_NOT_OK(NodeToSchemaField(*node.field(i), max_def_level, max_rep_level, ctx,
+ out, &out->children[i]));
+ arrow_fields.push_back(out->children[i].field);
+ }
+ auto struct_type = ::arrow::struct_(arrow_fields);
+ out->field = ::arrow::field(node.name(), struct_type, node.is_optional());
+ out->max_definition_level = max_def_level;
+ out->max_repetition_level = max_rep_level;
+ return Status::OK();
+}
+
+Status ListToSchemaField(const GroupNode& group, int16_t max_def_level,
+ int16_t max_rep_level, SchemaTreeContext* ctx,
+ const SchemaField* parent, SchemaField* out) {
+ if (group.field_count() != 1) {
+ return Status::NotImplemented(
+ "Only LIST-annotated groups with a single child can be handled.");
+ }
+
+ out->children.resize(1);
+ SchemaField* child_field = &out->children[0];
+
+ ctx->LinkParent(out, parent);
+ ctx->LinkParent(child_field, out);
+
+ const Node& list_node = *group.field(0);
+
+ if (!list_node.is_repeated()) {
+ return Status::NotImplemented(
+ "Non-repeated nodes in a LIST-annotated group are not supported.");
+ }
+
+ ++max_def_level;
+ ++max_rep_level;
+ if (list_node.is_group()) {
+ // Resolve 3-level encoding
+ //
+ // required/optional group name=whatever {
+ // repeated group name=list {
+ // required/optional TYPE item;
+ // }
+ // }
+ //
+ // yields list<item: TYPE ?nullable> ?nullable
+ //
+ // We distinguish the special case that we have
+ //
+ // required/optional group name=whatever {
+ // repeated group name=array or $SOMETHING_tuple {
+ // required/optional TYPE item;
+ // }
+ // }
+ //
+ // In this latter case, the inner type of the list should be a struct
+ // rather than a primitive value
+ //
+ // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
+ const auto& list_group = static_cast<const GroupNode&>(list_node);
+ // Special case mentioned in the format spec:
+ // If the name is array or ends in _tuple, this should be a list of struct
+ // even for single child elements.
+ if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
+ // List of primitive type
+ RETURN_NOT_OK(NodeToSchemaField(*list_group.field(0), max_def_level, max_rep_level,
+ ctx, out, child_field));
+ } else {
+ RETURN_NOT_OK(
+ GroupToStruct(list_group, max_def_level, max_rep_level, ctx, out, child_field));
+ }
+ } else {
+ // Two-level list encoding
+ //
+ // required/optional group LIST {
+ // repeated TYPE;
+ // }
+ const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
+ int column_index = ctx->schema->GetColumnIndex(primitive_node);
+ std::shared_ptr<DataType> type;
+ RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
+ auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false);
+ RETURN_NOT_OK(PopulateLeaf(column_index, item_field, max_def_level, max_rep_level,
+ ctx, out, child_field));
+ }
+ out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
+ group.is_optional());
+ out->max_definition_level = max_def_level;
+ out->max_repetition_level = max_rep_level;
+ return Status::OK();
+}
+
+Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
+ int16_t max_rep_level, SchemaTreeContext* ctx,
+ const SchemaField* parent, SchemaField* out) {
+ if (node.logical_type()->is_list()) {
+ return ListToSchemaField(node, max_def_level, max_rep_level, ctx, parent, out);
+ }
+ std::shared_ptr<DataType> type;
+ if (node.is_repeated()) {
+ // Simple repeated struct
+ //
+ // repeated group $NAME {
+ // r/o TYPE[0] f0
+ // r/o TYPE[1] f1
+ // }
+ out->children.resize(1);
+ RETURN_NOT_OK(
+ GroupToStruct(node, max_def_level, max_rep_level, ctx, out, &out->children[0]));
+ out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
+ node.is_optional());
+ out->max_definition_level = max_def_level;
+ out->max_repetition_level = max_rep_level;
+ return Status::OK();
+ } else {
+ return GroupToStruct(node, max_def_level, max_rep_level, ctx, parent, out);
+ }
+}
+
+Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
+ SchemaTreeContext* ctx, const SchemaField* parent,
+ SchemaField* out) {
+ if (node.is_optional()) {
+ ++max_def_level;
+ } else if (node.is_repeated()) {
+ // Repeated fields add a definition level. This is used to distinguish
+ // between an empty list and a list with an item in it.
+ ++max_rep_level;
+ ++max_def_level;
+ }
+
+ ctx->LinkParent(out, parent);
+
+ // Now, walk the schema and create a ColumnDescriptor for each leaf node
+ if (node.is_group()) {
+ return GroupToSchemaField(static_cast<const GroupNode&>(node), max_def_level,
+ max_rep_level, ctx, parent, out);
+ } else {
+ const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
+ int column_index = ctx->schema->GetColumnIndex(primitive_node);
+ std::shared_ptr<DataType> type;
+ RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
+ if (node.is_repeated()) {
+ // One-level list encoding, e.g.
+ // a: repeated int32;
+ out->children.resize(1);
+ auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
+ RETURN_NOT_OK(PopulateLeaf(column_index, child_field, max_def_level, max_rep_level,
+ ctx, out, &out->children[0]));
+
+ out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
+ /*nullable=*/false);
+ // TODO: verify that these levels are correct for one-level list encoding
+ out->max_definition_level = max_def_level;
+ out->max_repetition_level = max_rep_level;
+ return Status::OK();
+ } else {
+ return PopulateLeaf(column_index,
+ ::arrow::field(node.name(), type, node.is_optional()),
+ max_def_level, max_rep_level, ctx, parent, out);
+ }
+ }
+}
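To make the level bookkeeping concrete, a hypothetical walk over a standard 3-level list schema:

    // optional group a (LIST) {          <-- optional:  def 1
    //   repeated group list {            <-- repeated:  def 2, rep 1
    //     optional int32 item;           <-- optional:  def 3
    //   }
    // }
    //
    // The leaf SchemaField gets max_definition_level == 3 and
    // max_repetition_level == 1; the enclosing list SchemaField records the
    // levels at its own depth (2 and 1).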
+
+Status BuildSchemaManifest(const SchemaDescriptor* schema,
+ const ArrowReaderProperties& properties,
+ SchemaManifest* manifest) {
+ SchemaTreeContext ctx;
+ ctx.manifest = manifest;
+ ctx.properties = properties;
+ ctx.schema = schema;
+ const GroupNode& schema_node = *schema->group_node();
+ manifest->descr = schema;
+ manifest->schema_fields.resize(schema_node.field_count());
+ for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
+ RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), 0, 0, &ctx,
+ /*parent=*/nullptr, &manifest->schema_fields[i]));
+ }
+ return Status::OK();
+}
+
+Status FromParquetSchema(
+ const SchemaDescriptor* schema, const ArrowReaderProperties& properties,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
+ std::shared_ptr<::arrow::Schema>* out) {
+ SchemaManifest manifest;
+ RETURN_NOT_OK(BuildSchemaManifest(schema, properties, &manifest));
+ std::vector<std::shared_ptr<Field>> fields(manifest.schema_fields.size());
+ for (int i = 0; i < static_cast<int>(fields.size()); i++) {
+ fields[i] = manifest.schema_fields[i].field;
+ }
+ *out = ::arrow::schema(fields, key_value_metadata);
+ return Status::OK();
+}
+
+Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+ const ArrowReaderProperties& properties,
+ std::shared_ptr<::arrow::Schema>* out) {
+ return FromParquetSchema(parquet_schema, properties, nullptr, out);
+}
+
+// ----------------------------------------------------------------------
// Primitive types
template <typename ArrowType, typename ParquetType>
@@ -183,11 +685,8 @@ Status TransferDictionary(RecordReader* reader,
auto dict_reader = dynamic_cast<internal::DictionaryRecordReader*>(reader);
DCHECK(dict_reader);
*out = dict_reader->GetResult();
-
- const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*(*out)->type());
- if (!logical_value_type->Equals(*dict_type.value_type())) {
- *out = CastChunksTo(**out,
- ::arrow::dictionary(dict_type.index_type(), logical_value_type));
+ if (!logical_value_type->Equals(*(*out)->type())) {
+ *out = CastChunksTo(**out, logical_value_type);
}
return Status::OK();
}
@@ -196,7 +695,8 @@ Status TransferBinary(RecordReader* reader,
const std::shared_ptr<DataType>& logical_value_type,
std::shared_ptr<ChunkedArray>* out) {
if (reader->read_dictionary()) {
- return TransferDictionary(reader, logical_value_type, out);
+ return TransferDictionary(
+ reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
}
auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
DCHECK(binary_reader);
@@ -519,7 +1019,12 @@ Status TransferColumnData(internal::RecordReader* reader,
const ColumnDescriptor* descr, MemoryPool* pool,
std::shared_ptr<ChunkedArray>* out) {
Datum result;
+ std::shared_ptr<ChunkedArray> chunked_result;
switch (value_type->id()) {
+ case ::arrow::Type::DICTIONARY: {
+ RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
+ result = chunked_result;
+ } break;
case ::arrow::Type::NA: {
result = std::make_shared<::arrow::NullArray>(reader->values_written());
break;
@@ -548,9 +1053,8 @@ Status TransferColumnData(internal::RecordReader* reader,
case ::arrow::Type::FIXED_SIZE_BINARY:
case ::arrow::Type::BINARY:
case ::arrow::Type::STRING: {
- std::shared_ptr<ChunkedArray> out;
- RETURN_NOT_OK(TransferBinary(reader, value_type, &out));
- result = out;
+ RETURN_NOT_OK(TransferBinary(reader, value_type, &chunked_result));
+ result = chunked_result;
} break;
case ::arrow::Type::DECIMAL: {
switch (descr->physical_type()) {
@@ -612,5 +1116,114 @@ Status TransferColumnData(internal::RecordReader* reader,
return Status::OK();
}
+Status ReconstructNestedList(const std::shared_ptr<Array>& arr,
+ std::shared_ptr<Field> field, int16_t max_def_level,
+ int16_t max_rep_level, const int16_t* def_levels,
+ const int16_t* rep_levels, int64_t total_levels,
+ ::arrow::MemoryPool* pool, std::shared_ptr<Array>* out) {
+ // Walk downwards to extract nullability
+ std::vector<bool> nullable;
+ std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
+ std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
+ nullable.push_back(field->nullable());
+ while (field->type()->num_children() > 0) {
+ if (field->type()->num_children() > 1) {
+ return Status::NotImplemented("Fields with more than one child are not supported.");
+ } else {
+ if (field->type()->id() != ::arrow::Type::LIST) {
+ return Status::NotImplemented("Currently only nesting with Lists is supported.");
+ }
+ field = field->type()->child(0);
+ }
+ offset_builders.emplace_back(
+ std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool));
+ valid_bits_builders.emplace_back(
+ std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool));
+ nullable.push_back(field->nullable());
+ }
+
+ int64_t list_depth = offset_builders.size();
+ // The minimal definition level at which a level entry corresponds to a
+ // value in the primitive values array.
+ int16_t values_def_level = max_def_level;
+ if (nullable[nullable.size() - 1]) {
+ values_def_level--;
+ }
+
+ // The definition levels needed to declare a list as empty rather than
+ // null.
+ std::vector<int16_t> empty_def_level(list_depth);
+ int def_level = 0;
+ for (int i = 0; i < list_depth; i++) {
+ if (nullable[i]) {
+ def_level++;
+ }
+ empty_def_level[i] = static_cast<int16_t>(def_level);
+ def_level++;
+ }
+
+ int32_t values_offset = 0;
+ std::vector<int64_t> null_counts(list_depth, 0);
+ for (int64_t i = 0; i < total_levels; i++) {
+ int16_t rep_level = rep_levels[i];
+ if (rep_level < max_rep_level) {
+ for (int64_t j = rep_level; j < list_depth; j++) {
+ if (j == (list_depth - 1)) {
+ RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+ } else {
+ RETURN_NOT_OK(offset_builders[j]->Append(
+ static_cast<int32_t>(offset_builders[j + 1]->length())));
+ }
+
+ if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
+ RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
+ null_counts[j]++;
+ break;
+ } else {
+ RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
+ if (empty_def_level[j] == def_levels[i]) {
+ break;
+ }
+ }
+ }
+ }
+ if (def_levels[i] >= values_def_level) {
+ values_offset++;
+ }
+ }
+ // Add the final offset to all lists
+ for (int64_t j = 0; j < list_depth; j++) {
+ if (j == (list_depth - 1)) {
+ RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+ } else {
+ RETURN_NOT_OK(offset_builders[j]->Append(
+ static_cast<int32_t>(offset_builders[j + 1]->length())));
+ }
+ }
+
+ std::vector<std::shared_ptr<Buffer>> offsets;
+ std::vector<std::shared_ptr<Buffer>> valid_bits;
+ std::vector<int64_t> list_lengths;
+ for (int64_t j = 0; j < list_depth; j++) {
+ list_lengths.push_back(offset_builders[j]->length() - 1);
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(offset_builders[j]->Finish(&array));
+ offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
+ RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
+ valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
+ }
+
+ *out = arr;
+
+ // TODO(wesm): Use passed-in field
+ for (int64_t j = list_depth - 1; j >= 0; j--) {
+ auto list_type =
+ ::arrow::list(::arrow::field("item", (*out)->type(), nullable[j + 1]));
+ *out = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
+ *out, valid_bits[j], null_counts[j]);
+ }
+ return Status::OK();
+}
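A worked example of the level decoding above (hypothetical data): an optional list<optional int32> with max_def_level == 3 and max_rep_level == 1 holding [[1, 2], [], null]:

    // values:      1  2
    // def levels:  3  3  1  0
    // rep levels:  0  1  0  0
    //
    // rep 0 starts a new list; def 3 marks a present value, def 1 an empty
    // list, def 0 a null list. The loops above therefore produce offsets
    // [0, 2, 2, 2] and validity [true, true, false].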
+
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index 5c0fa5e..cbf44ec 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -17,17 +17,32 @@
#pragma once
+#include <deque>
#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "arrow/status.h"
+
+#include "parquet/column_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/schema.h"
namespace arrow {
+class Array;
class ChunkedArray;
class DataType;
+class Field;
class MemoryPool;
-class Status;
+class Schema;
} // namespace arrow
+using arrow::Status;
+
namespace parquet {
class ColumnDescriptor;
@@ -40,11 +55,145 @@ class RecordReader;
namespace arrow {
-::arrow::Status TransferColumnData(internal::RecordReader* reader,
- std::shared_ptr<::arrow::DataType> value_type,
- const ColumnDescriptor* descr,
- ::arrow::MemoryPool* pool,
- std::shared_ptr<::arrow::ChunkedArray>* out);
+class ArrowReaderProperties;
+class ColumnReaderImpl;
+
+// ----------------------------------------------------------------------
+// Iteration utilities
+
+// Abstraction to decouple row group iteration details from the ColumnReader,
+// so we can read only a single row group if we want
+class FileColumnIterator {
+ public:
+ explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
+ std::vector<int> row_groups)
+ : column_index_(column_index),
+ reader_(reader),
+ schema_(reader->metadata()->schema()),
+ row_groups_(row_groups.begin(), row_groups.end()) {}
+
+ virtual ~FileColumnIterator() {}
+
+ std::unique_ptr<::parquet::PageReader> NextChunk() {
+ if (row_groups_.empty()) {
+ return nullptr;
+ }
+
+ auto row_group_reader = reader_->RowGroup(row_groups_.front());
+ row_groups_.pop_front();
+ return row_group_reader->GetColumnPageReader(column_index_);
+ }
+
+ const SchemaDescriptor* schema() const { return schema_; }
+
+ const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
+
+ std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
+
+ int column_index() const { return column_index_; }
+
+ protected:
+ int column_index_;
+ ParquetFileReader* reader_;
+ const SchemaDescriptor* schema_;
+ std::deque<int> row_groups_;
+};
+
+using FileColumnIteratorFactory =
+ std::function<FileColumnIterator*(int, ParquetFileReader*)>;
+
+Status TransferColumnData(::parquet::internal::RecordReader* reader,
+ std::shared_ptr<::arrow::DataType> value_type,
+ const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
+ std::shared_ptr<::arrow::ChunkedArray>* out);
+
+Status ReconstructNestedList(const std::shared_ptr<::arrow::Array>& arr,
+ std::shared_ptr<::arrow::Field> field, int16_t max_def_level,
+ int16_t max_rep_level, const int16_t* def_levels,
+ const int16_t* rep_levels, int64_t total_levels,
+ ::arrow::MemoryPool* pool,
+ std::shared_ptr<::arrow::Array>* out);
+
+struct ReaderContext {
+ ParquetFileReader* reader;
+ ::arrow::MemoryPool* pool;
+ FileColumnIteratorFactory iterator_factory;
+ bool filter_leaves;
+ std::unordered_set<int> included_leaves;
+
+ bool IncludesLeaf(int leaf_index) const {
+ return (!this->filter_leaves ||
+ (included_leaves.find(leaf_index) != included_leaves.end()));
+ }
+};
+
+struct PARQUET_EXPORT SchemaField {
+ std::shared_ptr<::arrow::Field> field;
+ std::vector<SchemaField> children;
+
+ // Only set for leaf nodes
+ int column_index = -1;
+
+ int16_t max_definition_level;
+ int16_t max_repetition_level;
+
+ bool is_leaf() const { return column_index != -1; }
+
+ Status GetReader(const ReaderContext& context,
+ std::unique_ptr<ColumnReaderImpl>* out) const;
+};
+
+struct SchemaManifest {
+ const SchemaDescriptor* descr;
+ std::vector<SchemaField> schema_fields;
+
+ std::unordered_map<int, const SchemaField*> column_index_to_field;
+ std::unordered_map<const SchemaField*, const SchemaField*> child_to_parent;
+
+ Status GetColumnField(int column_index, const SchemaField** out) const {
+ auto it = column_index_to_field.find(column_index);
+ if (it == column_index_to_field.end()) {
+ return Status::KeyError("Column index ", column_index,
+ " not found in schema manifest, may be malformed");
+ }
+ *out = it->second;
+ return Status::OK();
+ }
+
+ const SchemaField* GetParent(const SchemaField* field) const {
+ // Returns nullptr if the field has no parent or is not in the manifest
+ auto it = child_to_parent.find(field);
+ if (it == child_to_parent.end()) {
+ return nullptr;
+ }
+ return it->second;
+ }
+
+ bool GetFieldIndices(const std::vector<int>& column_indices, std::vector<int>* out) {
+ // Coalesce a list of schema field indices which are the roots of the
+ // columns referred to by the given column indices
+ const schema::GroupNode* group = descr->group_node();
+ std::unordered_set<int> already_added;
+ out->clear();
+ for (auto& column_idx : column_indices) {
+ auto field_node = descr->GetColumnRoot(column_idx);
+ auto field_idx = group->FieldIndex(*field_node);
+ if (field_idx < 0) {
+ return false;
+ }
+ auto insertion = already_added.insert(field_idx);
+ if (insertion.second) {
+ out->push_back(field_idx);
+ }
+ }
+ return true;
+ }
+};
+
+PARQUET_EXPORT
+Status BuildSchemaManifest(const SchemaDescriptor* schema,
+ const ArrowReaderProperties& properties,
+ SchemaManifest* manifest);
} // namespace arrow
} // namespace parquet
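
FileColumnIteratorFactory is what lets row-group-scoped reads feed only
selected row groups into a column reader. A hedged sketch of the whole-file
case, using only the declarations above (AllRowGroupsFactory is an
illustrative name, and this internal header is assumed reachable):

    #include <numeric>
    #include <vector>
    #include "parquet/arrow/reader_internal.h"

    // Visit every row group in file order, matching the default read path.
    parquet::arrow::FileColumnIterator* AllRowGroupsFactory(
        int column_index, parquet::ParquetFileReader* reader) {
      std::vector<int> row_groups(reader->metadata()->num_row_groups());
      std::iota(row_groups.begin(), row_groups.end(), 0);  // 0, 1, ..., n - 1
      return new parquet::arrow::FileColumnIterator(column_index, reader,
                                                    std::move(row_groups));
    }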
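And to show how SchemaManifest ties the Arrow schema tree back to Parquet
leaf columns, a minimal sketch that builds a manifest and prints each field
with its leaf column index (PrintField and PrintManifest are illustrative
names; error handling is reduced to Status returns):

    #include <iostream>
    #include <string>
    #include "parquet/arrow/reader_internal.h"
    #include "parquet/properties.h"

    using parquet::arrow::SchemaField;
    using parquet::arrow::SchemaManifest;

    // Recursively print the schema tree; leaves carry a column index.
    void PrintField(const SchemaField& field, int indent = 0) {
      std::cout << std::string(indent, ' ') << field.field->name();
      if (field.is_leaf()) {
        std::cout << " -> column " << field.column_index;
      }
      std::cout << std::endl;
      for (const SchemaField& child : field.children) {
        PrintField(child, indent + 2);
      }
    }

    ::arrow::Status PrintManifest(const parquet::SchemaDescriptor* descr) {
      SchemaManifest manifest;
      ARROW_RETURN_NOT_OK(parquet::arrow::BuildSchemaManifest(
          descr, parquet::default_arrow_reader_properties(), &manifest));
      for (const SchemaField& field : manifest.schema_fields) {
        PrintField(field);
      }
      return ::arrow::Status::OK();
    }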
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 6404338..82c8566 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -53,446 +53,8 @@ namespace parquet {
namespace arrow {
-const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
-const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
-const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-
-static Status MakeArrowDecimal(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
- *out = ::arrow::decimal(decimal.precision(), decimal.scale());
- return Status::OK();
-}
-
-static Status MakeArrowInt(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
- switch (integer.bit_width()) {
- case 8:
- *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
- break;
- case 16:
- *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
- break;
- case 32:
- *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
- break;
- default:
- return Status::TypeError(logical_type.ToString(),
- " can not annotate physical type Int32");
- }
- return Status::OK();
-}
-
-static Status MakeArrowInt64(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
- switch (integer.bit_width()) {
- case 64:
- *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
- break;
- default:
- return Status::TypeError(logical_type.ToString(),
- " can not annotate physical type Int64");
- }
- return Status::OK();
-}
-
-static Status MakeArrowTime32(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
- switch (time.time_unit()) {
- case LogicalType::TimeUnit::MILLIS:
- *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
- break;
- default:
- return Status::TypeError(logical_type.ToString(),
- " can not annotate physical type Time32");
- }
- return Status::OK();
-}
-
-static Status MakeArrowTime64(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
- switch (time.time_unit()) {
- case LogicalType::TimeUnit::MICROS:
- *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
- break;
- case LogicalType::TimeUnit::NANOS:
- *out = ::arrow::time64(::arrow::TimeUnit::NANO);
- break;
- default:
- return Status::TypeError(logical_type.ToString(),
- " can not annotate physical type Time64");
- }
- return Status::OK();
-}
-
-static Status MakeArrowTimestamp(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
- const bool utc_normalized =
- timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
- static const char* utc_timezone = "UTC";
- switch (timestamp.time_unit()) {
- case LogicalType::TimeUnit::MILLIS:
- *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
- : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
- break;
- case LogicalType::TimeUnit::MICROS:
- *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
- : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
- break;
- case LogicalType::TimeUnit::NANOS:
- *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
- : ::arrow::timestamp(::arrow::TimeUnit::NANO));
- break;
- default:
- return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
- logical_type.ToString());
- }
- return Status::OK();
-}
-
-static Status FromByteArray(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- switch (logical_type.type()) {
- case LogicalType::Type::STRING:
- *out = ::arrow::utf8();
- break;
- case LogicalType::Type::DECIMAL:
- RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
- break;
- case LogicalType::Type::NONE:
- case LogicalType::Type::ENUM:
- case LogicalType::Type::JSON:
- case LogicalType::Type::BSON:
- *out = ::arrow::binary();
- break;
- default:
- return Status::NotImplemented("Unhandled logical type ",
- logical_type.ToString(), " for binary array");
- }
- return Status::OK();
-}
-
-static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
- std::shared_ptr<ArrowType>* out) {
- switch (logical_type.type()) {
- case LogicalType::Type::DECIMAL:
- RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
- break;
- case LogicalType::Type::NONE:
- case LogicalType::Type::INTERVAL:
- case LogicalType::Type::UUID:
- *out = ::arrow::fixed_size_binary(physical_length);
- break;
- default:
- return Status::NotImplemented("Unhandled logical type ",
- logical_type.ToString(),
- " for fixed-length binary array");
- }
-
- return Status::OK();
-}
-
-static Status FromInt32(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- switch (logical_type.type()) {
- case LogicalType::Type::INT:
- RETURN_NOT_OK(MakeArrowInt(logical_type, out));
- break;
- case LogicalType::Type::DATE:
- *out = ::arrow::date32();
- break;
- case LogicalType::Type::TIME:
- RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
- break;
- case LogicalType::Type::DECIMAL:
- RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
- break;
- case LogicalType::Type::NONE:
- *out = ::arrow::int32();
- break;
- default:
- return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
- " for INT32");
- }
- return Status::OK();
-}
-
-static Status FromInt64(const LogicalType& logical_type,
- std::shared_ptr<ArrowType>* out) {
- switch (logical_type.type()) {
- case LogicalType::Type::INT:
- RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
- break;
- case LogicalType::Type::DECIMAL:
- RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
- break;
- case LogicalType::Type::TIMESTAMP:
- RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
- break;
- case LogicalType::Type::TIME:
- RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
- break;
- case LogicalType::Type::NONE:
- *out = ::arrow::int64();
- break;
- default:
- return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
- " for INT64");
- }
- return Status::OK();
-}
-
-Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>* out) {
- const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
- if (logical_type->is_invalid() || logical_type->is_null()) {
- *out = ::arrow::null();
- return Status::OK();
- }
-
- switch (primitive.physical_type()) {
- case ParquetType::BOOLEAN:
- *out = ::arrow::boolean();
- break;
- case ParquetType::INT32:
- RETURN_NOT_OK(FromInt32(*logical_type, out));
- break;
- case ParquetType::INT64:
- RETURN_NOT_OK(FromInt64(*logical_type, out));
- break;
- case ParquetType::INT96:
- *out = TIMESTAMP_NS;
- break;
- case ParquetType::FLOAT:
- *out = ::arrow::float32();
- break;
- case ParquetType::DOUBLE:
- *out = ::arrow::float64();
- break;
- case ParquetType::BYTE_ARRAY:
- RETURN_NOT_OK(FromByteArray(*logical_type, out));
- break;
- case ParquetType::FIXED_LEN_BYTE_ARRAY:
- RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
- break;
- default: {
- // PARQUET-1565: This can occur if the file is corrupt
- return Status::IOError("Invalid physical column type: ",
- TypeToString(primitive.physical_type()));
- }
- }
- return Status::OK();
-}
-
-// Forward declaration
-Status NodeToFieldInternal(const Node& node,
- const std::unordered_set<const Node*>* included_leaf_nodes,
- std::shared_ptr<Field>* out);
-
-/*
- * Auxiliary function to test if a parquet schema node is a leaf node
- * that should be included in a resulting arrow schema
- */
-inline bool IsIncludedLeaf(const Node& node,
- const std::unordered_set<const Node*>* included_leaf_nodes) {
- if (included_leaf_nodes == nullptr) {
- return true;
- }
- auto search = included_leaf_nodes->find(&node);
- return (search != included_leaf_nodes->end());
-}
-
-Status StructFromGroup(const GroupNode& group,
- const std::unordered_set<const Node*>* included_leaf_nodes,
- std::shared_ptr<ArrowType>* out) {
- std::vector<std::shared_ptr<Field>> fields;
- std::shared_ptr<Field> field;
-
- *out = nullptr;
-
- for (int i = 0; i < group.field_count(); i++) {
- RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field));
- if (field != nullptr) {
- fields.push_back(field);
- }
- }
- if (fields.size() > 0) {
- *out = std::make_shared<::arrow::StructType>(fields);
- }
- return Status::OK();
-}
-
-Status NodeToList(const GroupNode& group,
- const std::unordered_set<const Node*>* included_leaf_nodes,
- std::shared_ptr<ArrowType>* out) {
- *out = nullptr;
- if (group.field_count() == 1) {
- // This attempts to resolve the preferred 3-level list encoding.
- const Node& list_node = *group.field(0);
- if (list_node.is_group() && list_node.is_repeated()) {
- const auto& list_group = static_cast<const GroupNode&>(list_node);
- // Special case mentioned in the format spec:
- // If the name is array or ends in _tuple, this should be a list of struct
- // even for single child elements.
- if (list_group.field_count() == 1 && !schema::HasStructListName(list_group)) {
- // List of primitive type
- std::shared_ptr<Field> item_field;
- RETURN_NOT_OK(
- NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
-
- if (item_field != nullptr) {
- *out = ::arrow::list(item_field);
- }
- } else {
- // List of struct
- std::shared_ptr<ArrowType> inner_type;
- RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
- if (inner_type != nullptr) {
- auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
- *out = ::arrow::list(item_field);
- }
- }
- } else if (list_node.is_repeated()) {
- // repeated primitive node
- std::shared_ptr<ArrowType> inner_type;
- if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
- RETURN_NOT_OK(
- FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
- auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
- *out = ::arrow::list(item_field);
- }
- } else {
- return Status::NotImplemented(
- "Non-repeated groups in a LIST-annotated group are not supported.");
- }
- } else {
- return Status::NotImplemented(
- "Only LIST-annotated groups with a single child can be handled.");
- }
- return Status::OK();
-}
-
-Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
- return NodeToFieldInternal(node, nullptr, out);
-}
-
-Status NodeToFieldInternal(const Node& node,
- const std::unordered_set<const Node*>* included_leaf_nodes,
- std::shared_ptr<Field>* out) {
- std::shared_ptr<ArrowType> type = nullptr;
- bool nullable = !node.is_required();
-
- *out = nullptr;
-
- if (node.is_repeated()) {
- // 1-level LIST encoding fields are required
- std::shared_ptr<ArrowType> inner_type;
- if (node.is_group()) {
- RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
- included_leaf_nodes, &inner_type));
- } else if (IsIncludedLeaf(node, included_leaf_nodes)) {
- RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
- }
- if (inner_type != nullptr) {
- auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
- type = ::arrow::list(item_field);
- nullable = false;
- }
- } else if (node.is_group()) {
- const auto& group = static_cast<const GroupNode&>(node);
- if (node.logical_type()->is_list()) {
- RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
- } else {
- RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
- }
- } else {
- // Primitive (leaf) node
- if (IsIncludedLeaf(node, included_leaf_nodes)) {
- RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
- }
- }
- if (type != nullptr) {
- *out = std::make_shared<Field>(node.name(), type, nullable);
- }
- return Status::OK();
-}
-
-std::shared_ptr<Field> ToDictionary32(const Field& field) {
- auto new_ty = ::arrow::dictionary(::arrow::int32(), field.type());
- return field.WithType(new_ty);
-}
-
-Status FromParquetSchema(
- const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
- std::shared_ptr<::arrow::Schema>* out) {
- const GroupNode& schema_node = *parquet_schema->group_node();
-
- int num_fields = static_cast<int>(schema_node.field_count());
- std::vector<std::shared_ptr<Field>> fields(num_fields);
- for (int i = 0; i < num_fields; i++) {
- RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
- }
- *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
- return Status::OK();
-}
-
-Status FromParquetSchema(
- const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
- const ArrowReaderProperties& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
- std::shared_ptr<::arrow::Schema>* out) {
- // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
- // from the root Parquet node
-
- // Put the right leaf nodes in an unordered set
- // Indices in column_indices should be unique; duplicate indices are merged
- // into one, ordered by first appearance.
- int num_columns = static_cast<int>(column_indices.size());
- std::unordered_set<const Node*> top_nodes; // to deduplicate the top nodes
- std::vector<const Node*> base_nodes; // to keep the ordering
- std::unordered_set<const Node*> included_leaf_nodes(num_columns);
- for (int i = 0; i < num_columns; i++) {
- const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
- const Node* node = column_desc->schema_node().get();
-
- included_leaf_nodes.insert(node);
- const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
- auto it = top_nodes.insert(column_root);
- if (it.second) {
- base_nodes.push_back(column_root);
- }
- }
-
- std::vector<std::shared_ptr<Field>> fields;
- std::shared_ptr<Field> field;
- for (auto node : base_nodes) {
- RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
- if (field != nullptr) {
- fields.push_back(field);
- }
- }
-
- *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
- return Status::OK();
-}
-
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::vector<int>& column_indices,
- const ArrowReaderProperties& properties,
- std::shared_ptr<::arrow::Schema>* out) {
- return FromParquetSchema(parquet_schema, column_indices, properties, nullptr, out);
-}
-
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const ArrowReaderProperties& properties,
- std::shared_ptr<::arrow::Schema>* out) {
- return FromParquetSchema(parquet_schema, properties, nullptr, out);
-}
+// ----------------------------------------------------------------------
+// Arrow to Parquet schema conversion
Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
bool nullable, const WriterProperties& properties,
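
For context on the LIST handling that just moved out of this file (the three
"styles of list encoding" mentioned in the commit message), the Parquet format
admits three layouts; in message-type notation they look roughly like this:

    3-level (the preferred, spec-blessed encoding):
        optional group my_list (LIST) {
          repeated group list {
            optional int32 element;
          }
        }

    2-level (legacy; the repeated node is the element itself):
        optional group my_list (LIST) {
          repeated int32 element;
        }

    1-level (legacy; a bare repeated field, read as a non-null list of
    non-null elements):
        repeated int32 my_list;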
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index 1133431..b3cc66b 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -39,50 +39,9 @@ class WriterProperties;
namespace arrow {
-class ArrowReaderProperties;
class ArrowWriterProperties;
PARQUET_EXPORT
-::arrow::Status NodeToField(const schema::Node& node,
- std::shared_ptr<::arrow::Field>* out);
-
-/// Convert parquet schema to arrow schema with selected indices
-/// \param parquet_schema to be converted
-/// \param column_indices indices of leaf nodes in the parquet schema tree. Their
-/// order of appearance determines the converted schema. Repeated indices
-/// are ignored except for the first occurrence
-/// \param properties reader options for FileReader
-/// \param key_value_metadata optional metadata, can be nullptr
-/// \param out the corresponding arrow schema
-/// \return Status::OK() on a successful conversion.
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(
- const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
- const ArrowReaderProperties& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
- std::shared_ptr<::arrow::Schema>* out);
-
-// Without indices
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(
- const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
- std::shared_ptr<::arrow::Schema>* out);
-
-// Without metadata
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::vector<int>& column_indices,
- const ArrowReaderProperties& properties,
- std::shared_ptr<::arrow::Schema>* out);
-
-// Without metadata or indices
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const ArrowReaderProperties& properties,
- std::shared_ptr<::arrow::Schema>* out);
-
-PARQUET_EXPORT
::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties,
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index bb2bab1..ee3b880 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cstddef>
+#include <deque>
#include <type_traits>
#include <utility>
#include <vector>
@@ -33,6 +34,7 @@
#include "arrow/util/logging.h"
#include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/column_writer.h"
#include "parquet/exception.h"
@@ -78,7 +80,12 @@ namespace {
class LevelBuilder {
public:
- explicit LevelBuilder(MemoryPool* pool) : def_levels_(pool), rep_levels_(pool) {}
+ explicit LevelBuilder(MemoryPool* pool, const SchemaField* schema_field,
+ const SchemaManifest* schema_manifest)
+ : def_levels_(pool),
+ rep_levels_(pool),
+ schema_field_(schema_field),
+ schema_manifest_(schema_manifest) {}
Status VisitInline(const Array& array);
@@ -121,8 +128,23 @@ class LevelBuilder {
#undef NOT_IMPLEMENTED_VISIT
- Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
- int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
+ Status ExtractNullability() {
+ // Walk upwards to extract nullability
+ const SchemaField* current_field = schema_field_;
+ while (current_field != nullptr) {
+ nullable_.push_front(current_field->field->nullable());
+ if (current_field->field->type()->num_children() > 1) {
+ return Status::NotImplemented(
+ "Fields with more than one child are not supported.");
+ } else {
+ current_field = schema_manifest_->GetParent(current_field);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status GenerateLevels(const Array& array, int64_t* values_offset, int64_t* num_values,
+ int64_t* num_levels,
const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
std::shared_ptr<Buffer>* def_levels_out,
std::shared_ptr<Buffer>* rep_levels_out,
@@ -135,18 +157,7 @@ class LevelBuilder {
*values_offset = min_offset_idx_;
*values_array = values_array_;
- // Walk downwards to extract nullability
- std::shared_ptr<Field> current_field = field;
- nullable_.push_back(current_field->nullable());
- while (current_field->type()->num_children() > 0) {
- if (current_field->type()->num_children() > 1) {
- return Status::NotImplemented(
- "Fields with more than one child are not supported.");
- } else {
- current_field = current_field->type()->child(0);
- }
- nullable_.push_back(current_field->nullable());
- }
+ RETURN_NOT_OK(ExtractNullability());
// Generate the levels.
if (nullable_.size() == 1) {
@@ -264,11 +275,14 @@ class LevelBuilder {
Int16BufferBuilder def_levels_;
Int16BufferBuilder rep_levels_;
+ const SchemaField* schema_field_;
+ const SchemaManifest* schema_manifest_;
+
std::vector<int64_t> null_counts_;
std::vector<const uint8_t*> valid_bitmaps_;
std::vector<const int32_t*> offsets_;
std::vector<int32_t> array_offsets_;
- std::vector<bool> nullable_;
+ std::deque<bool> nullable_;
int64_t min_offset_idx_;
int64_t max_offset_idx_;
@@ -319,8 +333,12 @@ Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type
class ArrowColumnWriter {
public:
ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
- const std::shared_ptr<Field>& field)
- : ctx_(ctx), writer_(column_writer), field_(field) {}
+ const SchemaField* schema_field,
+ const SchemaManifest* schema_manifest)
+ : ctx_(ctx),
+ writer_(column_writer),
+ schema_field_(schema_field),
+ schema_manifest_(schema_manifest) {}
Status Write(const Array& data);
@@ -430,7 +448,8 @@ class ArrowColumnWriter {
ColumnWriterContext* ctx_;
ColumnWriter* writer_;
- std::shared_ptr<Field> field_;
+ const SchemaField* schema_field_;
+ const SchemaManifest* schema_manifest_;
};
template <typename ParquetType, typename ArrowType>
@@ -945,11 +964,10 @@ Status ArrowColumnWriter::Write(const Array& data) {
int64_t values_offset = 0;
int64_t num_levels = 0;
int64_t num_values = 0;
- LevelBuilder level_builder(ctx_->memory_pool);
-
+ LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
RETURN_NOT_OK(level_builder.GenerateLevels(
- data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+ data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
&def_levels_buffer, &rep_levels_buffer, &_values_array));
const int16_t* def_levels = nullptr;
if (def_levels_buffer) {
@@ -1021,6 +1039,11 @@ class FileWriter::Impl {
arrow_properties_(arrow_properties),
closed_(false) {}
+ Status Init() {
+ return BuildSchemaManifest(writer_->schema(), default_arrow_reader_properties(),
+ &schema_manifest_);
+ }
+
Status NewRowGroup(int64_t chunk_size) {
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
@@ -1075,17 +1098,11 @@ class FileWriter::Impl {
ColumnWriter* column_writer;
PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
- // TODO(wesm): This trick to construct a schema for one Parquet root node
- // will not work for arbitrary nested data
- int current_column_idx = row_group_writer_->current_column();
- std::shared_ptr<::arrow::Schema> arrow_schema;
- RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
- default_arrow_reader_properties(),
- writer_->key_value_metadata(), &arrow_schema));
-
- ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
- arrow_schema->field(0));
-
+ const SchemaField* schema_field;
+ RETURN_NOT_OK(schema_manifest_.GetColumnField(row_group_writer_->current_column(),
+ &schema_field));
+ ArrowColumnWriter arrow_writer(&column_write_context_, column_writer, schema_field,
+ &schema_manifest_);
RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
return arrow_writer.Close();
}
@@ -1101,6 +1118,8 @@ class FileWriter::Impl {
private:
friend class FileWriter;
+ SchemaManifest schema_manifest_;
+
std::unique_ptr<ParquetFileWriter> writer_;
RowGroupWriter* row_group_writer_;
ColumnWriterContext column_write_context_;
@@ -1135,6 +1154,15 @@ const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
FileWriter::~FileWriter() {}
+Status FileWriter::Make(::arrow::MemoryPool* pool,
+ std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<::arrow::Schema>& schema,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::unique_ptr<FileWriter>* out) {
+ out->reset(new FileWriter(pool, std::move(writer), schema, arrow_properties));
+ return (*out)->impl_->Init();
+}
+
FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
const std::shared_ptr<::arrow::Schema>& schema,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
@@ -1163,9 +1191,7 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
auto schema_ptr = std::make_shared<::arrow::Schema>(schema);
- writer->reset(
- new FileWriter(pool, std::move(base_writer), schema_ptr, arrow_properties));
- return Status::OK();
+ return Make(pool, std::move(base_writer), schema_ptr, arrow_properties, writer);
}
Status WriteFileMetaData(const FileMetaData& file_metadata,
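
To make the new upward walk concrete, a worked example under an assumed
column `a: list<item: int32>` with `a` nullable and `item` required:

    // SchemaField chain, leaf to root:   item  ->  a
    //
    //   push_front(false)   // item is required
    //   push_front(true)    // a is nullable
    //
    // Result: nullable_ = {true, false}, ordered root first: the same
    // sequence the removed downward walk over field->type()->child(0)
    // produced, but driven by the SchemaManifest parent links so it stays
    // correct when the Arrow field no longer mirrors the Parquet tree.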
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 2b9d892..8f7d499 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -128,10 +128,11 @@ std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_prope
*/
class PARQUET_EXPORT FileWriter {
public:
- FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<::arrow::Schema>& schema,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
- default_arrow_writer_properties());
+ static ::arrow::Status Make(
+ ::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<::arrow::Schema>& schema,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::unique_ptr<FileWriter>* out);
static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
@@ -164,6 +165,10 @@ class PARQUET_EXPORT FileWriter {
const std::shared_ptr<FileMetaData> metadata() const;
private:
+ FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<::arrow::Schema>& schema,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+
class PARQUET_NO_EXPORT Impl;
std::unique_ptr<Impl> impl_;
std::shared_ptr<::arrow::Schema> schema_;
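
Since the constructor is now private, code that previously constructed a
FileWriter directly should go through Make, or keep using Open, which now
delegates to it. A minimal sketch, assuming an already-created output stream
(OpenFileWriter is an illustrative name):

    #include <memory>
    #include "arrow/io/api.h"
    #include "arrow/memory_pool.h"
    #include "parquet/arrow/writer.h"
    #include "parquet/properties.h"

    ::arrow::Status OpenFileWriter(
        const std::shared_ptr<::arrow::Schema>& schema,
        const std::shared_ptr<::arrow::io::OutputStream>& sink,
        std::unique_ptr<parquet::arrow::FileWriter>* writer) {
      // Open converts the Arrow schema, opens a ParquetFileWriter, and then
      // calls FileWriter::Make, which runs Impl::Init to build the
      // SchemaManifest up front.
      return parquet::arrow::FileWriter::Open(
          *schema, ::arrow::default_memory_pool(), sink,
          parquet::default_writer_properties(),
          parquet::arrow::default_arrow_writer_properties(), writer);
    }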
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index a741cfa..22c75fa 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -87,7 +87,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
properties_(properties),
total_bytes_written_(0),
closed_(false),
- current_column_index_(0),
+ next_column_index_(0),
num_rows_(0),
buffered_row_group_(buffered_row_group) {
if (buffered_row_group) {
@@ -122,7 +122,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
total_bytes_written_ += column_writers_[0]->Close();
}
- ++current_column_index_;
+ ++next_column_index_;
const ColumnDescriptor* column_descr = col_meta->descr();
std::unique_ptr<PageWriter> pager =
@@ -192,7 +192,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
const WriterProperties* properties_;
int64_t total_bytes_written_;
bool closed_;
- int current_column_index_;
+ int next_column_index_;
mutable int64_t num_rows_;
bool buffered_row_group_;
@@ -203,7 +203,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
if (num_rows_ == 0) {
num_rows_ = current_col_rows;
} else if (num_rows_ != current_col_rows) {
- ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_);
+ ThrowRowsMisMatchError(next_column_index_, current_col_rows, num_rows_);
}
} else if (buffered_row_group_ &&
column_writers_.size() > 0) { // when buffered_row_group = true
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index bee818d..5410dc8 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -778,32 +778,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
const SchemaDescriptor* schema, void* contents)
- : properties_(props), schema_(schema), current_column_(0) {
+ : properties_(props), schema_(schema), next_column_(0) {
row_group_ = reinterpret_cast<format::RowGroup*>(contents);
InitializeColumns(schema->num_columns());
}
ColumnChunkMetaDataBuilder* NextColumnChunk() {
- if (!(current_column_ < num_columns())) {
+ if (!(next_column_ < num_columns())) {
std::stringstream ss;
ss << "The schema only has " << num_columns()
- << " columns, requested metadata for column: " << current_column_;
+ << " columns, requested metadata for column: " << next_column_;
throw ParquetException(ss.str());
}
- auto column = schema_->Column(current_column_);
+ auto column = schema_->Column(next_column_);
auto column_builder = ColumnChunkMetaDataBuilder::Make(
- properties_, column, &row_group_->columns[current_column_++]);
+ properties_, column, &row_group_->columns[next_column_++]);
auto column_builder_ptr = column_builder.get();
column_builders_.push_back(std::move(column_builder));
return column_builder_ptr;
}
- int current_column() { return current_column_; }
+ int current_column() { return next_column_ - 1; }
void Finish(int64_t total_bytes_written) {
- if (!(current_column_ == schema_->num_columns())) {
+ if (!(next_column_ == schema_->num_columns())) {
std::stringstream ss;
- ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
+ ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns()
<< " columns are initialized";
throw ParquetException(ss.str());
}
@@ -836,7 +836,7 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
const std::shared_ptr<WriterProperties> properties_;
const SchemaDescriptor* schema_;
std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_;
- int current_column_;
+ int next_column_;
};
std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
diff --git a/cpp/src/parquet/schema-internal.h b/cpp/src/parquet/schema-internal.h
index ad5ae44..9bcdc5d 100644
--- a/cpp/src/parquet/schema-internal.h
+++ b/cpp/src/parquet/schema-internal.h
@@ -39,57 +39,6 @@ class SchemaElement;
namespace schema {
-inline bool str_endswith_tuple(const std::string& str) {
- if (str.size() >= 6) {
- return str.substr(str.size() - 6, 6) == "_tuple";
- }
- return false;
-}
-
-// Special case mentioned in the format spec:
-// If the name is array or ends in _tuple, this should be a list of struct
-// even for single child elements.
-inline bool HasStructListName(const GroupNode& node) {
- return (node.name() == "array" || str_endswith_tuple(node.name()));
-}
-
-// TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
-inline bool IsSimpleStruct(const Node* node) {
- if (!node->is_group()) return false;
- if (node->is_repeated()) return false;
- if (node->converted_type() == ConvertedType::LIST) return false;
- // Special case mentioned in the format spec:
- // If the name is array or ends in _tuple, this should be a list of struct
- // even for single child elements.
- auto group = static_cast<const GroupNode*>(node);
- if (group->field_count() == 1 && HasStructListName(*group)) return false;
-
- return true;
-}
-
- // Coalesce a list of schema field indices which are the roots of the
- // columns referred to by the given column indices
-inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
- const std::vector<int>& column_indices,
- std::vector<int>* out) {
- const GroupNode* group = descr.group_node();
- std::unordered_set<int> already_added;
- out->clear();
- for (auto& column_idx : column_indices) {
- auto field_node = descr.GetColumnRoot(column_idx);
- auto field_idx = group->FieldIndex(*field_node);
- if (field_idx < 0) {
- return false;
- }
- auto insertion = already_added.insert(field_idx);
- if (insertion.second) {
- out->push_back(field_idx);
- }
- }
-
- return true;
-}
-
// ----------------------------------------------------------------------
// Conversion from Parquet Thrift metadata
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index 71aa919..e961127 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -857,6 +857,9 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
BuildTree(group->field(i), max_def_level, max_rep_level, base);
}
} else {
+ node_to_leaf_index_[static_cast<const PrimitiveNode*>(node.get())] =
+ static_cast<int>(leaves_.size());
+
// Primitive node, append to leaves
leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this));
leaf_to_base_.emplace(static_cast<int>(leaves_.size()) - 1, base);
@@ -865,6 +868,14 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
}
}
+int SchemaDescriptor::GetColumnIndex(const PrimitiveNode& node) const {
+ auto it = node_to_leaf_index_.find(&node);
+ if (it == node_to_leaf_index_.end()) {
+ return -1;
+ }
+ return it->second;
+}
+
ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node,
int16_t max_definition_level,
int16_t max_repetition_level,
diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h
index 566d5fb..8fb3a54 100644
--- a/cpp/src/parquet/schema.h
+++ b/cpp/src/parquet/schema.h
@@ -430,6 +430,10 @@ class PARQUET_EXPORT SchemaDescriptor {
void updateColumnOrders(const std::vector<ColumnOrder>& column_orders);
+ /// \brief Return column index corresponding to a particular
+ /// PrimitiveNode. Returns -1 if not found
+ int GetColumnIndex(const schema::PrimitiveNode& node) const;
+
private:
friend class ColumnDescriptor;
@@ -444,6 +448,8 @@ class PARQUET_EXPORT SchemaDescriptor {
// Result of leaf node / tree analysis
std::vector<ColumnDescriptor> leaves_;
+ std::unordered_map<const schema::PrimitiveNode*, int> node_to_leaf_index_;
+
// Mapping between leaf nodes and root group of leaf (first node
// below the schema's root group)
//
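
A small sketch of the new reverse lookup, checking that every leaf maps back
to its own column index (CheckLeafRoundTrip is an illustrative name):

    #include <cassert>
    #include "parquet/schema.h"

    void CheckLeafRoundTrip(const parquet::SchemaDescriptor& descr) {
      for (int i = 0; i < descr.num_columns(); ++i) {
        const auto& node = *static_cast<const parquet::schema::PrimitiveNode*>(
            descr.Column(i)->schema_node().get());
        // GetColumnIndex returns -1 for nodes from a different schema.
        assert(descr.GetColumnIndex(node) == i);
      }
    }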
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 270bb61..3a9dc9b 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -390,14 +390,14 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
void set_use_threads(c_bool use_threads)
-
-cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
CStatus FromParquetSchema(
const SchemaDescriptor* parquet_schema,
const ArrowReaderProperties& properties,
const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
shared_ptr[CSchema]* out)
+cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
+
CStatus ToParquetSchema(
const CSchema* arrow_schema,
const ArrowReaderProperties& properties,
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2031fea..7af16a7 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -752,12 +752,8 @@ write_parquet_file <- function(table, filename){
invisible(.Call(`_arrow_write_parquet_file` , table, filename))
}
-parquet___arrow___FileReader__GetSchema2 <- function(reader, indices){
- .Call(`_arrow_parquet___arrow___FileReader__GetSchema2` , reader, indices)
-}
-
-parquet___arrow___FileReader__GetSchema1 <- function(reader){
- .Call(`_arrow_parquet___arrow___FileReader__GetSchema1` , reader)
+parquet___arrow___FileReader__GetSchema <- function(reader){
+ .Call(`_arrow_parquet___arrow___FileReader__GetSchema` , reader)
}
RecordBatch__num_columns <- function(x){
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 2647536..b75f93e 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -25,18 +25,13 @@
if(quo_is_null(col_select)) {
shared_ptr(`arrow::Table`, parquet___arrow___FileReader__ReadTable1(self))
} else {
- all_vars <- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema1(self))$names
+ all_vars <- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema(self))$names
indices <- match(vars_select(all_vars, !!col_select), all_vars) - 1L
shared_ptr(`arrow::Table`, parquet___arrow___FileReader__ReadTable2(self, indices))
}
},
- GetSchema = function(column_indices = NULL) {
- if (is.null(column_indices)) {
- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema1(self))
- } else {
- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema2(self, column_indices))
- }
-
+ GetSchema = function() {
+ shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema(self))
}
)
)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index bcb0ac5..8cfaa77 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2900,32 +2900,16 @@ RcppExport SEXP _arrow_write_parquet_file(SEXP table_sexp, SEXP filename_sexp){
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema2(const std::unique_ptr<parquet::arrow::FileReader>& reader, const std::vector<int>& indices);
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema2(SEXP reader_sexp, SEXP indices_sexp){
+std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema(const std::unique_ptr<parquet::arrow::FileReader>& reader);
+RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema(SEXP reader_sexp){
BEGIN_RCPP
Rcpp::traits::input_parameter<const std::unique_ptr<parquet::arrow::FileReader>&>::type reader(reader_sexp);
- Rcpp::traits::input_parameter<const std::vector<int>&>::type indices(indices_sexp);
- return Rcpp::wrap(parquet___arrow___FileReader__GetSchema2(reader, indices));
+ return Rcpp::wrap(parquet___arrow___FileReader__GetSchema(reader));
END_RCPP
}
#else
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema2(SEXP reader_sexp, SEXP indices_sexp){
- Rf_error("Cannot call parquet___arrow___FileReader__GetSchema2(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// parquet.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema1(const std::unique_ptr<parquet::arrow::FileReader>& reader);
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema1(SEXP reader_sexp){
-BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::unique_ptr<parquet::arrow::FileReader>&>::type reader(reader_sexp);
- return Rcpp::wrap(parquet___arrow___FileReader__GetSchema1(reader));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema1(SEXP reader_sexp){
- Rf_error("Cannot call parquet___arrow___FileReader__GetSchema1(). Please use arrow::install_arrow() to install required runtime libraries. ");
+RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema(SEXP reader_sexp){
+ Rf_error("Cannot call parquet___arrow___FileReader__GetSchema(). Please use arrow::install_arrow() to install required runtime libraries. ");
}
#endif
@@ -3904,8 +3888,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_parquet___arrow___FileReader__ReadTable1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable1, 1},
{ "_arrow_parquet___arrow___FileReader__ReadTable2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable2, 2},
{ "_arrow_write_parquet_file", (DL_FUNC) &_arrow_write_parquet_file, 2},
- { "_arrow_parquet___arrow___FileReader__GetSchema2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema2, 2},
- { "_arrow_parquet___arrow___FileReader__GetSchema1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema1, 1},
+ { "_arrow_parquet___arrow___FileReader__GetSchema", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema, 1},
{ "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1},
{ "_arrow_RecordBatch__num_rows", (DL_FUNC) &_arrow_RecordBatch__num_rows, 1},
{ "_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1},
diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp
index 9ad1438..5124e9e 100644
--- a/r/src/parquet.cpp
+++ b/r/src/parquet.cpp
@@ -94,26 +94,11 @@ void write_parquet_file(const std::shared_ptr<arrow::Table>& table,
}
// [[arrow::export]]
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema2(
- const std::unique_ptr<parquet::arrow::FileReader>& reader,
- const std::vector<int>& indices) {
+std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema(
+ const std::unique_ptr<parquet::arrow::FileReader>& reader) {
std::shared_ptr<arrow::Schema> schema;
- STOP_IF_NOT_OK(reader->GetSchema(indices, &schema));
+ STOP_IF_NOT_OK(reader->GetSchema(&schema));
return schema;
}
-// [[arrow::export]]
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema1(
- const std::unique_ptr<parquet::arrow::FileReader>& reader) {
- // FileReader does not have this exposed
- // std::shared_ptr<arrow::Schema> schema;
- // STOP_IF_NOT_OK(reader->GetSchema(&schema));
-
- // so going indirectly about it
- std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
- STOP_IF_NOT_OK(reader->GetRecordBatchReader({}, &record_batch_reader));
-
- return record_batch_reader->schema();
-}
-
#endif