You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/09 03:06:27 UTC

parquet-cpp git commit: PARQUET-711: Use metadata builders in parquet writer

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 441d85b43 -> 55604b297


PARQUET-711: Use metadata builders in parquet writer

I wrote a sample file and the metadata seems to be correct.
@xhochy I fixed some missing metadata like `dictionary_page_offset`. You might want to check if this fixes the Drill problem.

Author: Deepak Majeti <de...@hpe.com>

Closes #156 from majetideepak/PARQUET-711 and squashes the following commits:

25f5a7e [Deepak Majeti] fix schema and descr. Resolves PARQUET-705 and PARQUET-707
8b4784d [Deepak Majeti] Review comments to add methods back
fdbc761 [Deepak Majeti] fix clang error and comments
c6cb071 [Deepak Majeti] convert DCHECKS to Exceptions in metadata
ada3ac2 [Deepak Majeti] clang format
d9c9131 [Deepak Majeti] Use metadata builders in parquet writer


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/55604b29
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/55604b29
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/55604b29

Branch: refs/heads/master
Commit: 55604b297f444e95e132658b2ef384870ae1f701
Parents: 441d85b
Author: Deepak Majeti <de...@hpe.com>
Authored: Thu Sep 8 23:06:08 2016 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Sep 8 23:06:08 2016 -0400

----------------------------------------------------------------------
 src/parquet/column/column-io-benchmark.cc    |  22 +++--
 src/parquet/column/column-writer-test.cc     |  16 ++--
 src/parquet/file/file-metadata-test.cc       |   8 +-
 src/parquet/file/metadata.cc                 |  96 ++++++++++++-------
 src/parquet/file/metadata.h                  |  17 ++--
 src/parquet/file/reader.cc                   |   8 +-
 src/parquet/file/writer-internal.cc          | 107 +++++++++-------------
 src/parquet/file/writer-internal.h           |  42 ++++-----
 src/parquet/file/writer.cc                   |  16 ++--
 src/parquet/file/writer.h                    |  26 ++----
 src/parquet/schema/descriptor.cc             |   6 +-
 src/parquet/schema/descriptor.h              |   8 +-
 src/parquet/schema/schema-descriptor-test.cc |   2 +-
 tools/parquet-dump-schema.cc                 |   2 +-
 14 files changed, 191 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
