You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/06/20 12:39:56 UTC
parquet-cpp git commit: PARQUET-592: Support compressed writes
Repository: parquet-cpp
Updated Branches:
refs/heads/master be04dcec1 -> aac1f2aa4
PARQUET-592: Support compressed writes
Hello,
I'm working on compressed writes and would like some advice:
* What would be the preferred way to pass down per-column codec settings? `Builder::set_column_codec(column index (?), Compression::type)`?
* It appears that there's some duplication in `ParquetFileWriter::Open`, namely, there is an allocator included into `WriterProperties` — I presume this should be cleaned up via a separate PR?
Author: Artem Tarasov <ar...@embl.de>
Closes #121 from lomereiter/parquet-592 and squashes the following commits:
03d4ed8 [Artem Tarasov] address comments from reviewers
a1fe78e [Artem Tarasov] remove redundant 'allocator' constructor parameter
d2c0473 [Artem Tarasov] allow to reuse builder object
dfbee1f [Artem Tarasov] props -> properties
e687e81 [Artem Tarasov] serialization test cleanup
470e176 [Artem Tarasov] more convenient Builder methods
5d236df [Artem Tarasov] per-column compression support
a80c2f2 [Artem Tarasov] don't use shared_ptr where raw is enough;
7c745d9 [Artem Tarasov] run three separate tests
25a23e5 [Artem Tarasov] loop over codec types in file serialization test
4a0a044 [Artem Tarasov] PARQUET-592: Support compressed writes
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/aac1f2aa
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/aac1f2aa
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/aac1f2aa
Branch: refs/heads/master
Commit: aac1f2aa4c09703aa5e382dd394d51dd684ada7e
Parents: be04dce
Author: Artem Tarasov <ar...@embl.de>
Authored: Mon Jun 20 05:39:48 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Mon Jun 20 05:39:48 2016 -0700
----------------------------------------------------------------------
src/parquet/column/properties.h | 50 +++++++++++++++---
src/parquet/file/file-serialize-test.cc | 78 ++++++++++++++++------------
src/parquet/file/writer-internal.cc | 31 ++++++-----
src/parquet/file/writer-internal.h | 9 ++--
src/parquet/file/writer.cc | 4 +-
src/parquet/file/writer.h | 3 +-
6 files changed, 115 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index d2a231b..ee74290 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -20,9 +20,12 @@
#include <memory>
#include <string>
+#include <unordered_map>
#include "parquet/util/input.h"
#include "parquet/util/mem-allocator.h"
+#include "parquet/types.h"
+#include "parquet/schema/types.h"
namespace parquet {
@@ -78,6 +81,10 @@ static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
static bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE =
+ Compression::UNCOMPRESSED;
+
+using ColumnCodecs = std::unordered_map<std::string, Compression::type>;
class WriterProperties {
public:
@@ -88,7 +95,8 @@ class WriterProperties {
dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
pagesize_(DEFAULT_PAGE_SIZE),
- version_(DEFAULT_WRITER_VERSION) {}
+ version_(DEFAULT_WRITER_VERSION),
+ default_codec_(DEFAULT_COMPRESSION_TYPE) {}
virtual ~Builder() {}
Builder* allocator(MemoryAllocator* allocator) {
@@ -121,9 +129,25 @@ class WriterProperties {
return this;
}
+ Builder* compression(Compression::type codec) {
+ default_codec_ = codec;
+ return this;
+ }
+
+ Builder* compression(const std::string& path, Compression::type codec) {
+ codecs_[path] = codec;
+ return this;
+ }
+
+ Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
+ Compression::type codec) {
+ return this->compression(path->ToDotString(), codec);
+ }
+
std::shared_ptr<WriterProperties> build() {
return std::shared_ptr<WriterProperties>(new WriterProperties(
- allocator_, dictionary_enabled_, dictionary_pagesize_, pagesize_, version_));
+ allocator_, dictionary_enabled_, dictionary_pagesize_,
+ pagesize_, version_, default_codec_, codecs_));
}
private:
@@ -132,9 +156,11 @@ class WriterProperties {
int64_t dictionary_pagesize_;
int64_t pagesize_;
ParquetVersion::type version_;
+ Compression::type default_codec_;
+ ColumnCodecs codecs_;
};
- MemoryAllocator* allocator() { return allocator_; }
+ MemoryAllocator* allocator() const { return allocator_; }
bool dictionary_enabled() const { return dictionary_enabled_; }
@@ -142,16 +168,26 @@ class WriterProperties {
int64_t data_pagesize() const { return pagesize_; }
- ParquetVersion::type version() { return parquet_version_; }
+ ParquetVersion::type version() const { return parquet_version_; }
+
+ Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
+ auto it = codecs_.find(path->ToDotString());
+ if (it != codecs_.end())
+ return it->second;
+ return default_codec_;
+ }
private:
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
- int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version)
+ int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
+ Compression::type default_codec, const ColumnCodecs& codecs)
: allocator_(allocator),
dictionary_enabled_(dictionary_enabled),
dictionary_pagesize_(dictionary_pagesize),
pagesize_(pagesize),
- parquet_version_(version) {
+ parquet_version_(version),
+ default_codec_(default_codec),
+ codecs_(codecs) {
pagesize_ = DEFAULT_PAGE_SIZE;
dictionary_enabled_ = DEFAULT_IS_DICTIONARY_ENABLED;
}
@@ -161,6 +197,8 @@ class WriterProperties {
int64_t dictionary_pagesize_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
+ Compression::type default_codec_;
+ ColumnCodecs codecs_;
};
std::shared_ptr<WriterProperties> default_writer_properties();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 194e496..ca7bb45 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -61,40 +61,54 @@ class TestSerialize : public ::testing::Test {
protected:
NodePtr node_;
SchemaDescriptor schema_;
+
+ void FileSerializeTest(Compression::type codec_type) {
+ std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+ auto gnode = std::static_pointer_cast<GroupNode>(node_);
+ std::shared_ptr<WriterProperties> writer_properties =
+ WriterProperties::Builder().compression("schema.int64", codec_type)->build();
+ auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
+ auto row_group_writer = file_writer->AppendRowGroup(100);
+ auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
+ std::vector<int64_t> values(100, 128);
+ column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+ column_writer->Close();
+ row_group_writer->Close();
+ file_writer->Close();
+
+ auto buffer = sink->GetBuffer();
+ std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+ auto file_reader = ParquetFileReader::Open(std::move(source));
+ ASSERT_EQ(1, file_reader->num_columns());
+ ASSERT_EQ(1, file_reader->num_row_groups());
+ ASSERT_EQ(100, file_reader->num_rows());
+
+ auto rg_reader = file_reader->RowGroup(0);
+ ASSERT_EQ(1, rg_reader->num_columns());
+ ASSERT_EQ(100, rg_reader->num_rows());
+
+ auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
+ std::vector<int64_t> values_out(100);
+ std::vector<int16_t> def_levels_out(100);
+ std::vector<int16_t> rep_levels_out(100);
+ int64_t values_read;
+ col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
+ values_out.data(), &values_read);
+ ASSERT_EQ(100, values_read);
+ ASSERT_EQ(values, values_out);
+ }
};
-TEST_F(TestSerialize, SmallFile) {
- std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
- auto gnode = std::static_pointer_cast<GroupNode>(node_);
- auto file_writer = ParquetFileWriter::Open(sink, gnode);
- auto row_group_writer = file_writer->AppendRowGroup(100);
- auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
- std::vector<int64_t> values(100, 128);
- column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
- column_writer->Close();
- row_group_writer->Close();
- file_writer->Close();
-
- auto buffer = sink->GetBuffer();
- std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
- auto file_reader = ParquetFileReader::Open(std::move(source));
- ASSERT_EQ(1, file_reader->num_columns());
- ASSERT_EQ(1, file_reader->num_row_groups());
- ASSERT_EQ(100, file_reader->num_rows());
-
- auto rg_reader = file_reader->RowGroup(0);
- ASSERT_EQ(1, rg_reader->num_columns());
- ASSERT_EQ(100, rg_reader->num_rows());
-
- auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
- std::vector<int64_t> values_out(100);
- std::vector<int16_t> def_levels_out(100);
- std::vector<int16_t> rep_levels_out(100);
- int64_t values_read;
- col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
- values_out.data(), &values_read);
- ASSERT_EQ(100, values_read);
- ASSERT_EQ(values, values_out);
+TEST_F(TestSerialize, SmallFileUncompressed) {
+ FileSerializeTest(Compression::UNCOMPRESSED);
+}
+
+TEST_F(TestSerialize, SmallFileSnappy) {
+ FileSerializeTest(Compression::SNAPPY);
+}
+
+TEST_F(TestSerialize, SmallFileGzip) {
+ FileSerializeTest(Compression::GZIP);
}
} // namespace test
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index f9ff486..a90f28b 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -77,13 +77,15 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
int64_t compressed_size = uncompressed_size;
std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
if (compressor_) {
- // TODO(PARQUET-592): Add support for compression
- // int64_t max_compressed_size = compressor_->MaxCompressedLen(
- // uncompressed_data.size(), uncompressed_data.data());
- // OwnedMutableBuffer compressed_data(compressor_->MaxCompressedLen(
- // uncompressed_data.size(), uncompressed_data.data()));
+ const uint8_t* uncompressed_ptr = uncompressed_data->data();
+ int64_t max_compressed_size = compressor_->MaxCompressedLen(
+ uncompressed_size, uncompressed_ptr);
+ compressed_data = std::make_shared<OwnedMutableBuffer>(max_compressed_size,
+ allocator_);
+ compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr,
+ max_compressed_size, compressed_data->mutable_data());
}
- // Compressed data is not needed anymore, so immediately get rid of it.
+ // Uncompressed data is not needed anymore, so immediately get rid of it.
uncompressed_data.reset();
format::DataPageHeader data_page_header;
@@ -103,7 +105,7 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
int64_t start_pos = sink_->Tell();
SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
int64_t header_size = sink_->Tell() - start_pos;
- sink_->Write(compressed_data->data(), compressed_data->size());
+ sink_->Write(compressed_data->data(), compressed_size);
metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
metadata_->meta_data.total_compressed_size += compressed_size + header_size;
@@ -140,8 +142,8 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
col_meta->__isset.meta_data = true;
col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
- std::unique_ptr<PageWriter> pager(
- new SerializedPageWriter(sink_, Compression::UNCOMPRESSED, col_meta, allocator_));
+ std::unique_ptr<PageWriter> pager(new SerializedPageWriter(sink_,
+ properties_->compression(column_descr->path()), col_meta, allocator_));
current_column_writer_ =
ColumnWriter::Make(column_descr, std::move(pager), num_rows_, allocator_);
return current_column_writer_.get();
@@ -168,9 +170,9 @@ void RowGroupSerializer::Close() {
std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
- MemoryAllocator* allocator, const std::shared_ptr<WriterProperties>& properties) {
+ const std::shared_ptr<WriterProperties>& properties) {
std::unique_ptr<ParquetFileWriter::Contents> result(
- new FileSerializer(sink, schema, allocator, properties));
+ new FileSerializer(sink, schema, properties));
return result;
}
@@ -213,7 +215,8 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
row_group_metadata_.resize(rgm_size + 1);
format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
std::unique_ptr<RowGroupWriter::Contents> contents(
- new RowGroupSerializer(num_rows, &schema_, sink_.get(), rg_metadata, allocator_));
+ new RowGroupSerializer(num_rows, &schema_, sink_.get(),
+ rg_metadata, properties().get()));
row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
return row_group_writer_.get();
}
@@ -247,10 +250,10 @@ void FileSerializer::WriteMetaData() {
}
FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
- std::shared_ptr<GroupNode>& schema, MemoryAllocator* allocator,
+ std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties)
: sink_(sink),
- allocator_(allocator),
+ allocator_(properties->allocator()),
num_row_groups_(0),
num_rows_(0),
is_open_(true),
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 8ab025c..c49a076 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -67,12 +67,13 @@ class SerializedPageWriter : public PageWriter {
class RowGroupSerializer : public RowGroupWriter::Contents {
public:
RowGroupSerializer(int64_t num_rows, const SchemaDescriptor* schema, OutputStream* sink,
- format::RowGroup* metadata, MemoryAllocator* allocator)
+ format::RowGroup* metadata, const WriterProperties* properties)
: num_rows_(num_rows),
schema_(schema),
sink_(sink),
metadata_(metadata),
- allocator_(allocator),
+ allocator_(properties->allocator()),
+ properties_(properties),
total_bytes_written_(0),
closed_(false),
current_column_index_(-1) {
@@ -96,6 +97,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
OutputStream* sink_;
format::RowGroup* metadata_;
MemoryAllocator* allocator_;
+ const WriterProperties* properties_;
int64_t total_bytes_written_;
bool closed_;
@@ -110,7 +112,6 @@ class FileSerializer : public ParquetFileWriter::Contents {
public:
static std::unique_ptr<ParquetFileWriter::Contents> Open(
std::shared_ptr<OutputStream> sink, std::shared_ptr<schema::GroupNode>& schema,
- MemoryAllocator* allocator = default_allocator(),
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
void Close() override;
@@ -127,7 +128,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
private:
explicit FileSerializer(std::shared_ptr<OutputStream> sink,
- std::shared_ptr<schema::GroupNode>& schema, MemoryAllocator* allocator,
+ std::shared_ptr<schema::GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties);
std::shared_ptr<OutputStream> sink_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index 8f643c2..84bbe8d 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -55,8 +55,8 @@ ParquetFileWriter::~ParquetFileWriter() {
std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
- MemoryAllocator* allocator, const std::shared_ptr<WriterProperties>& properties) {
- auto contents = FileSerializer::Open(sink, schema, allocator);
+ const std::shared_ptr<WriterProperties>& properties) {
+ auto contents = FileSerializer::Open(sink, schema, properties);
std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
result->Open(std::move(contents));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aac1f2aa/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 15b590d..e213fbe 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -106,7 +106,6 @@ class ParquetFileWriter {
static std::unique_ptr<ParquetFileWriter> Open(std::shared_ptr<OutputStream> sink,
std::shared_ptr<schema::GroupNode>& schema,
- MemoryAllocator* allocator = default_allocator(),
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
void Open(std::unique_ptr<Contents> contents);
@@ -143,7 +142,7 @@ class ParquetFileWriter {
int num_row_groups() const;
/**
- * Configuartion passed to the writer, e.g. the used Parquet format version.
+ * Configuration passed to the writer, e.g. the used Parquet format version.
*/
const std::shared_ptr<WriterProperties>& properties() const;