You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@parquet.apache.org by uw...@apache.org on 2016/09/16 07:01:36 UTC
parquet-cpp git commit: PARQUET-689: C++: Compress DataPages eagerly
Repository: parquet-cpp
Updated Branches:
refs/heads/master 942f2aedb -> ffeb828ac
PARQUET-689: C++: Compress DataPages eagerly
Author: Deepak Majeti <de...@hpe.com>
Closes #162 from majetideepak/PARQUET-689 and squashes the following commits:
46f04fb [Deepak Majeti] Clang format
73dfcf9 [Deepak Majeti] Compress Data Pages early
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/ffeb828a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/ffeb828a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/ffeb828a
Branch: refs/heads/master
Commit: ffeb828ac5bf19abe5990a2be9245a8fdd292c7a
Parents: 942f2ae
Author: Deepak Majeti <de...@hpe.com>
Authored: Fri Sep 16 09:01:00 2016 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Fri Sep 16 09:01:00 2016 +0200
----------------------------------------------------------------------
src/parquet/column/page.h | 19 ++++++++++++++++++-
src/parquet/column/writer.cc | 8 +++++---
src/parquet/column/writer.h | 4 ++--
src/parquet/file/writer-internal.cc | 6 +++---
src/parquet/file/writer-internal.h | 18 +++++++++---------
5 files changed, 37 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index c06d3de..1de6013 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -95,6 +95,21 @@ class DataPage : public Page {
std::string min_;
};
+class CompressedDataPage : public DataPage {
+ public:
+ CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+ Encoding::type encoding, Encoding::type definition_level_encoding,
+ Encoding::type repetition_level_encoding, int64_t uncompressed_size)
+ : DataPage(buffer, num_values, encoding, definition_level_encoding,
+ repetition_level_encoding),
+ uncompressed_size_(uncompressed_size) {}
+
+ int64_t uncompressed_size() const { return uncompressed_size_; }
+
+ private:
+ int64_t uncompressed_size_;
+};
+
class DataPageV2 : public Page {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
@@ -176,9 +191,11 @@ class PageWriter {
// page limit
virtual void Close(bool has_dictionary, bool fallback) = 0;
- virtual int64_t WriteDataPage(const DataPage& page) = 0;
+ virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+ virtual std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) = 0;
};
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 1fbea62..bfbd0c5 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -116,8 +116,10 @@ void ColumnWriter::AddDataPage() {
memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
uncompressed_ptr += definition_levels->size();
memcpy(uncompressed_ptr, values->data(), values->size());
- DataPage page(
- uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
+
+ std::shared_ptr<Buffer> compressed_data = pager_->Compress(uncompressed_data);
+ CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE,
+ Encoding::RLE, uncompressed_size);
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
@@ -133,7 +135,7 @@ void ColumnWriter::AddDataPage() {
num_buffered_encoded_values_ = 0;
}
-void ColumnWriter::WriteDataPage(const DataPage& page) {
+void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
int64_t bytes_written = pager_->WriteDataPage(page);
total_bytes_written_ += bytes_written;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 4b2a021..3a54cbb 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -72,7 +72,7 @@ class PARQUET_EXPORT ColumnWriter {
void AddDataPage();
// Serializes Data Pages
- void WriteDataPage(const DataPage& page);
+ void WriteDataPage(const CompressedDataPage& page);
// Write multiple definition levels
void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
@@ -128,7 +128,7 @@ class PARQUET_EXPORT ColumnWriter {
std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
- std::vector<DataPage> data_pages_;
+ std::vector<CompressedDataPage> data_pages_;
private:
void InitSinks();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 2d396b7..05aefb9 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -66,9 +66,9 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
return compression_buffer_;
}
-int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) {
- int64_t uncompressed_size = page.size();
- std::shared_ptr<Buffer> compressed_data = Compress(page.buffer());
+int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
+ int64_t uncompressed_size = page.uncompressed_size();
+ std::shared_ptr<Buffer> compressed_data = page.buffer();
format::DataPageHeader data_page_header;
data_page_header.__set_num_values(page.num_values());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index e6364e9..2095154 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -40,10 +40,18 @@ class SerializedPageWriter : public PageWriter {
virtual ~SerializedPageWriter() {}
- int64_t WriteDataPage(const DataPage& page) override;
+ int64_t WriteDataPage(const CompressedDataPage& page) override;
int64_t WriteDictionaryPage(const DictionaryPage& page) override;
+ /**
+ * Compress a buffer.
+ *
+ * This method may return compression_buffer_ and thus the resulting memory
+ * is only valid until the next call to Compress().
+ */
+ std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override;
+
void Close(bool has_dictionary, bool fallback) override;
private:
@@ -58,14 +66,6 @@ class SerializedPageWriter : public PageWriter {
// Compression codec to use.
std::unique_ptr<Codec> compressor_;
std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
-
- /**
- * Compress a buffer.
- *
- * This method may return compression_buffer_ and thus the resulting memory
- * is only valid until the next call to Compress().
- */
- std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer);
};
// RowGroupWriter::Contents implementation for the Parquet file specification