Posted to commits@parquet.apache.org by we...@apache.org on 2016/06/20 12:39:56 UTC

parquet-cpp git commit: PARQUET-592: Support compressed writes

Repository: parquet-cpp
Updated Branches:
  refs/heads/master be04dcec1 -> aac1f2aa4


PARQUET-592: Support compressed writes

Hello,

I'm working on compressed writes and would like some advice:

* What would be the preferred way to pass down per-column codec settings? `Builder::set_column_codec(column index (?), Compression::type)`? (See the sketch after this list.)
* It appears that there's some duplication in `ParquetFileWriter::Open`: an allocator is already included in `WriterProperties`. I presume this should be cleaned up in a separate PR?
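
For reference, a minimal sketch of how per-column codec settings are expressed with the `WriterProperties::Builder` added in this patch. Here `sink` and `group_node` stand in for an output stream and schema group node prepared as in the serialization test below, and the column path "schema.int64" matches that test's schema:

    // Default codec for all columns, with a per-column override by dotted path.
    std::shared_ptr<WriterProperties> properties =
        WriterProperties::Builder()
            .compression(Compression::SNAPPY)                 // file-wide default
            ->compression("schema.int64", Compression::GZIP)  // override for one column
            ->build();

    // The properties object is passed to the writer in place of the old
    // allocator argument.
    auto file_writer = ParquetFileWriter::Open(sink, group_node, properties);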

Author: Artem Tarasov <ar...@embl.de>

Closes #121 from lomereiter/parquet-592 and squashes the following commits:

03d4ed8 [Artem Tarasov] address comments from reviewers
a1fe78e [Artem Tarasov] remove redundant 'allocator' constructor parameter
d2c0473 [Artem Tarasov] allow to reuse builder object
dfbee1f [Artem Tarasov] props -> properties
e687e81 [Artem Tarasov] serialization test cleanup
470e176 [Artem Tarasov] more convenient Builder methods
5d236df [Artem Tarasov] per-column compression support
a80c2f2 [Artem Tarasov] don't use shared_ptr where raw is enough;
7c745d9 [Artem Tarasov] run three separate tests
25a23e5 [Artem Tarasov] loop over codec types in file serialization test
4a0a044 [Artem Tarasov] PARQUET-592: Support compressed writes


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/aac1f2aa
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/aac1f2aa
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/aac1f2aa

Branch: refs/heads/master
Commit: aac1f2aa4c09703aa5e382dd394d51dd684ada7e
Parents: be04dce
Author: Artem Tarasov <ar...@embl.de>
Authored: Mon Jun 20 05:39:48 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Mon Jun 20 05:39:48 2016 -0700

----------------------------------------------------------------------
 src/parquet/column/properties.h         | 50 +++++++++++++++---
 src/parquet/file/file-serialize-test.cc | 78 ++++++++++++++++------------
 src/parquet/file/writer-internal.cc     | 31 ++++++-----
 src/parquet/file/writer-internal.h      |  9 ++--
 src/parquet/file/writer.cc              |  4 +-
 src/parquet/file/writer.h               |  3 +-
 6 files changed, 115 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index d2a231b..ee74290 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -20,9 +20,12 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 
 #include "parquet/util/input.h"
 #include "parquet/util/mem-allocator.h"
+#include "parquet/types.h"
+#include "parquet/schema/types.h"
 
 namespace parquet {
 
@@ -78,6 +81,10 @@ static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
 static bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE =
+    Compression::UNCOMPRESSED;
+
+using ColumnCodecs = std::unordered_map<std::string, Compression::type>;
 
 class WriterProperties {
  public:
@@ -88,7 +95,8 @@ class WriterProperties {
           dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
           dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
           pagesize_(DEFAULT_PAGE_SIZE),
-          version_(DEFAULT_WRITER_VERSION) {}
+          version_(DEFAULT_WRITER_VERSION),
+          default_codec_(DEFAULT_COMPRESSION_TYPE) {}
     virtual ~Builder() {}
 
     Builder* allocator(MemoryAllocator* allocator) {
@@ -121,9 +129,25 @@ class WriterProperties {
       return this;
     }
 
+    Builder* compression(Compression::type codec) {
+      default_codec_ = codec;
+      return this;
+    }
+
+    Builder* compression(const std::string& path, Compression::type codec) {
+      codecs_[path] = codec;
+      return this;
+    }
+
+    Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
+                         Compression::type codec) {
+      return this->compression(path->ToDotString(), codec);
+    }
+
     std::shared_ptr<WriterProperties> build() {
       return std::shared_ptr<WriterProperties>(new WriterProperties(
-          allocator_, dictionary_enabled_, dictionary_pagesize_, pagesize_, version_));
+          allocator_, dictionary_enabled_, dictionary_pagesize_,
+          pagesize_, version_, default_codec_, codecs_));
     }
 
    private:
@@ -132,9 +156,11 @@ class WriterProperties {
     int64_t dictionary_pagesize_;
     int64_t pagesize_;
     ParquetVersion::type version_;
+    Compression::type default_codec_;
+    ColumnCodecs codecs_;
   };
 
-  MemoryAllocator* allocator() { return allocator_; }
+  MemoryAllocator* allocator() const { return allocator_; }
 
   bool dictionary_enabled() const { return dictionary_enabled_; }
 
@@ -142,16 +168,26 @@ class WriterProperties {
 
   int64_t data_pagesize() const { return pagesize_; }
 
-  ParquetVersion::type version() { return parquet_version_; }
+  ParquetVersion::type version() const { return parquet_version_; }
+
+  Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
+    auto it = codecs_.find(path->ToDotString());
+    if (it != codecs_.end())
+      return it->second;
+    return default_codec_;
+  }
 
  private:
   explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
-      int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version)
+      int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
+      Compression::type default_codec, const ColumnCodecs& codecs)
       : allocator_(allocator),
         dictionary_enabled_(dictionary_enabled),
         dictionary_pagesize_(dictionary_pagesize),
         pagesize_(pagesize),
-        parquet_version_(version) {
+        parquet_version_(version),
+        default_codec_(default_codec),
+        codecs_(codecs) {
     pagesize_ = DEFAULT_PAGE_SIZE;
     dictionary_enabled_ = DEFAULT_IS_DICTIONARY_ENABLED;
   }
@@ -161,6 +197,8 @@ class WriterProperties {
   int64_t dictionary_pagesize_;
   int64_t pagesize_;
   ParquetVersion::type parquet_version_;
+  Compression::type default_codec_;
+  ColumnCodecs codecs_;
 };
 
 std::shared_ptr<WriterProperties> default_writer_properties();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 194e496..ca7bb45 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -61,40 +61,54 @@ class TestSerialize : public ::testing::Test {
  protected:
   NodePtr node_;
   SchemaDescriptor schema_;
+
+  void FileSerializeTest(Compression::type codec_type) {
+    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+    auto gnode = std::static_pointer_cast<GroupNode>(node_);
+    std::shared_ptr<WriterProperties> writer_properties =
+        WriterProperties::Builder().compression("schema.int64", codec_type)->build();
+    auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
+    auto row_group_writer = file_writer->AppendRowGroup(100);
+    auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
+    std::vector<int64_t> values(100, 128);
+    column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+    column_writer->Close();
+    row_group_writer->Close();
+    file_writer->Close();
+
+    auto buffer = sink->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    auto file_reader = ParquetFileReader::Open(std::move(source));
+    ASSERT_EQ(1, file_reader->num_columns());
+    ASSERT_EQ(1, file_reader->num_row_groups());
+    ASSERT_EQ(100, file_reader->num_rows());
+
+    auto rg_reader = file_reader->RowGroup(0);
+    ASSERT_EQ(1, rg_reader->num_columns());
+    ASSERT_EQ(100, rg_reader->num_rows());
+
+    auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
+    std::vector<int64_t> values_out(100);
+    std::vector<int16_t> def_levels_out(100);
+    std::vector<int16_t> rep_levels_out(100);
+    int64_t values_read;
+    col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
+        values_out.data(), &values_read);
+    ASSERT_EQ(100, values_read);
+    ASSERT_EQ(values, values_out);
+  }
 };
 
-TEST_F(TestSerialize, SmallFile) {
-  std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
-  auto gnode = std::static_pointer_cast<GroupNode>(node_);
-  auto file_writer = ParquetFileWriter::Open(sink, gnode);
-  auto row_group_writer = file_writer->AppendRowGroup(100);
-  auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
-  std::vector<int64_t> values(100, 128);
-  column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
-  column_writer->Close();
-  row_group_writer->Close();
-  file_writer->Close();
-
-  auto buffer = sink->GetBuffer();
-  std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
-  auto file_reader = ParquetFileReader::Open(std::move(source));
-  ASSERT_EQ(1, file_reader->num_columns());
-  ASSERT_EQ(1, file_reader->num_row_groups());
-  ASSERT_EQ(100, file_reader->num_rows());
-
-  auto rg_reader = file_reader->RowGroup(0);
-  ASSERT_EQ(1, rg_reader->num_columns());
-  ASSERT_EQ(100, rg_reader->num_rows());
-
-  auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
-  std::vector<int64_t> values_out(100);
-  std::vector<int16_t> def_levels_out(100);
-  std::vector<int16_t> rep_levels_out(100);
-  int64_t values_read;
-  col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
-      values_out.data(), &values_read);
-  ASSERT_EQ(100, values_read);
-  ASSERT_EQ(values, values_out);
+TEST_F(TestSerialize, SmallFileUncompressed) {
+  FileSerializeTest(Compression::UNCOMPRESSED);
+}
+
+TEST_F(TestSerialize, SmallFileSnappy) {
+  FileSerializeTest(Compression::SNAPPY);
+}
+
+TEST_F(TestSerialize, SmallFileGzip) {
+  FileSerializeTest(Compression::GZIP);
 }
 
 }  // namespace test

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index f9ff486..a90f28b 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -77,13 +77,15 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
   int64_t compressed_size = uncompressed_size;
   std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
   if (compressor_) {
-    // TODO(PARQUET-592): Add support for compression
-    // int64_t max_compressed_size = compressor_->MaxCompressedLen(
-    // uncompressed_data.size(), uncompressed_data.data());
-    // OwnedMutableBuffer compressed_data(compressor_->MaxCompressedLen(
-    // uncompressed_data.size(), uncompressed_data.data()));
+    const uint8_t* uncompressed_ptr = uncompressed_data->data();
+    int64_t max_compressed_size = compressor_->MaxCompressedLen(
+        uncompressed_size, uncompressed_ptr);
+    compressed_data = std::make_shared<OwnedMutableBuffer>(max_compressed_size,
+        allocator_);
+    compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr,
+        max_compressed_size, compressed_data->mutable_data());
   }
-  // Compressed data is not needed anymore, so immediately get rid of it.
+  // Uncompressed data is not needed anymore, so immediately get rid of it.
   uncompressed_data.reset();
 
   format::DataPageHeader data_page_header;
@@ -103,7 +105,7 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
   int64_t start_pos = sink_->Tell();
   SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
   int64_t header_size = sink_->Tell() - start_pos;
-  sink_->Write(compressed_data->data(), compressed_data->size());
+  sink_->Write(compressed_data->data(), compressed_size);
 
   metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
   metadata_->meta_data.total_compressed_size += compressed_size + header_size;
@@ -140,8 +142,8 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
   col_meta->__isset.meta_data = true;
   col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
   col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
-  std::unique_ptr<PageWriter> pager(
-      new SerializedPageWriter(sink_, Compression::UNCOMPRESSED, col_meta, allocator_));
+  std::unique_ptr<PageWriter> pager(new SerializedPageWriter(sink_,
+      properties_->compression(column_descr->path()), col_meta, allocator_));
   current_column_writer_ =
       ColumnWriter::Make(column_descr, std::move(pager), num_rows_, allocator_);
   return current_column_writer_.get();
@@ -168,9 +170,9 @@ void RowGroupSerializer::Close() {
 
 std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
     std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
-    MemoryAllocator* allocator, const std::shared_ptr<WriterProperties>& properties) {
+    const std::shared_ptr<WriterProperties>& properties) {
   std::unique_ptr<ParquetFileWriter::Contents> result(
-      new FileSerializer(sink, schema, allocator, properties));
+      new FileSerializer(sink, schema, properties));
 
   return result;
 }
@@ -213,7 +215,8 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
   row_group_metadata_.resize(rgm_size + 1);
   format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
   std::unique_ptr<RowGroupWriter::Contents> contents(
-      new RowGroupSerializer(num_rows, &schema_, sink_.get(), rg_metadata, allocator_));
+      new RowGroupSerializer(num_rows, &schema_, sink_.get(),
+                             rg_metadata, properties().get()));
   row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
   return row_group_writer_.get();
 }
@@ -247,10 +250,10 @@ void FileSerializer::WriteMetaData() {
 }
 
 FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
-    std::shared_ptr<GroupNode>& schema, MemoryAllocator* allocator,
+    std::shared_ptr<GroupNode>& schema,
     const std::shared_ptr<WriterProperties>& properties)
     : sink_(sink),
-      allocator_(allocator),
+      allocator_(properties->allocator()),
       num_row_groups_(0),
       num_rows_(0),
       is_open_(true),

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 8ab025c..c49a076 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -67,12 +67,13 @@ class SerializedPageWriter : public PageWriter {
 class RowGroupSerializer : public RowGroupWriter::Contents {
  public:
   RowGroupSerializer(int64_t num_rows, const SchemaDescriptor* schema, OutputStream* sink,
-      format::RowGroup* metadata, MemoryAllocator* allocator)
+      format::RowGroup* metadata, const WriterProperties* properties)
       : num_rows_(num_rows),
         schema_(schema),
         sink_(sink),
         metadata_(metadata),
-        allocator_(allocator),
+        allocator_(properties->allocator()),
+        properties_(properties),
         total_bytes_written_(0),
         closed_(false),
         current_column_index_(-1) {
@@ -96,6 +97,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   OutputStream* sink_;
   format::RowGroup* metadata_;
   MemoryAllocator* allocator_;
+  const WriterProperties* properties_;
   int64_t total_bytes_written_;
   bool closed_;
 
@@ -110,7 +112,6 @@ class FileSerializer : public ParquetFileWriter::Contents {
  public:
   static std::unique_ptr<ParquetFileWriter::Contents> Open(
       std::shared_ptr<OutputStream> sink, std::shared_ptr<schema::GroupNode>& schema,
-      MemoryAllocator* allocator = default_allocator(),
       const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
   void Close() override;
@@ -127,7 +128,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
  private:
   explicit FileSerializer(std::shared_ptr<OutputStream> sink,
-      std::shared_ptr<schema::GroupNode>& schema, MemoryAllocator* allocator,
+      std::shared_ptr<schema::GroupNode>& schema,
       const std::shared_ptr<WriterProperties>& properties);
 
   std::shared_ptr<OutputStream> sink_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index 8f643c2..84bbe8d 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -55,8 +55,8 @@ ParquetFileWriter::~ParquetFileWriter() {
 
 std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
     std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
-    MemoryAllocator* allocator, const std::shared_ptr<WriterProperties>& properties) {
-  auto contents = FileSerializer::Open(sink, schema, allocator);
+    const std::shared_ptr<WriterProperties>& properties) {
+  auto contents = FileSerializer::Open(sink, schema, properties);
 
   std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
   result->Open(std::move(contents));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 15b590d..e213fbe 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -106,7 +106,6 @@ class ParquetFileWriter {
 
   static std::unique_ptr<ParquetFileWriter> Open(std::shared_ptr<OutputStream> sink,
       std::shared_ptr<schema::GroupNode>& schema,
-      MemoryAllocator* allocator = default_allocator(),
       const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
   void Open(std::unique_ptr<Contents> contents);
@@ -143,7 +142,7 @@ class ParquetFileWriter {
   int num_row_groups() const;
 
   /**
-   * Configuartion passed to the writer, e.g. the used Parquet format version.
+   * Configuration passed to the writer, e.g. the used Parquet format version.
    */
   const std::shared_ptr<WriterProperties>& properties() const;