You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/09 03:06:27 UTC
parquet-cpp git commit: PARQUET-711: Use metadata builders in parquet writer
Repository: parquet-cpp
Updated Branches:
refs/heads/master 441d85b43 -> 55604b297
PARQUET-711: Use metadata builders in parquet writer
I wrote a sample file and the metadata seems to be correct.
@xhochy I fixed some missing metadata like `dictionary_page_offset`. You might want to check if this fixes the Drill problem.
Author: Deepak Majeti <de...@hpe.com>
Closes #156 from majetideepak/PARQUET-711 and squashes the following commits:
25f5a7e [Deepak Majeti] fix schema and descr. Resolves PARQUET-705 and PARQUET-707
8b4784d [Deepak Majeti] Review comments to add methods back
fdbc761 [Deepak Majeti] fix clang error and comments
c6cb071 [Deepak Majeti] convert DCHECKS to Exceptions in metadata
ada3ac2 [Deepak Majeti] clang format
d9c9131 [Deepak Majeti] Use metadata builders in parquet writer
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/55604b29
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/55604b29
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/55604b29
Branch: refs/heads/master
Commit: 55604b297f444e95e132658b2ef384870ae1f701
Parents: 441d85b
Author: Deepak Majeti <de...@hpe.com>
Authored: Thu Sep 8 23:06:08 2016 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Sep 8 23:06:08 2016 -0400
----------------------------------------------------------------------
src/parquet/column/column-io-benchmark.cc | 22 +++--
src/parquet/column/column-writer-test.cc | 16 ++--
src/parquet/file/file-metadata-test.cc | 8 +-
src/parquet/file/metadata.cc | 96 ++++++++++++-------
src/parquet/file/metadata.h | 17 ++--
src/parquet/file/reader.cc | 8 +-
src/parquet/file/writer-internal.cc | 107 +++++++++-------------
src/parquet/file/writer-internal.h | 42 ++++-----
src/parquet/file/writer.cc | 16 ++--
src/parquet/file/writer.h | 26 ++----
src/parquet/schema/descriptor.cc | 6 +-
src/parquet/schema/descriptor.h | 8 +-
src/parquet/schema/schema-descriptor-test.cc | 2 +-
tools/parquet-dump-schema.cc | 2 +-
14 files changed, 191 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
index 74d7349..319c8f5 100644
--- a/src/parquet/column/column-io-benchmark.cc
+++ b/src/parquet/column/column-io-benchmark.cc
@@ -25,13 +25,13 @@
namespace parquet {
-using format::ColumnChunk;
using schema::PrimitiveNode;
namespace benchmark {
std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
- ColumnChunk* metadata, ColumnDescriptor* schema, const WriterProperties* properties) {
+ ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema,
+ const WriterProperties* properties) {
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
return std::unique_ptr<Int64Writer>(new Int64Writer(
@@ -57,17 +57,19 @@ void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
template <Repetition::type repetition>
static void BM_WriteInt64Column(::benchmark::State& state) {
- format::ColumnChunk metadata;
+ format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range_x(), 128);
std::vector<int16_t> definition_levels(state.range_x(), 1);
std::vector<int16_t> repetition_levels(state.range_x(), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
+ std::shared_ptr<WriterProperties> properties = default_writer_properties();
+ auto metadata = ColumnChunkMetaDataBuilder::Make(
+ properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
while (state.KeepRunning()) {
InMemoryOutputStream dst;
- std::unique_ptr<Int64Writer> writer =
- BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
+ std::unique_ptr<Int64Writer> writer = BuildWriter(
+ state.range_x(), &dst, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(
values.size(), definition_levels.data(), repetition_levels.data(), values.data());
writer->Close();
@@ -91,16 +93,18 @@ std::unique_ptr<Int64Reader> BuildReader(
template <Repetition::type repetition>
static void BM_ReadInt64Column(::benchmark::State& state) {
- format::ColumnChunk metadata;
+ format::ColumnChunk thrift_metadata;
std::vector<int64_t> values(state.range_x(), 128);
std::vector<int16_t> definition_levels(state.range_x(), 1);
std::vector<int16_t> repetition_levels(state.range_x(), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
+ std::shared_ptr<WriterProperties> properties = default_writer_properties();
+ auto metadata = ColumnChunkMetaDataBuilder::Make(
+ properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
InMemoryOutputStream dst;
- std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
std::unique_ptr<Int64Writer> writer =
- BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
+ BuildWriter(state.range_x(), &dst, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(
values.size(), definition_levels.data(), repetition_levels.data(), values.data());
writer->Close();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index bdbb7a0..b3ca080 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -74,6 +74,8 @@ class TestPrimitiveWriter : public ::testing::Test {
repetition_levels_out_.resize(SMALL_SIZE);
SetUpSchemaRequired();
+ metadata_accessor_ =
+ ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
}
void BuildReader() {
@@ -87,8 +89,10 @@ class TestPrimitiveWriter : public ::testing::Test {
std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
sink_.reset(new InMemoryOutputStream());
- std::unique_ptr<SerializedPageWriter> pager(
- new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
+ metadata_ = ColumnChunkMetaDataBuilder::Make(
+ writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_));
+ std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
+ sink_.get(), Compression::UNCOMPRESSED, metadata_.get()));
WriterProperties::Builder wp_builder;
if (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY) {
wp_builder.enable_dictionary();
@@ -126,9 +130,7 @@ class TestPrimitiveWriter : public ::testing::Test {
ASSERT_EQ(this->values_, this->values_out_);
}
- int64_t metadata_num_values() const {
- return metadata_.meta_data.num_values;
- }
+ int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
protected:
int64_t values_read_;
@@ -152,7 +154,9 @@ class TestPrimitiveWriter : public ::testing::Test {
private:
NodePtr node_;
- format::ColumnChunk metadata_;
+ format::ColumnChunk thrift_metadata_;
+ std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+ std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
std::shared_ptr<ColumnDescriptor> schema_;
std::unique_ptr<InMemoryOutputStream> sink_;
std::shared_ptr<WriterProperties> writer_properties_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 5fbd613..852072c 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -54,8 +54,8 @@ TEST(Metadata, TestBuildAccess) {
stats_float.max = &float_max;
auto f_builder = FileMetaDataBuilder::Make(&schema, props);
- auto rg1_builder = f_builder->AppendRowGroup();
- auto rg2_builder = f_builder->AppendRowGroup();
+ auto rg1_builder = f_builder->AppendRowGroup(nrows / 2);
+ auto rg2_builder = f_builder->AppendRowGroup(nrows / 2);
// Write the metadata
// rowgroup1 metadata
@@ -66,7 +66,7 @@ TEST(Metadata, TestBuildAccess) {
col2_builder->SetStatistics(stats_float);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
- rg1_builder->Finish(nrows / 2);
+ rg1_builder->Finish(1024);
// rowgroup2 metadata
col1_builder = rg2_builder->NextColumnChunk();
@@ -76,7 +76,7 @@ TEST(Metadata, TestBuildAccess) {
col2_builder->SetStatistics(stats_float);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
- rg2_builder->Finish(nrows / 2);
+ rg2_builder->Finish(1024);
// Read the metadata
auto f_accessor = f_builder->Finish();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 4e298a8..bc0f7b9 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -180,8 +180,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
inline const SchemaDescriptor* schema() const { return schema_; }
std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
- DCHECK(i < num_columns()) << "The file only has " << num_columns()
- << " columns, requested metadata for column: " << i;
+ if (!(i < num_columns())) {
+ std::stringstream ss;
+ ss << "The file only has " << num_columns()
+ << " columns, requested metadata for column: " << i;
+ throw ParquetException(ss.str());
+ }
return ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&row_group_->columns[i]));
}
@@ -244,14 +248,17 @@ class FileMetaData::FileMetaDataImpl {
void WriteTo(OutputStream* dst) { SerializeThriftMsg(metadata_.get(), 1024, dst); }
std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
- DCHECK(i < num_row_groups())
- << "The file only has " << num_row_groups()
- << " row groups, requested metadata for row group: " << i;
+ if (!(i < num_row_groups())) {
+ std::stringstream ss;
+ ss << "The file only has " << num_row_groups()
+ << " row groups, requested metadata for row group: " << i;
+ throw ParquetException(ss.str());
+ }
return RowGroupMetaData::Make(
reinterpret_cast<const uint8_t*>(&metadata_->row_groups[i]), &schema_);
}
- const SchemaDescriptor* schema_descriptor() const { return &schema_; }
+ const SchemaDescriptor* schema() const { return &schema_; }
private:
friend FileMetaDataBuilder;
@@ -306,8 +313,8 @@ int FileMetaData::num_schema_elements() const {
return impl_->num_schema_elements();
}
-const SchemaDescriptor* FileMetaData::schema_descriptor() const {
- return impl_->schema_descriptor();
+const SchemaDescriptor* FileMetaData::schema() const {
+ return impl_->schema();
}
void FileMetaData::WriteTo(OutputStream* dst) {
@@ -374,6 +381,8 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(thrift_encodings);
}
+ const ColumnDescriptor* descr() const { return column_; }
+
private:
format::ColumnChunk* column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
@@ -406,24 +415,33 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
compressed_size, uncompressed_size, dictionary_fallback);
}
+const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
+ return impl_->descr();
+}
+
void ColumnChunkMetaDataBuilder::SetStatistics(const ColumnStatistics& result) {
impl_->SetStatistics(result);
}
class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
- explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema, uint8_t* contents)
+ explicit RowGroupMetaDataBuilderImpl(int64_t num_rows,
+ const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema,
+ uint8_t* contents)
: properties_(props), schema_(schema), current_column_(0) {
row_group_ = reinterpret_cast<format::RowGroup*>(contents);
InitializeColumns(schema->num_columns());
+ row_group_->__set_num_rows(num_rows);
}
~RowGroupMetaDataBuilderImpl() {}
ColumnChunkMetaDataBuilder* NextColumnChunk() {
- DCHECK(current_column_ < num_columns())
- << "The schema only has " << num_columns()
- << " columns, requested metadata for column: " << current_column_;
+ if (!(current_column_ < num_columns())) {
+ std::stringstream ss;
+ ss << "The schema only has " << num_columns()
+ << " columns, requested metadata for column: " << current_column_;
+ throw ParquetException(ss.str());
+ }
auto column = schema_->Column(current_column_);
auto column_builder = ColumnChunkMetaDataBuilder::Make(properties_, column,
reinterpret_cast<uint8_t*>(&row_group_->columns[current_column_++]));
@@ -432,25 +450,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
return column_builder_ptr;
}
- void Finish(int64_t num_rows) {
- DCHECK(current_column_ == schema_->num_columns())
- << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
- << " columns are initialized";
- size_t total_byte_size = 0;
+ void Finish(int64_t total_bytes_written) {
+ if (!(current_column_ == schema_->num_columns())) {
+ std::stringstream ss;
+ ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
+ << " columns are initialized";
+ throw ParquetException(ss.str());
+ }
+ int64_t total_byte_size = 0;
for (int i = 0; i < schema_->num_columns(); i++) {
- DCHECK(row_group_->columns[i].file_offset > 0) << "Column " << i
- << " is not complete.";
+ if (!(row_group_->columns[i].file_offset > 0)) {
+ std::stringstream ss;
+ ss << "Column " << i << " is not complete.";
+ throw ParquetException(ss.str());
+ }
total_byte_size += row_group_->columns[i].meta_data.total_compressed_size;
}
+ DCHECK(total_bytes_written == total_byte_size)
+ << "Total bytes in this RowGroup does not match with compressed sizes of columns";
row_group_->__set_total_byte_size(total_byte_size);
- row_group_->__set_num_rows(num_rows);
}
- private:
int num_columns() { return row_group_->columns.size(); }
+ private:
void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }
format::RowGroup* row_group_;
@@ -460,18 +485,18 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
int current_column_;
};
-std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
+std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(int64_t num_rows,
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
uint8_t* contents) {
return std::unique_ptr<RowGroupMetaDataBuilder>(
- new RowGroupMetaDataBuilder(props, schema_, contents));
+ new RowGroupMetaDataBuilder(num_rows, props, schema_, contents));
}
-RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
+RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(int64_t num_rows,
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
uint8_t* contents)
: impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
- new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
+ new RowGroupMetaDataBuilderImpl(num_rows, props, schema_, contents))} {}
RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {}
@@ -479,11 +504,16 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
return impl_->NextColumnChunk();
}
-void RowGroupMetaDataBuilder::Finish(int64_t num_rows) {
- impl_->Finish(num_rows);
+int RowGroupMetaDataBuilder::num_columns() {
+ return impl_->num_columns();
+}
+
+void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) {
+ impl_->Finish(total_bytes_written);
}
// file metadata
+// TODO(PARQUET-595) Support key_value_metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
explicit FileMetaDataBuilderImpl(
@@ -493,10 +523,10 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
}
~FileMetaDataBuilderImpl() {}
- RowGroupMetaDataBuilder* AppendRowGroup() {
+ RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows) {
auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
auto row_group_builder = RowGroupMetaDataBuilder::Make(
- properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
+ num_rows, properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
row_group_builders_.push_back(std::move(row_group_builder));
row_groups_.push_back(std::move(row_group));
@@ -517,7 +547,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
metadata_->__set_version(properties_->version());
metadata_->__set_created_by(properties_->created_by());
parquet::schema::SchemaFlattener flattener(
- static_cast<parquet::schema::GroupNode*>(schema_->schema().get()),
+ static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
&metadata_->schema);
flattener.Flatten();
auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData());
@@ -548,8 +578,8 @@ FileMetaDataBuilder::FileMetaDataBuilder(
FileMetaDataBuilder::~FileMetaDataBuilder() {}
-RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
- return impl_->AppendRowGroup();
+RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup(int64_t num_rows) {
+ return impl_->AppendRowGroup(num_rows);
}
std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 78ea53b..1d96621 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -117,7 +117,7 @@ class PARQUET_EXPORT FileMetaData {
void WriteTo(OutputStream* dst);
// Return const-pointer to make it clear that this object is not to be copied
- const SchemaDescriptor* schema_descriptor() const;
+ const SchemaDescriptor* schema() const;
private:
friend FileMetaDataBuilder;
@@ -144,7 +144,8 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// column metadata
// ownership of min/max is with ColumnChunkMetadata
void SetStatistics(const ColumnStatistics& stats);
-
+ // get the column descriptor
+ const ColumnDescriptor* descr() const;
// commit the metadata
void Finish(int64_t num_values, int64_t dictonary_page_offset,
int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
@@ -161,20 +162,22 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
class PARQUET_EXPORT RowGroupMetaDataBuilder {
public:
// API convenience to get a MetaData reader
- static std::unique_ptr<RowGroupMetaDataBuilder> Make(
+ static std::unique_ptr<RowGroupMetaDataBuilder> Make(int64_t num_rows,
const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
uint8_t* contents);
~RowGroupMetaDataBuilder();
ColumnChunkMetaDataBuilder* NextColumnChunk();
+ int num_columns();
// commit the metadata
- void Finish(int64_t num_rows);
+ void Finish(int64_t total_bytes_written);
private:
- explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
- const SchemaDescriptor* schema_, uint8_t* contents);
+ explicit RowGroupMetaDataBuilder(int64_t num_rows,
+ const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+ uint8_t* contents);
// PIMPL Idiom
class RowGroupMetaDataBuilderImpl;
std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
@@ -188,7 +191,7 @@ class PARQUET_EXPORT FileMetaDataBuilder {
~FileMetaDataBuilder();
- RowGroupMetaDataBuilder* AppendRowGroup();
+ RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows);
// commit the metadata
std::unique_ptr<FileMetaData> Finish();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index b593ea0..7cf3f1a 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -126,7 +126,7 @@ void ParquetFileReader::DebugPrint(
stream << "Total rows: " << file_metadata->num_rows() << "\n";
stream << "Number of RowGroups: " << file_metadata->num_row_groups() << "\n";
stream << "Number of Real Columns: "
- << file_metadata->schema_descriptor()->group()->field_count() << "\n";
+ << file_metadata->schema()->group_node()->field_count() << "\n";
if (selected_columns.size() == 0) {
for (int i = 0; i < file_metadata->num_columns(); i++) {
@@ -143,7 +143,7 @@ void ParquetFileReader::DebugPrint(
stream << "Number of Columns: " << file_metadata->num_columns() << "\n";
stream << "Number of Selected Columns: " << selected_columns.size() << "\n";
for (auto i : selected_columns) {
- const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i);
+ const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
stream << "Column " << i << ": " << descr->name() << " ("
<< TypeToString(descr->physical_type()) << ")" << std::endl;
}
@@ -162,7 +162,7 @@ void ParquetFileReader::DebugPrint(
auto column_chunk = group_metadata->ColumnChunk(i);
const ColumnStatistics stats = column_chunk->statistics();
- const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i);
+ const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
stream << "Column " << i << std::endl << ", values: " << column_chunk->num_values();
if (column_chunk->is_stats_set()) {
stream << ", null values: " << stats.null_count
@@ -201,7 +201,7 @@ void ParquetFileReader::DebugPrint(
std::string fmt = ss.str();
snprintf(buffer, bufsize, fmt.c_str(),
- file_metadata->schema_descriptor()->Column(i)->name().c_str());
+ file_metadata->schema()->Column(i)->name().c_str());
stream << buffer;
// This is OK in this method as long as the RowGroupReader does not get
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index f07b44b..fb05f13 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -34,25 +34,23 @@ static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
// SerializedPageWriter
SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
- format::ColumnChunk* metadata, MemoryAllocator* allocator)
+ ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator)
: sink_(sink),
metadata_(metadata),
- // allocator_(allocator),
+ num_values_(0),
+ dictionary_page_offset_(0),
+ data_page_offset_(0),
+ total_uncompressed_size_(0),
+ total_compressed_size_(0),
compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) {
compressor_ = Codec::Create(codec);
- // Currently we directly start with the data page
- metadata_->meta_data.__set_data_page_offset(sink_->Tell());
- metadata_->meta_data.__set_codec(ToThrift(codec));
}
-void SerializedPageWriter::Close() {}
-
-void SerializedPageWriter::AddEncoding(Encoding::type encoding) {
- auto it = std::find(metadata_->meta_data.encodings.begin(),
- metadata_->meta_data.encodings.end(), ToThrift(encoding));
- if (it != metadata_->meta_data.encodings.end()) {
- metadata_->meta_data.encodings.push_back(ToThrift(encoding));
- }
+void SerializedPageWriter::Close() {
+ // index_page_offset = 0 since they are not supported
+ // TODO: Remove default fallback = 'false' when implemented
+ metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
+ total_compressed_size_, total_uncompressed_size_, false);
}
std::shared_ptr<Buffer> SerializedPageWriter::Compress(
@@ -91,13 +89,14 @@ int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) {
// TODO(PARQUET-594) crc checksum
int64_t start_pos = sink_->Tell();
+ if (data_page_offset_ == 0) { data_page_offset_ = start_pos; }
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = sink_->Tell() - start_pos;
sink_->Write(compressed_data->data(), compressed_data->size());
- metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
- metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size;
- metadata_->meta_data.num_values += page.num_values();
+ total_uncompressed_size_ += uncompressed_size + header_size;
+ total_compressed_size_ += compressed_data->size() + header_size;
+ num_values_ += page.num_values();
return sink_->Tell() - start_pos;
}
@@ -119,12 +118,13 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
// TODO(PARQUET-594) crc checksum
int64_t start_pos = sink_->Tell();
+ if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; }
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = sink_->Tell() - start_pos;
sink_->Write(compressed_data->data(), compressed_data->size());
- metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
- metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size;
+ total_uncompressed_size_ += uncompressed_size + header_size;
+ total_compressed_size_ += compressed_data->size() + header_size;
return sink_->Tell() - start_pos;
}
@@ -133,50 +133,38 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
// RowGroupSerializer
int RowGroupSerializer::num_columns() const {
- return schema_->num_columns();
+ return metadata_->num_columns();
}
int64_t RowGroupSerializer::num_rows() const {
return num_rows_;
}
-const SchemaDescriptor* RowGroupSerializer::schema() const {
- return schema_;
-}
-
ColumnWriter* RowGroupSerializer::NextColumn() {
- if (current_column_index_ == schema_->num_columns() - 1) {
- throw ParquetException("All columns have already been written.");
- }
- current_column_index_++;
+ // Throws an error if more columns are being written
+ auto col_meta = metadata_->NextColumnChunk();
- if (current_column_writer_) { total_bytes_written_ += current_column_writer_->Close(); }
+ if (current_column_writer_) { total_bytes_written_ = current_column_writer_->Close(); }
- const ColumnDescriptor* column_descr = schema_->Column(current_column_index_);
- format::ColumnChunk* col_meta = &metadata_->columns[current_column_index_];
- col_meta->__isset.meta_data = true;
- col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
- col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
- std::unique_ptr<PageWriter> pager(new SerializedPageWriter(
- sink_, properties_->compression(column_descr->path()), col_meta, allocator_));
+ const ColumnDescriptor* column_descr = col_meta->descr();
+ std::unique_ptr<PageWriter> pager(
+ new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
+ col_meta, properties_->allocator()));
current_column_writer_ =
- ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_);
+ ColumnWriter::Make(col_meta->descr(), std::move(pager), num_rows_, properties_);
return current_column_writer_.get();
}
void RowGroupSerializer::Close() {
if (!closed_) {
closed_ = true;
- if (current_column_index_ != schema_->num_columns() - 1) {
- throw ParquetException("Not all column were written in the current rowgroup.");
- }
if (current_column_writer_) {
- total_bytes_written_ += current_column_writer_->Close();
+ total_bytes_written_ = current_column_writer_->Close();
current_column_writer_.reset();
}
-
- metadata_->__set_total_byte_size(total_bytes_written_);
+ // Ensures all columns have been written
+ metadata_->Finish(total_bytes_written_);
}
}
@@ -225,13 +213,10 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
if (row_group_writer_) { row_group_writer_->Close(); }
num_rows_ += num_rows;
num_row_groups_++;
-
- auto rgm_size = row_group_metadata_.size();
- row_group_metadata_.resize(rgm_size + 1);
- format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
- std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
- num_rows, &schema_, sink_.get(), rg_metadata, properties_.get()));
- row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
+ auto rg_metadata = metadata_->AppendRowGroup(num_rows);
+ std::unique_ptr<RowGroupWriter::Contents> contents(
+ new RowGroupSerializer(num_rows, sink_.get(), rg_metadata, properties_.get()));
+ row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
return row_group_writer_.get();
}
@@ -243,19 +228,9 @@ void FileSerializer::WriteMetaData() {
// Write MetaData
uint32_t metadata_len = sink_->Tell();
- SchemaFlattener flattener(
- static_cast<GroupNode*>(schema_.schema().get()), &metadata_.schema);
- flattener.Flatten();
-
- // TODO: Currently we only write version 1 files
- metadata_.__set_version(1);
- metadata_.__set_num_rows(num_rows_);
- metadata_.__set_row_groups(row_group_metadata_);
- // TODO(PARQUET-595) Support key_value_metadata
- // TODO(PARQUET-590) Get from WriterProperties
- metadata_.__set_created_by("parquet-cpp");
-
- SerializeThriftMsg(&metadata_, 1024, sink_.get());
+ // Get a FileMetaData
+ auto metadata = metadata_->Finish();
+ metadata->WriteTo(sink_.get());
metadata_len = sink_->Tell() - metadata_len;
// Write Footer
@@ -267,12 +242,12 @@ FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties)
: sink_(sink),
- allocator_(properties->allocator()),
- num_row_groups_(0),
- num_rows_(0),
is_open_(true),
- properties_(properties) {
+ properties_(properties),
+ num_row_groups_(0),
+ num_rows_(0) {
schema_.Init(schema);
+ metadata_ = FileMetaDataBuilder::Make(&schema_, properties);
StartFile();
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 7c46408..645d4bf 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -23,6 +23,7 @@
#include "parquet/column/page.h"
#include "parquet/compression/codec.h"
+#include "parquet/file/metadata.h"
#include "parquet/file/writer.h"
#include "parquet/thrift/parquet_types.h"
@@ -31,11 +32,11 @@ namespace parquet {
// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
-//
class SerializedPageWriter : public PageWriter {
public:
SerializedPageWriter(OutputStream* sink, Compression::type codec,
- format::ColumnChunk* metadata, MemoryAllocator* allocator = default_allocator());
+ ColumnChunkMetaDataBuilder* metadata,
+ MemoryAllocator* allocator = default_allocator());
virtual ~SerializedPageWriter() {}
@@ -47,14 +48,17 @@ class SerializedPageWriter : public PageWriter {
private:
OutputStream* sink_;
- format::ColumnChunk* metadata_;
- // MemoryAllocator* allocator_;
+ ColumnChunkMetaDataBuilder* metadata_;
+ int64_t num_values_;
+ int64_t dictionary_page_offset_;
+ int64_t data_page_offset_;
+ int64_t total_uncompressed_size_;
+ int64_t total_compressed_size_;
// Compression codec to use.
std::unique_ptr<Codec> compressor_;
std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
- void AddEncoding(Encoding::type encoding);
/**
* Compress a buffer.
*
@@ -67,24 +71,17 @@ class SerializedPageWriter : public PageWriter {
// RowGroupWriter::Contents implementation for the Parquet file specification
class RowGroupSerializer : public RowGroupWriter::Contents {
public:
- RowGroupSerializer(int64_t num_rows, const SchemaDescriptor* schema, OutputStream* sink,
- format::RowGroup* metadata, const WriterProperties* properties)
+ RowGroupSerializer(int64_t num_rows, OutputStream* sink,
+ RowGroupMetaDataBuilder* metadata, const WriterProperties* properties)
: num_rows_(num_rows),
- schema_(schema),
sink_(sink),
metadata_(metadata),
- allocator_(properties->allocator()),
properties_(properties),
total_bytes_written_(0),
- closed_(false),
- current_column_index_(-1) {
- metadata_->__set_num_rows(num_rows_);
- metadata_->columns.resize(schema->num_columns());
- }
+ closed_(false) {}
int num_columns() const override;
int64_t num_rows() const override;
- const SchemaDescriptor* schema() const override;
// TODO: PARQUET-579
// void WriteRowGroupStatitics() override;
@@ -94,15 +91,12 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
private:
int64_t num_rows_;
- const SchemaDescriptor* schema_;
OutputStream* sink_;
- format::RowGroup* metadata_;
- MemoryAllocator* allocator_;
+ RowGroupMetaDataBuilder* metadata_;
const WriterProperties* properties_;
int64_t total_bytes_written_;
bool closed_;
- int64_t current_column_index_;
std::shared_ptr<ColumnWriter> current_column_writer_;
};
@@ -134,14 +128,12 @@ class FileSerializer : public ParquetFileWriter::Contents {
const std::shared_ptr<WriterProperties>& properties);
std::shared_ptr<OutputStream> sink_;
- format::FileMetaData metadata_;
- std::vector<format::RowGroup> row_group_metadata_;
- MemoryAllocator* allocator_;
- int num_row_groups_;
- int num_rows_;
bool is_open_;
+ const std::shared_ptr<WriterProperties> properties_;
+ int num_row_groups_;
+ int64_t num_rows_;
+ std::unique_ptr<FileMetaDataBuilder> metadata_;
std::unique_ptr<RowGroupWriter> row_group_writer_;
- std::shared_ptr<WriterProperties> properties_;
void StartFile();
void WriteMetaData();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index 269b1cb..8c9f52f 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -27,11 +27,8 @@ namespace parquet {
// ----------------------------------------------------------------------
// RowGroupWriter public API
-RowGroupWriter::RowGroupWriter(
- std::unique_ptr<Contents> contents, MemoryAllocator* allocator)
- : contents_(std::move(contents)), allocator_(allocator) {
- schema_ = contents_->schema();
-}
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
+ : contents_(std::move(contents)) {}
void RowGroupWriter::Close() {
if (contents_) {
@@ -64,9 +61,16 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
return result;
}
+const SchemaDescriptor* ParquetFileWriter::schema() const {
+ return contents_->schema();
+}
+
+const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
+ return contents_->schema()->Column(i);
+}
+
void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
contents_ = std::move(contents);
- schema_ = contents_->schema();
}
void ParquetFileWriter::Close() {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 0e64961..422f008 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -46,12 +46,9 @@ class PARQUET_EXPORT RowGroupWriter {
// virtual void WriteRowGroupStatitics();
virtual ColumnWriter* NextColumn() = 0;
virtual void Close() = 0;
-
- // Return const-pointer to make it clear that this object is not to be copied
- virtual const SchemaDescriptor* schema() const = 0;
};
- RowGroupWriter(std::unique_ptr<Contents> contents, MemoryAllocator* allocator);
+ explicit RowGroupWriter(std::unique_ptr<Contents> contents);
/**
* Construct a ColumnWriter for the indicated row group-relative column.
@@ -75,13 +72,8 @@ class PARQUET_EXPORT RowGroupWriter {
// virtual void WriteRowGroupStatitics();
private:
- // Owned by the parent ParquetFileWriter
- const SchemaDescriptor* schema_;
-
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;
-
- MemoryAllocator* allocator_;
};
class PARQUET_EXPORT ParquetFileWriter {
@@ -102,7 +94,7 @@ class PARQUET_EXPORT ParquetFileWriter {
virtual const std::shared_ptr<WriterProperties>& properties() const = 0;
- // Return const-poitner to make it clear that this object is not to be copied
+ // Return const-pointer to make it clear that this object is not to be copied
const SchemaDescriptor* schema() const { return &schema_; }
SchemaDescriptor schema_;
};
@@ -153,18 +145,18 @@ class PARQUET_EXPORT ParquetFileWriter {
const std::shared_ptr<WriterProperties>& properties() const;
/**
- * Returns the file schema descriptor
- */
- const SchemaDescriptor* descr() { return schema_; }
+ * Returns the file schema descriptor
+ */
+ const SchemaDescriptor* schema() const;
- const ColumnDescriptor* column_schema(int i) const { return schema_->Column(i); }
+ /**
+ * Returns a column descriptor in schema
+ */
+ const ColumnDescriptor* descr(int i) const;
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;
-
- // The SchemaDescriptor is provided by the Contents impl
- const SchemaDescriptor* schema_;
};
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/descriptor.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.cc b/src/parquet/schema/descriptor.cc
index de63e5e..4d46204 100644
--- a/src/parquet/schema/descriptor.cc
+++ b/src/parquet/schema/descriptor.cc
@@ -39,11 +39,11 @@ void SchemaDescriptor::Init(const NodePtr& schema) {
throw ParquetException("Must initialize with a schema group");
}
- group_ = static_cast<const GroupNode*>(schema_.get());
+ group_node_ = static_cast<const GroupNode*>(schema_.get());
leaves_.clear();
- for (int i = 0; i < group_->field_count(); ++i) {
- BuildTree(group_->field(i), 0, 0, group_->field(i));
+ for (int i = 0; i < group_node_->field_count(); ++i) {
+ BuildTree(group_node_->field(i), 0, 0, group_node_->field(i));
}
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/descriptor.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h
index 51f3503..c591954 100644
--- a/src/parquet/schema/descriptor.h
+++ b/src/parquet/schema/descriptor.h
@@ -100,18 +100,20 @@ class PARQUET_EXPORT SchemaDescriptor {
// The number of physical columns appearing in the file
int num_columns() const { return leaves_.size(); }
- const schema::NodePtr& schema() const { return schema_; }
+ const schema::NodePtr& schema_root() const { return schema_; }
- const schema::GroupNode* group() const { return group_; }
+ const schema::GroupNode* group_node() const { return group_node_; }
// Returns the root (child of the schema root) node of the leaf(column) node
const schema::NodePtr& GetColumnRoot(int i) const;
+ const std::string& name() const { return group_node_->name(); }
+
private:
friend class ColumnDescriptor;
schema::NodePtr schema_;
- const schema::GroupNode* group_;
+ const schema::GroupNode* group_node_;
void BuildTree(const schema::NodePtr& node, int16_t max_def_level,
int16_t max_rep_level, const schema::NodePtr& base);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index d88cd0d..5be8db7 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -128,7 +128,7 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get());
- ASSERT_EQ(schema.get(), descr_.group());
+ ASSERT_EQ(schema.get(), descr_.group_node());
// Init clears the leaves
descr_.Init(schema);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/tools/parquet-dump-schema.cc
----------------------------------------------------------------------
diff --git a/tools/parquet-dump-schema.cc b/tools/parquet-dump-schema.cc
index deef2fd..1e0239b 100644
--- a/tools/parquet-dump-schema.cc
+++ b/tools/parquet-dump-schema.cc
@@ -26,7 +26,7 @@ int main(int argc, char** argv) {
try {
std::unique_ptr<parquet::ParquetFileReader> reader =
parquet::ParquetFileReader::OpenFile(filename);
- PrintSchema(reader->metadata()->schema_descriptor()->schema().get(), std::cout);
+ PrintSchema(reader->metadata()->schema()->schema_root().get(), std::cout);
} catch (const std::exception& e) {
std::cerr << "Parquet error: " << e.what() << std::endl;
return -1;