index 74d7349..319c8f5 100644
--- a/src/parquet/column/column-io-benchmark.cc
+++ b/src/parquet/column/column-io-benchmark.cc
@@ -25,13 +25,13 @@
 
 namespace parquet {
 
-using format::ColumnChunk;
 using schema::PrimitiveNode;
 
 namespace benchmark {
 
 std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
-    ColumnChunk* metadata, ColumnDescriptor* schema, const WriterProperties* properties) {
+    ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema,
+    const WriterProperties* properties) {
   std::unique_ptr<SerializedPageWriter> pager(
       new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
   return std::unique_ptr<Int64Writer>(new Int64Writer(
@@ -57,17 +57,19 @@ void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
 
 template <Repetition::type repetition>
 static void BM_WriteInt64Column(::benchmark::State& state) {
-  format::ColumnChunk metadata;
+  format::ColumnChunk thrift_metadata;
   std::vector<int64_t> values(state.range_x(), 128);
   std::vector<int16_t> definition_levels(state.range_x(), 1);
   std::vector<int16_t> repetition_levels(state.range_x(), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
-  std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
+  std::shared_ptr<WriterProperties> properties = default_writer_properties();
+  auto metadata = ColumnChunkMetaDataBuilder::Make(
+      properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
   while (state.KeepRunning()) {
     InMemoryOutputStream dst;
-    std::unique_ptr<Int64Writer> writer =
-        BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
+    std::unique_ptr<Int64Writer> writer = BuildWriter(
+        state.range_x(), &dst, metadata.get(), schema.get(), properties.get());
     writer->WriteBatch(
         values.size(), definition_levels.data(), repetition_levels.data(), values.data());
     writer->Close();
@@ -91,16 +93,18 @@ std::unique_ptr<Int64Reader> BuildReader(
 
 template <Repetition::type repetition>
 static void BM_ReadInt64Column(::benchmark::State& state) {
-  format::ColumnChunk metadata;
+  format::ColumnChunk thrift_metadata;
   std::vector<int64_t> values(state.range_x(), 128);
   std::vector<int16_t> definition_levels(state.range_x(), 1);
   std::vector<int16_t> repetition_levels(state.range_x(), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
+  std::shared_ptr<WriterProperties> properties = default_writer_properties();
+  auto metadata = ColumnChunkMetaDataBuilder::Make(
+      properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
   InMemoryOutputStream dst;
-  std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
   std::unique_ptr<Int64Writer> writer =
-      BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
+      BuildWriter(state.range_x(), &dst, metadata.get(), schema.get(), properties.get());
   writer->WriteBatch(
       values.size(), definition_levels.data(), repetition_levels.data(), values.data());
   writer->Close();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index bdbb7a0..b3ca080 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -74,6 +74,8 @@ class TestPrimitiveWriter : public ::testing::Test {
     repetition_levels_out_.resize(SMALL_SIZE);
 
     SetUpSchemaRequired();
+    metadata_accessor_ =
+        ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
   }
 
   void BuildReader() {
@@ -87,8 +89,10 @@ class TestPrimitiveWriter : public ::testing::Test {
   std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
       int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
     sink_.reset(new InMemoryOutputStream());
-    std::unique_ptr<SerializedPageWriter> pager(
-        new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
+    metadata_ = ColumnChunkMetaDataBuilder::Make(
+        writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_));
+    std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
+        sink_.get(), Compression::UNCOMPRESSED, metadata_.get()));
     WriterProperties::Builder wp_builder;
     if (encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY) {
       wp_builder.enable_dictionary();
@@ -126,9 +130,7 @@ class TestPrimitiveWriter : public ::testing::Test {
     ASSERT_EQ(this->values_, this->values_out_);
   }
 
-  int64_t metadata_num_values() const {
-    return metadata_.meta_data.num_values;
-  }
+  int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
 
  protected:
   int64_t values_read_;
@@ -152,7 +154,9 @@ class TestPrimitiveWriter : public ::testing::Test {
 
  private:
   NodePtr node_;
-  format::ColumnChunk metadata_;
+  format::ColumnChunk thrift_metadata_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+  std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
   std::shared_ptr<ColumnDescriptor> schema_;
   std::unique_ptr<InMemoryOutputStream> sink_;
   std::shared_ptr<WriterProperties> writer_properties_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 5fbd613..852072c 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -54,8 +54,8 @@ TEST(Metadata, TestBuildAccess) {
   stats_float.max = &float_max;
 
   auto f_builder = FileMetaDataBuilder::Make(&schema, props);
-  auto rg1_builder = f_builder->AppendRowGroup();
-  auto rg2_builder = f_builder->AppendRowGroup();
+  auto rg1_builder = f_builder->AppendRowGroup(nrows / 2);
+  auto rg2_builder = f_builder->AppendRowGroup(nrows / 2);
 
   // Write the metadata
   // rowgroup1 metadata
@@ -66,7 +66,7 @@ TEST(Metadata, TestBuildAccess) {
   col2_builder->SetStatistics(stats_float);
   col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
   col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
-  rg1_builder->Finish(nrows / 2);
+  rg1_builder->Finish(1024);
 
   // rowgroup2 metadata
   col1_builder = rg2_builder->NextColumnChunk();
@@ -76,7 +76,7 @@ TEST(Metadata, TestBuildAccess) {
   col2_builder->SetStatistics(stats_float);
   col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
   col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
-  rg2_builder->Finish(nrows / 2);
+  rg2_builder->Finish(1024);
 
   // Read the metadata
   auto f_accessor = f_builder->Finish();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 4e298a8..bc0f7b9 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -180,8 +180,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
   inline const SchemaDescriptor* schema() const { return schema_; }
 
   std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
-    DCHECK(i < num_columns()) << "The file only has " << num_columns()
-                              << " columns, requested metadata for column: " << i;
+    if (!(i < num_columns())) {
+      std::stringstream ss;
+      ss << "The file only has " << num_columns()
+         << " columns, requested metadata for column: " << i;
+      throw ParquetException(ss.str());
+    }
     return ColumnChunkMetaData::Make(
         reinterpret_cast<const uint8_t*>(&row_group_->columns[i]));
   }
@@ -244,14 +248,17 @@ class FileMetaData::FileMetaDataImpl {
   void WriteTo(OutputStream* dst) { SerializeThriftMsg(metadata_.get(), 1024, dst); }
 
   std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
-    DCHECK(i < num_row_groups())
-        << "The file only has " << num_row_groups()
-        << " row groups, requested metadata for row group: " << i;
+    if (!(i < num_row_groups())) {
+      std::stringstream ss;
+      ss << "The file only has " << num_row_groups()
+         << " row groups, requested metadata for row group: " << i;
+      throw ParquetException(ss.str());
+    }
     return RowGroupMetaData::Make(
         reinterpret_cast<const uint8_t*>(&metadata_->row_groups[i]), &schema_);
   }
 
-  const SchemaDescriptor* schema_descriptor() const { return &schema_; }
+  const SchemaDescriptor* schema() const { return &schema_; }
 
  private:
   friend FileMetaDataBuilder;
@@ -306,8 +313,8 @@ int FileMetaData::num_schema_elements() const {
   return impl_->num_schema_elements();
 }
 
-const SchemaDescriptor* FileMetaData::schema_descriptor() const {
-  return impl_->schema_descriptor();
+const SchemaDescriptor* FileMetaData::schema() const {
+  return impl_->schema();
 }
 
 void FileMetaData::WriteTo(OutputStream* dst) {
@@ -374,6 +381,8 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
   }
 
+  const ColumnDescriptor* descr() const { return column_; }
+
  private:
   format::ColumnChunk* column_chunk_;
   const std::shared_ptr<WriterProperties> properties_;
@@ -406,24 +415,33 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
       compressed_size, uncompressed_size, dictionary_fallback);
 }
 
+const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
+  return impl_->descr();
+}
+
 void ColumnChunkMetaDataBuilder::SetStatistics(const ColumnStatistics& result) {
   impl_->SetStatistics(result);
 }
 
 class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
  public:
-  explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
-      const SchemaDescriptor* schema, uint8_t* contents)
+  explicit RowGroupMetaDataBuilderImpl(int64_t num_rows,
+      const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema,
+      uint8_t* contents)
       : properties_(props), schema_(schema), current_column_(0) {
     row_group_ = reinterpret_cast<format::RowGroup*>(contents);
     InitializeColumns(schema->num_columns());
+    row_group_->__set_num_rows(num_rows);
   }
   ~RowGroupMetaDataBuilderImpl() {}
 
   ColumnChunkMetaDataBuilder* NextColumnChunk() {
-    DCHECK(current_column_ < num_columns())
-        << "The schema only has " << num_columns()
-        << " columns, requested metadata for column: " << current_column_;
+    if (!(current_column_ < num_columns())) {
+      std::stringstream ss;
+      ss << "The schema only has " << num_columns()
+         << " columns, requested metadata for column: " << current_column_;
+      throw ParquetException(ss.str());
+    }
     auto column = schema_->Column(current_column_);
     auto column_builder = ColumnChunkMetaDataBuilder::Make(properties_, column,
         reinterpret_cast<uint8_t*>(&row_group_->columns[current_column_++]));
@@ -432,25 +450,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
     return column_builder_ptr;
   }
 
-  void Finish(int64_t num_rows) {
-    DCHECK(current_column_ == schema_->num_columns())
-        << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
-        << " columns are initialized";
-    size_t total_byte_size = 0;
+  void Finish(int64_t total_bytes_written) {
+    if (!(current_column_ == schema_->num_columns())) {
+      std::stringstream ss;
+      ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
+         << " columns are initialized";
+      throw ParquetException(ss.str());
+    }
+    int64_t total_byte_size = 0;
 
     for (int i = 0; i < schema_->num_columns(); i++) {
-      DCHECK(row_group_->columns[i].file_offset > 0) << "Column " << i
-                                                     << " is not complete.";
+      if (!(row_group_->columns[i].file_offset > 0)) {
+        std::stringstream ss;
+        ss << "Column " << i << " is not complete.";
+        throw ParquetException(ss.str());
+      }
       total_byte_size += row_group_->columns[i].meta_data.total_compressed_size;
     }
+    DCHECK(total_bytes_written == total_byte_size)
+        << "Total bytes in this RowGroup does not match with compressed sizes of columns";
 
     row_group_->__set_total_byte_size(total_byte_size);
-    row_group_->__set_num_rows(num_rows);
   }
 
- private:
   int num_columns() { return row_group_->columns.size(); }
 
+ private:
   void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }
 
   format::RowGroup* row_group_;
@@ -460,18 +485,18 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
   int current_column_;
 };
 
-std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
+std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(int64_t num_rows,
     const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
     uint8_t* contents) {
   return std::unique_ptr<RowGroupMetaDataBuilder>(
-      new RowGroupMetaDataBuilder(props, schema_, contents));
+      new RowGroupMetaDataBuilder(num_rows, props, schema_, contents));
 }
 
-RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
+RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(int64_t num_rows,
     const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
     uint8_t* contents)
     : impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
-          new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
+          new RowGroupMetaDataBuilderImpl(num_rows, props, schema_, contents))} {}
 
 RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {}
 
@@ -479,11 +504,16 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
   return impl_->NextColumnChunk();
 }
 
-void RowGroupMetaDataBuilder::Finish(int64_t num_rows) {
-  impl_->Finish(num_rows);
+int RowGroupMetaDataBuilder::num_columns() {
+  return impl_->num_columns();
+}
+
+void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) {
+  impl_->Finish(total_bytes_written);
 }
 
 // file metadata
+// TODO(PARQUET-595) Support key_value_metadata
 class FileMetaDataBuilder::FileMetaDataBuilderImpl {
  public:
   explicit FileMetaDataBuilderImpl(
@@ -493,10 +523,10 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
   }
   ~FileMetaDataBuilderImpl() {}
 
-  RowGroupMetaDataBuilder* AppendRowGroup() {
+  RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows) {
     auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
     auto row_group_builder = RowGroupMetaDataBuilder::Make(
-        properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
+        num_rows, properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
     RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
     row_group_builders_.push_back(std::move(row_group_builder));
     row_groups_.push_back(std::move(row_group));
@@ -517,7 +547,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
     metadata_->__set_version(properties_->version());
     metadata_->__set_created_by(properties_->created_by());
     parquet::schema::SchemaFlattener flattener(
-        static_cast<parquet::schema::GroupNode*>(schema_->schema().get()),
+        static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
         &metadata_->schema);
     flattener.Flatten();
     auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData());
@@ -548,8 +578,8 @@ FileMetaDataBuilder::FileMetaDataBuilder(
 
 FileMetaDataBuilder::~FileMetaDataBuilder() {}
 
-RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
-  return impl_->AppendRowGroup();
+RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup(int64_t num_rows) {
+  return impl_->AppendRowGroup(num_rows);
 }
 
 std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 78ea53b..1d96621 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -117,7 +117,7 @@ class PARQUET_EXPORT FileMetaData {
   void WriteTo(OutputStream* dst);
 
   // Return const-pointer to make it clear that this object is not to be copied
-  const SchemaDescriptor* schema_descriptor() const;
+  const SchemaDescriptor* schema() const;
 
  private:
   friend FileMetaDataBuilder;
@@ -144,7 +144,8 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
   // column metadata
   // ownership of min/max is with ColumnChunkMetadata
   void SetStatistics(const ColumnStatistics& stats);
-
+  // get the column descriptor
+  const ColumnDescriptor* descr() const;
   // commit the metadata
   void Finish(int64_t num_values, int64_t dictonary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
@@ -161,20 +162,22 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
 class PARQUET_EXPORT RowGroupMetaDataBuilder {
  public:
   // API convenience to get a MetaData reader
-  static std::unique_ptr<RowGroupMetaDataBuilder> Make(
+  static std::unique_ptr<RowGroupMetaDataBuilder> Make(int64_t num_rows,
       const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
       uint8_t* contents);
 
   ~RowGroupMetaDataBuilder();
 
   ColumnChunkMetaDataBuilder* NextColumnChunk();
+  int num_columns();
 
   // commit the metadata
-  void Finish(int64_t num_rows);
+  void Finish(int64_t total_bytes_written);
 
  private:
-  explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
-      const SchemaDescriptor* schema_, uint8_t* contents);
+  explicit RowGroupMetaDataBuilder(int64_t num_rows,
+      const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+      uint8_t* contents);
   // PIMPL Idiom
   class RowGroupMetaDataBuilderImpl;
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
@@ -188,7 +191,7 @@ class PARQUET_EXPORT FileMetaDataBuilder {
 
   ~FileMetaDataBuilder();
 
-  RowGroupMetaDataBuilder* AppendRowGroup();
+  RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows);
 
   // commit the metadata
   std::unique_ptr<FileMetaData> Finish();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index b593ea0..7cf3f1a 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -126,7 +126,7 @@ void ParquetFileReader::DebugPrint(
   stream << "Total rows: " << file_metadata->num_rows() << "\n";
   stream << "Number of RowGroups: " << file_metadata->num_row_groups() << "\n";
   stream << "Number of Real Columns: "
-         << file_metadata->schema_descriptor()->group()->field_count() << "\n";
+         << file_metadata->schema()->group_node()->field_count() << "\n";
 
   if (selected_columns.size() == 0) {
     for (int i = 0; i < file_metadata->num_columns(); i++) {
@@ -143,7 +143,7 @@ void ParquetFileReader::DebugPrint(
   stream << "Number of Columns: " << file_metadata->num_columns() << "\n";
   stream << "Number of Selected Columns: " << selected_columns.size() << "\n";
   for (auto i : selected_columns) {
-    const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i);
+    const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
     stream << "Column " << i << ": " << descr->name() << " ("
            << TypeToString(descr->physical_type()) << ")" << std::endl;
   }
@@ -162,7 +162,7 @@ void ParquetFileReader::DebugPrint(
       auto column_chunk = group_metadata->ColumnChunk(i);
       const ColumnStatistics stats = column_chunk->statistics();
 
-      const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i);
+      const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
       stream << "Column " << i << std::endl << ", values: " << column_chunk->num_values();
       if (column_chunk->is_stats_set()) {
         stream << ", null values: " << stats.null_count
@@ -201,7 +201,7 @@ void ParquetFileReader::DebugPrint(
       std::string fmt = ss.str();
 
       snprintf(buffer, bufsize, fmt.c_str(),
-          file_metadata->schema_descriptor()->Column(i)->name().c_str());
+          file_metadata->schema()->Column(i)->name().c_str());
       stream << buffer;
 
       // This is OK in this method as long as the RowGroupReader does not get

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index f07b44b..fb05f13 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -34,25 +34,23 @@ static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 // SerializedPageWriter
 
 SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
-    format::ColumnChunk* metadata, MemoryAllocator* allocator)
+    ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator)
     : sink_(sink),
       metadata_(metadata),
-      // allocator_(allocator),
+      num_values_(0),
+      dictionary_page_offset_(0),
+      data_page_offset_(0),
+      total_uncompressed_size_(0),
+      total_compressed_size_(0),
       compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) {
   compressor_ = Codec::Create(codec);
-  // Currently we directly start with the data page
-  metadata_->meta_data.__set_data_page_offset(sink_->Tell());
-  metadata_->meta_data.__set_codec(ToThrift(codec));
 }
 
-void SerializedPageWriter::Close() {}
-
-void SerializedPageWriter::AddEncoding(Encoding::type encoding) {
-  auto it = std::find(metadata_->meta_data.encodings.begin(),
-      metadata_->meta_data.encodings.end(), ToThrift(encoding));
-  if (it != metadata_->meta_data.encodings.end()) {
-    metadata_->meta_data.encodings.push_back(ToThrift(encoding));
-  }
+void SerializedPageWriter::Close() {
+  // index_page_offset = 0 since they are not supported
+  // TODO: Remove default fallback = 'false' when implemented
+  metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
+      total_compressed_size_, total_uncompressed_size_, false);
 }
 
 std::shared_ptr<Buffer> SerializedPageWriter::Compress(
@@ -91,13 +89,14 @@ int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) {
   // TODO(PARQUET-594) crc checksum
 
   int64_t start_pos = sink_->Tell();
+  if (data_page_offset_ == 0) { data_page_offset_ = start_pos; }
   SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
   int64_t header_size = sink_->Tell() - start_pos;
   sink_->Write(compressed_data->data(), compressed_data->size());
 
-  metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
-  metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size;
-  metadata_->meta_data.num_values += page.num_values();
+  total_uncompressed_size_ += uncompressed_size + header_size;
+  total_compressed_size_ += compressed_data->size() + header_size;
+  num_values_ += page.num_values();
 
   return sink_->Tell() - start_pos;
 }
@@ -119,12 +118,13 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
   // TODO(PARQUET-594) crc checksum
 
   int64_t start_pos = sink_->Tell();
+  if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; }
   SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
   int64_t header_size = sink_->Tell() - start_pos;
   sink_->Write(compressed_data->data(), compressed_data->size());
 
-  metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
-  metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size;
+  total_uncompressed_size_ += uncompressed_size + header_size;
+  total_compressed_size_ += compressed_data->size() + header_size;
 
   return sink_->Tell() - start_pos;
 }
@@ -133,50 +133,38 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
 // RowGroupSerializer
 
 int RowGroupSerializer::num_columns() const {
-  return schema_->num_columns();
+  return metadata_->num_columns();
 }
 
 int64_t RowGroupSerializer::num_rows() const {
   return num_rows_;
 }
 
-const SchemaDescriptor* RowGroupSerializer::schema() const {
-  return schema_;
-}
-
 ColumnWriter* RowGroupSerializer::NextColumn() {
-  if (current_column_index_ == schema_->num_columns() - 1) {
-    throw ParquetException("All columns have already been written.");
-  }
-  current_column_index_++;
+  // Throws an error if more columns are being written
+  auto col_meta = metadata_->NextColumnChunk();
 
-  if (current_column_writer_) { total_bytes_written_ += current_column_writer_->Close(); }
+  if (current_column_writer_) { total_bytes_written_ = current_column_writer_->Close(); }
 
-  const ColumnDescriptor* column_descr = schema_->Column(current_column_index_);
-  format::ColumnChunk* col_meta = &metadata_->columns[current_column_index_];
-  col_meta->__isset.meta_data = true;
-  col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
-  col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
-  std::unique_ptr<PageWriter> pager(new SerializedPageWriter(
-      sink_, properties_->compression(column_descr->path()), col_meta, allocator_));
+  const ColumnDescriptor* column_descr = col_meta->descr();
+  std::unique_ptr<PageWriter> pager(
+      new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
+          col_meta, properties_->allocator()));
   current_column_writer_ =
-      ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_);
+      ColumnWriter::Make(col_meta->descr(), std::move(pager), num_rows_, properties_);
   return current_column_writer_.get();
 }
 
 void RowGroupSerializer::Close() {
   if (!closed_) {
     closed_ = true;
-    if (current_column_index_ != schema_->num_columns() - 1) {
-      throw ParquetException("Not all column were written in the current rowgroup.");
-    }
 
     if (current_column_writer_) {
-      total_bytes_written_ += current_column_writer_->Close();
+      total_bytes_written_ = current_column_writer_->Close();
       current_column_writer_.reset();
     }
-
-    metadata_->__set_total_byte_size(total_bytes_written_);
+    // Ensures all columns have been written
+    metadata_->Finish(total_bytes_written_);
   }
 }
 
@@ -225,13 +213,10 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
   if (row_group_writer_) { row_group_writer_->Close(); }
   num_rows_ += num_rows;
   num_row_groups_++;
-
-  auto rgm_size = row_group_metadata_.size();
-  row_group_metadata_.resize(rgm_size + 1);
-  format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
-  std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
-      num_rows, &schema_, sink_.get(), rg_metadata, properties_.get()));
-  row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
+  auto rg_metadata = metadata_->AppendRowGroup(num_rows);
+  std::unique_ptr<RowGroupWriter::Contents> contents(
+      new RowGroupSerializer(num_rows, sink_.get(), rg_metadata, properties_.get()));
+  row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
   return row_group_writer_.get();
 }
 
@@ -243,19 +228,9 @@ void FileSerializer::WriteMetaData() {
   // Write MetaData
   uint32_t metadata_len = sink_->Tell();
 
-  SchemaFlattener flattener(
-      static_cast<GroupNode*>(schema_.schema().get()), &metadata_.schema);
-  flattener.Flatten();
-
-  // TODO: Currently we only write version 1 files
-  metadata_.__set_version(1);
-  metadata_.__set_num_rows(num_rows_);
-  metadata_.__set_row_groups(row_group_metadata_);
-  // TODO(PARQUET-595) Support key_value_metadata
-  // TODO(PARQUET-590) Get from WriterProperties
-  metadata_.__set_created_by("parquet-cpp");
-
-  SerializeThriftMsg(&metadata_, 1024, sink_.get());
+  // Get a FileMetaData
+  auto metadata = metadata_->Finish();
+  metadata->WriteTo(sink_.get());
   metadata_len = sink_->Tell() - metadata_len;
 
   // Write Footer
@@ -267,12 +242,12 @@ FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
     const std::shared_ptr<GroupNode>& schema,
     const std::shared_ptr<WriterProperties>& properties)
     : sink_(sink),
-      allocator_(properties->allocator()),
-      num_row_groups_(0),
-      num_rows_(0),
       is_open_(true),
-      properties_(properties) {
+      properties_(properties),
+      num_row_groups_(0),
+      num_rows_(0) {
   schema_.Init(schema);
+  metadata_ = FileMetaDataBuilder::Make(&schema_, properties);
   StartFile();
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 7c46408..645d4bf 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -23,6 +23,7 @@
 
 #include "parquet/column/page.h"
 #include "parquet/compression/codec.h"
+#include "parquet/file/metadata.h"
 #include "parquet/file/writer.h"
 #include "parquet/thrift/parquet_types.h"
 
@@ -31,11 +32,11 @@ namespace parquet {
 // This subclass delimits pages appearing in a serialized stream, each preceded
 // by a serialized Thrift format::PageHeader indicating the type of each page
 // and the page metadata.
-//
 class SerializedPageWriter : public PageWriter {
  public:
   SerializedPageWriter(OutputStream* sink, Compression::type codec,
-      format::ColumnChunk* metadata, MemoryAllocator* allocator = default_allocator());
+      ColumnChunkMetaDataBuilder* metadata,
+      MemoryAllocator* allocator = default_allocator());
 
   virtual ~SerializedPageWriter() {}
 
@@ -47,14 +48,17 @@ class SerializedPageWriter : public PageWriter {
 
  private:
   OutputStream* sink_;
-  format::ColumnChunk* metadata_;
-  // MemoryAllocator* allocator_;
+  ColumnChunkMetaDataBuilder* metadata_;
+  int64_t num_values_;
+  int64_t dictionary_page_offset_;
+  int64_t data_page_offset_;
+  int64_t total_uncompressed_size_;
+  int64_t total_compressed_size_;
 
   // Compression codec to use.
   std::unique_ptr<Codec> compressor_;
   std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
 
-  void AddEncoding(Encoding::type encoding);
   /**
    * Compress a buffer.
    *
@@ -67,24 +71,17 @@ class SerializedPageWriter : public PageWriter {
 // RowGroupWriter::Contents implementation for the Parquet file specification
 class RowGroupSerializer : public RowGroupWriter::Contents {
  public:
-  RowGroupSerializer(int64_t num_rows, const SchemaDescriptor* schema, OutputStream* sink,
-      format::RowGroup* metadata, const WriterProperties* properties)
+  RowGroupSerializer(int64_t num_rows, OutputStream* sink,
+      RowGroupMetaDataBuilder* metadata, const WriterProperties* properties)
       : num_rows_(num_rows),
-        schema_(schema),
         sink_(sink),
         metadata_(metadata),
-        allocator_(properties->allocator()),
         properties_(properties),
         total_bytes_written_(0),
-        closed_(false),
-        current_column_index_(-1) {
-    metadata_->__set_num_rows(num_rows_);
-    metadata_->columns.resize(schema->num_columns());
-  }
+        closed_(false) {}
 
   int num_columns() const override;
   int64_t num_rows() const override;
-  const SchemaDescriptor* schema() const override;
 
   // TODO: PARQUET-579
   // void WriteRowGroupStatitics() override;
@@ -94,15 +91,12 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
 
  private:
   int64_t num_rows_;
-  const SchemaDescriptor* schema_;
   OutputStream* sink_;
-  format::RowGroup* metadata_;
-  MemoryAllocator* allocator_;
+  RowGroupMetaDataBuilder* metadata_;
   const WriterProperties* properties_;
   int64_t total_bytes_written_;
   bool closed_;
 
-  int64_t current_column_index_;
   std::shared_ptr<ColumnWriter> current_column_writer_;
 };
 
@@ -134,14 +128,12 @@ class FileSerializer : public ParquetFileWriter::Contents {
       const std::shared_ptr<WriterProperties>& properties);
 
   std::shared_ptr<OutputStream> sink_;
-  format::FileMetaData metadata_;
-  std::vector<format::RowGroup> row_group_metadata_;
-  MemoryAllocator* allocator_;
-  int num_row_groups_;
-  int num_rows_;
   bool is_open_;
+  const std::shared_ptr<WriterProperties> properties_;
+  int num_row_groups_;
+  int64_t num_rows_;
+  std::unique_ptr<FileMetaDataBuilder> metadata_;
   std::unique_ptr<RowGroupWriter> row_group_writer_;
-  std::shared_ptr<WriterProperties> properties_;
 
   void StartFile();
   void WriteMetaData();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index 269b1cb..8c9f52f 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -27,11 +27,8 @@ namespace parquet {
 // ----------------------------------------------------------------------
 // RowGroupWriter public API
 
-RowGroupWriter::RowGroupWriter(
-    std::unique_ptr<Contents> contents, MemoryAllocator* allocator)
-    : contents_(std::move(contents)), allocator_(allocator) {
-  schema_ = contents_->schema();
-}
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
+    : contents_(std::move(contents)) {}
 
 void RowGroupWriter::Close() {
   if (contents_) {
@@ -64,9 +61,16 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
   return result;
 }
 
+const SchemaDescriptor* ParquetFileWriter::schema() const {
+  return contents_->schema();
+}
+
+const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
+  return contents_->schema()->Column(i);
+}
+
 void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
   contents_ = std::move(contents);
-  schema_ = contents_->schema();
 }
 
 void ParquetFileWriter::Close() {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 0e64961..422f008 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -46,12 +46,9 @@ class PARQUET_EXPORT RowGroupWriter {
     // virtual void WriteRowGroupStatitics();
     virtual ColumnWriter* NextColumn() = 0;
     virtual void Close() = 0;
-
-    // Return const-pointer to make it clear that this object is not to be copied
-    virtual const SchemaDescriptor* schema() const = 0;
   };
 
-  RowGroupWriter(std::unique_ptr<Contents> contents, MemoryAllocator* allocator);
+  explicit RowGroupWriter(std::unique_ptr<Contents> contents);
 
   /**
    * Construct a ColumnWriter for the indicated row group-relative column.
@@ -75,13 +72,8 @@ class PARQUET_EXPORT RowGroupWriter {
   // virtual void WriteRowGroupStatitics();
 
  private:
-  // Owned by the parent ParquetFileWriter
-  const SchemaDescriptor* schema_;
-
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;
-
-  MemoryAllocator* allocator_;
 };
 
 class PARQUET_EXPORT ParquetFileWriter {
@@ -102,7 +94,7 @@ class PARQUET_EXPORT ParquetFileWriter {
 
     virtual const std::shared_ptr<WriterProperties>& properties() const = 0;
 
-    // Return const-poitner to make it clear that this object is not to be copied
+    // Return const-pointer to make it clear that this object is not to be copied
     const SchemaDescriptor* schema() const { return &schema_; }
     SchemaDescriptor schema_;
   };
@@ -153,18 +145,18 @@ class PARQUET_EXPORT ParquetFileWriter {
   const std::shared_ptr<WriterProperties>& properties() const;
 
   /**
-   * Returns the file schema descriptor
-   */
-  const SchemaDescriptor* descr() { return schema_; }
+   * Returns the file schema descriptor
+   */
+  const SchemaDescriptor* schema() const;
 
-  const ColumnDescriptor* column_schema(int i) const { return schema_->Column(i); }
+  /**
+   * Returns a column descriptor in schema
+   */
+  const ColumnDescriptor* descr(int i) const;
 
  private:
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;
-
-  // The SchemaDescriptor is provided by the Contents impl
-  const SchemaDescriptor* schema_;
 };
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/descriptor.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.cc b/src/parquet/schema/descriptor.cc
index de63e5e..4d46204 100644
--- a/src/parquet/schema/descriptor.cc
+++ b/src/parquet/schema/descriptor.cc
@@ -39,11 +39,11 @@ void SchemaDescriptor::Init(const NodePtr& schema) {
     throw ParquetException("Must initialize with a schema group");
   }
 
-  group_ = static_cast<const GroupNode*>(schema_.get());
+  group_node_ = static_cast<const GroupNode*>(schema_.get());
   leaves_.clear();
 
-  for (int i = 0; i < group_->field_count(); ++i) {
-    BuildTree(group_->field(i), 0, 0, group_->field(i));
+  for (int i = 0; i < group_node_->field_count(); ++i) {
+    BuildTree(group_node_->field(i), 0, 0, group_node_->field(i));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/descriptor.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h
index 51f3503..c591954 100644
--- a/src/parquet/schema/descriptor.h
+++ b/src/parquet/schema/descriptor.h
@@ -100,18 +100,20 @@ class PARQUET_EXPORT SchemaDescriptor {
   // The number of physical columns appearing in the file
   int num_columns() const { return leaves_.size(); }
 
-  const schema::NodePtr& schema() const { return schema_; }
+  const schema::NodePtr& schema_root() const { return schema_; }
 
-  const schema::GroupNode* group() const { return group_; }
+  const schema::GroupNode* group_node() const { return group_node_; }
 
   // Returns the root (child of the schema root) node of the leaf(column) node
   const schema::NodePtr& GetColumnRoot(int i) const;
 
+  const std::string& name() const { return group_node_->name(); }
+
  private:
   friend class ColumnDescriptor;
 
   schema::NodePtr schema_;
-  const schema::GroupNode* group_;
+  const schema::GroupNode* group_node_;
 
   void BuildTree(const schema::NodePtr& node, int16_t max_def_level,
       int16_t max_rep_level, const schema::NodePtr& base);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index d88cd0d..5be8db7 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -128,7 +128,7 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
   ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
   ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get());
 
-  ASSERT_EQ(schema.get(), descr_.group());
+  ASSERT_EQ(schema.get(), descr_.group_node());
 
   // Init clears the leaves
   descr_.Init(schema);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/55604b29/tools/parquet-dump-schema.cc
----------------------------------------------------------------------
diff --git a/tools/parquet-dump-schema.cc b/tools/parquet-dump-schema.cc
index deef2fd..1e0239b 100644
--- a/tools/parquet-dump-schema.cc
+++ b/tools/parquet-dump-schema.cc
@@ -26,7 +26,7 @@ int main(int argc, char** argv) {
   try {
     std::unique_ptr<parquet::ParquetFileReader> reader =
         parquet::ParquetFileReader::OpenFile(filename);
-    PrintSchema(reader->metadata()->schema_descriptor()->schema().get(), std::cout);
+    PrintSchema(reader->metadata()->schema()->schema_root().get(), std::cout);
   } catch (const std::exception& e) {
     std::cerr << "Parquet error: " << e.what() << std::endl;
     return -1;