You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2016/09/16 07:01:36 UTC

parquet-cpp git commit: PARQUET-689: C++: Compress DataPages eagerly

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 942f2aedb -> ffeb828ac


PARQUET-689: C++: Compress DataPages eagerly

Author: Deepak Majeti <de...@hpe.com>

Closes #162 from majetideepak/PARQUET-689 and squashes the following commits:

46f04fb [Deepak Majeti] Clang format
73dfcf9 [Deepak Majeti] Compress Data Pages early


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/ffeb828a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/ffeb828a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/ffeb828a

Branch: refs/heads/master
Commit: ffeb828ac5bf19abe5990a2be9245a8fdd292c7a
Parents: 942f2ae
Author: Deepak Majeti <de...@hpe.com>
Authored: Fri Sep 16 09:01:00 2016 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Fri Sep 16 09:01:00 2016 +0200

----------------------------------------------------------------------
 src/parquet/column/page.h           | 19 ++++++++++++++++++-
 src/parquet/column/writer.cc        |  8 +++++---
 src/parquet/column/writer.h         |  4 ++--
 src/parquet/file/writer-internal.cc |  6 +++---
 src/parquet/file/writer-internal.h  | 18 +++++++++---------
 5 files changed, 37 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index c06d3de..1de6013 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -95,6 +95,21 @@ class DataPage : public Page {
   std::string min_;
 };
 
+class CompressedDataPage : public DataPage {
+ public:
+  CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, Encoding::type definition_level_encoding,
+      Encoding::type repetition_level_encoding, int64_t uncompressed_size)
+      : DataPage(buffer, num_values, encoding, definition_level_encoding,
+            repetition_level_encoding),
+        uncompressed_size_(uncompressed_size) {}
+
+  int64_t uncompressed_size() const { return uncompressed_size_; }
+
+ private:
+  int64_t uncompressed_size_;
+};
+
 class DataPageV2 : public Page {
  public:
   DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
@@ -176,9 +191,11 @@ class PageWriter {
   // page limit
   virtual void Close(bool has_dictionary, bool fallback) = 0;
 
-  virtual int64_t WriteDataPage(const DataPage& page) = 0;
+  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
 
   virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+  virtual std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) = 0;
 };
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 1fbea62..bfbd0c5 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -116,8 +116,10 @@ void ColumnWriter::AddDataPage() {
   memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
   uncompressed_ptr += definition_levels->size();
   memcpy(uncompressed_ptr, values->data(), values->size());
-  DataPage page(
-      uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
+
+  std::shared_ptr<Buffer> compressed_data = pager_->Compress(uncompressed_data);
+  CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE,
+      Encoding::RLE, uncompressed_size);
 
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
@@ -133,7 +135,7 @@ void ColumnWriter::AddDataPage() {
   num_buffered_encoded_values_ = 0;
 }
 
-void ColumnWriter::WriteDataPage(const DataPage& page) {
+void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
   int64_t bytes_written = pager_->WriteDataPage(page);
   total_bytes_written_ += bytes_written;
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 4b2a021..3a54cbb 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -72,7 +72,7 @@ class PARQUET_EXPORT ColumnWriter {
   void AddDataPage();
 
   // Serializes Data Pages
-  void WriteDataPage(const DataPage& page);
+  void WriteDataPage(const CompressedDataPage& page);
 
   // Write multiple definition levels
   void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
@@ -128,7 +128,7 @@ class PARQUET_EXPORT ColumnWriter {
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
 
-  std::vector<DataPage> data_pages_;
+  std::vector<CompressedDataPage> data_pages_;
 
  private:
   void InitSinks();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 2d396b7..05aefb9 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -66,9 +66,9 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
   return compression_buffer_;
 }
 
-int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) {
-  int64_t uncompressed_size = page.size();
-  std::shared_ptr<Buffer> compressed_data = Compress(page.buffer());
+int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
+  int64_t uncompressed_size = page.uncompressed_size();
+  std::shared_ptr<Buffer> compressed_data = page.buffer();
 
   format::DataPageHeader data_page_header;
   data_page_header.__set_num_values(page.num_values());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index e6364e9..2095154 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -40,10 +40,18 @@ class SerializedPageWriter : public PageWriter {
 
   virtual ~SerializedPageWriter() {}
 
-  int64_t WriteDataPage(const DataPage& page) override;
+  int64_t WriteDataPage(const CompressedDataPage& page) override;
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override;
 
+  /**
+   * Compress a buffer.
+   *
+   * This method may return compression_buffer_ and thus the resulting memory
+   * is only valid until the next call to Compress().
+   */
+  std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override;
+
   void Close(bool has_dictionary, bool fallback) override;
 
  private:
@@ -58,14 +66,6 @@ class SerializedPageWriter : public PageWriter {
   // Compression codec to use.
   std::unique_ptr<Codec> compressor_;
   std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
-
-  /**
-   * Compress a buffer.
-   *
-   * This method may return compression_buffer_ and thus the resulting memory
-   * is only valid until the next call to Compress().
-   */
-  std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer);
 };
 
 // RowGroupWriter::Contents implementation for the Parquet file specification