You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/06/22 07:37:04 UTC

parquet-cpp git commit: PARQUET-636: Expose selection for different encodings

Repository: parquet-cpp
Updated Branches:
  refs/heads/master f334a8b97 -> 53475c71a


PARQUET-636: Expose selection for different encodings

For now, only PLAIN encoding is supported. The other encodings will be implemented in separate PRs to avoid huge changesets.

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #122 from xhochy/parquet-636 and squashes the following commits:

b98a575 [Uwe L. Korn] Lint fixes
37f1b7b [Uwe L. Korn] Add comment to describe the column default/specific vars
7ef8b12 [Uwe L. Korn] PARQUET-636: Expose selection for different encodings


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/53475c71
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/53475c71
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/53475c71

Branch: refs/heads/master
Commit: 53475c71a17005f13c101ab341c8aee96da4636d
Parents: f334a8b
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Wed Jun 22 00:36:57 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Wed Jun 22 00:36:57 2016 -0700

----------------------------------------------------------------------
 src/parquet/column/column-io-benchmark.cc |  2 +-
 src/parquet/column/column-writer-test.cc  | 59 ++++++++++++++++++++------
 src/parquet/column/properties.h           | 57 +++++++++++++++++++------
 src/parquet/column/writer.cc              | 33 ++++++++------
 src/parquet/column/writer.h               |  6 ++-
 src/parquet/file/writer-internal.cc       | 19 ++++-----
 6 files changed, 126 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
index 3bc2582..10272b2 100644
--- a/src/parquet/column/column-io-benchmark.cc
+++ b/src/parquet/column/column-io-benchmark.cc
@@ -35,7 +35,7 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
   std::unique_ptr<SerializedPageWriter> pager(
       new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
   return std::unique_ptr<Int64Writer>(
-      new Int64Writer(schema, std::move(pager), output_size));
+      new Int64Writer(schema, std::move(pager), output_size, Encoding::PLAIN));
 }
 
 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 5d89c1a..f78ab5a 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -84,12 +84,12 @@ class TestPrimitiveWriter : public ::testing::Test {
   }
 
   std::unique_ptr<TypedColumnWriter<TestType>> BuildWriter(
-      int64_t output_size = SMALL_SIZE) {
+      int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
     sink_.reset(new InMemoryOutputStream());
     std::unique_ptr<SerializedPageWriter> pager(
         new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
-    return std::unique_ptr<TypedColumnWriter<TestType>>(
-        new TypedColumnWriter<TestType>(schema_.get(), std::move(pager), output_size));
+    return std::unique_ptr<TypedColumnWriter<TestType>>(new TypedColumnWriter<TestType>(
+        schema_.get(), std::move(pager), output_size, encoding));
   }
 
   void SyncValuesOut();
@@ -100,6 +100,20 @@ class TestPrimitiveWriter : public ::testing::Test {
     SyncValuesOut();
   }
 
+  void TestRequiredWithEncoding(Encoding::type encoding) {
+    this->GenerateData(SMALL_SIZE);
+
+    // Test case 1: required and non-repeated, so no definition or repetition levels
+    std::unique_ptr<TypedColumnWriter<TestType>> writer =
+        this->BuildWriter(SMALL_SIZE, encoding);
+    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+    writer->Close();
+
+    this->ReadColumn();
+    ASSERT_EQ(SMALL_SIZE, this->values_read_);
+    ASSERT_EQ(this->values_, this->values_out_);
+  }
+
  protected:
   int64_t values_read_;
   // Keep the reader alive as for ByteArray the lifetime of the ByteArray
@@ -172,18 +186,39 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 
-TYPED_TEST(TestPrimitiveWriter, Required) {
-  this->GenerateData(SMALL_SIZE);
+TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
+  this->TestRequiredWithEncoding(Encoding::PLAIN);
+}
 
-  // Test case 1: required and non-repeated, so no definition or repetition levels
-  std::unique_ptr<TypedColumnWriter<TypeParam>> writer = this->BuildWriter();
-  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
-  writer->Close();
+/*
+TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
+  this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
+}
 
-  this->ReadColumn();
-  ASSERT_EQ(SMALL_SIZE, this->values_read_);
-  ASSERT_EQ(this->values_, this->values_out_);
+TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
+  this->TestRequiredWithEncoding(Encoding::RLE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
+  this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
+  this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
 }
+*/
 
 TYPED_TEST(TestPrimitiveWriter, Optional) {
   // Optional and non-repeated, with definition levels

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index ee74290..4296229 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -22,10 +22,10 @@
 #include <string>
 #include <unordered_map>
 
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
 #include "parquet/types.h"
 #include "parquet/schema/types.h"
+#include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet {
 
@@ -79,10 +79,10 @@ ReaderProperties default_reader_properties();
 static int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
 static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
 static bool DEFAULT_IS_DICTIONARY_ENABLED = true;
+static Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
-static constexpr Compression::type DEFAULT_COMPRESSION_TYPE =
-    Compression::UNCOMPRESSED;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
 
 using ColumnCodecs = std::unordered_map<std::string, Compression::type>;
 
@@ -94,6 +94,7 @@ class WriterProperties {
         : allocator_(default_allocator()),
           dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
           dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+          default_encoding_(DEFAULT_ENCODING),
           pagesize_(DEFAULT_PAGE_SIZE),
           version_(DEFAULT_WRITER_VERSION),
           default_codec_(DEFAULT_COMPRESSION_TYPE) {}
@@ -124,6 +125,21 @@ class WriterProperties {
       return this;
     }
 
+    Builder* encoding(
+        const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
+      return encoding(path->ToDotString(), encoding_type);
+    }
+
+    Builder* encoding(const std::string& column_path, Encoding::type encoding_type) {
+      encodings_[column_path] = encoding_type;
+      return this;
+    }
+
+    Builder* encoding(Encoding::type encoding_type) {
+      default_encoding_ = encoding_type;
+      return this;
+    }
+
     Builder* version(ParquetVersion::type version) {
       version_ = version;
       return this;
@@ -139,14 +155,14 @@ class WriterProperties {
       return this;
     }
 
-    Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
-                         Compression::type codec) {
+    Builder* compression(
+        const std::shared_ptr<schema::ColumnPath>& path, Compression::type codec) {
       return this->compression(path->ToDotString(), codec);
     }
 
     std::shared_ptr<WriterProperties> build() {
-      return std::shared_ptr<WriterProperties>(new WriterProperties(
-          allocator_, dictionary_enabled_, dictionary_pagesize_,
+      return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+          dictionary_enabled_, dictionary_pagesize_, default_encoding_, encodings_,
           pagesize_, version_, default_codec_, codecs_));
     }
 
@@ -154,8 +170,14 @@ class WriterProperties {
     MemoryAllocator* allocator_;
     bool dictionary_enabled_;
     int64_t dictionary_pagesize_;
+    // Default encoding. This will be used for all columns that do not have
+    // a specific encoding set as part of encodings_
+    Encoding::type default_encoding_;
+    std::unordered_map<std::string, Encoding::type> encodings_;
     int64_t pagesize_;
     ParquetVersion::type version_;
+    // Default compression codec. This will be used for all columns that do
+    // not have a specific codec set as part of codecs_
     Compression::type default_codec_;
     ColumnCodecs codecs_;
   };
@@ -170,20 +192,29 @@ class WriterProperties {
 
   ParquetVersion::type version() const { return parquet_version_; }
 
+  Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
+    auto it = encodings_.find(path->ToDotString());
+    if (it != encodings_.end()) { return it->second; }
+    return default_encoding_;
+  }
+
   Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
     auto it = codecs_.find(path->ToDotString());
-    if (it != codecs_.end())
-      return it->second;
+    if (it != codecs_.end()) return it->second;
     return default_codec_;
   }
 
  private:
   explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
-      int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
-      Compression::type default_codec, const ColumnCodecs& codecs)
+      int64_t dictionary_pagesize, Encoding::type default_encoding,
+      const std::unordered_map<std::string, Encoding::type>& encodings, int64_t pagesize,
+      ParquetVersion::type version, Compression::type default_codec,
+      const ColumnCodecs& codecs)
       : allocator_(allocator),
         dictionary_enabled_(dictionary_enabled),
         dictionary_pagesize_(dictionary_pagesize),
+        default_encoding_(default_encoding),
+        encodings_(encodings),
         pagesize_(pagesize),
         parquet_version_(version),
         default_codec_(default_codec),
@@ -195,6 +226,8 @@ class WriterProperties {
   MemoryAllocator* allocator_;
   bool dictionary_enabled_;
   int64_t dictionary_pagesize_;
+  Encoding::type default_encoding_;
+  std::unordered_map<std::string, Encoding::type> encodings_;
   int64_t pagesize_;
   ParquetVersion::type parquet_version_;
   Compression::type default_codec_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 8856f51..482265e 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -128,11 +128,17 @@ int64_t ColumnWriter::Close() {
 
 template <typename Type>
 TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows, MemoryAllocator* allocator)
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
+    MemoryAllocator* allocator)
     : ColumnWriter(schema, std::move(pager), expected_rows, allocator) {
-  // TODO(PARQUET-590) Get decoder type from WriterProperties
-  current_encoder_ =
-      std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
+  switch (encoding) {
+    case Encoding::PLAIN:
+      current_encoder_ =
+          std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
+      break;
+    default:
+      ParquetException::NYI("Selected encoding is not supported");
+  }
 }
 
 // ----------------------------------------------------------------------
@@ -140,32 +146,33 @@ TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
 
 std::shared_ptr<ColumnWriter> ColumnWriter::Make(const ColumnDescriptor* descr,
     std::unique_ptr<PageWriter> pager, int64_t expected_rows,
-    MemoryAllocator* allocator) {
+    const WriterProperties* properties) {
+  Encoding::type encoding = properties->encoding(descr->path());
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
       return std::make_shared<BoolWriter>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::INT32:
       return std::make_shared<Int32Writer>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::INT64:
       return std::make_shared<Int64Writer>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::INT96:
       return std::make_shared<Int96Writer>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::FLOAT:
       return std::make_shared<FloatWriter>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::DOUBLE:
       return std::make_shared<DoubleWriter>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::BYTE_ARRAY:
       return std::make_shared<ByteArrayWriter>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_shared<FixedLenByteArrayWriter>(
-          descr, std::move(pager), expected_rows, allocator);
+          descr, std::move(pager), expected_rows, encoding, properties->allocator());
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 24f647a..93c66a4 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -20,6 +20,7 @@
 
 #include "parquet/column/levels.h"
 #include "parquet/column/page.h"
+#include "parquet/column/properties.h"
 #include "parquet/encodings/encoder.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
@@ -35,7 +36,7 @@ class ColumnWriter {
 
   static std::shared_ptr<ColumnWriter> Make(const ColumnDescriptor*,
       std::unique_ptr<PageWriter>, int64_t expected_rows,
-      MemoryAllocator* allocator = default_allocator());
+      const WriterProperties* properties);
 
   Type::type type() const { return descr_->physical_type(); }
 
@@ -103,7 +104,8 @@ class TypedColumnWriter : public ColumnWriter {
   typedef typename DType::c_type T;
 
   TypedColumnWriter(const ColumnDescriptor* schema, std::unique_ptr<PageWriter> pager,
-      int64_t expected_rows, MemoryAllocator* allocator = default_allocator());
+      int64_t expected_rows, Encoding::type encoding,
+      MemoryAllocator* allocator = default_allocator());
 
   // Write a batch of repetition levels, definition levels, and values to the
   // column.

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index a90f28b..e0b1f66 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -78,10 +78,10 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
   std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
   if (compressor_) {
     const uint8_t* uncompressed_ptr = uncompressed_data->data();
-    int64_t max_compressed_size = compressor_->MaxCompressedLen(
-        uncompressed_size, uncompressed_ptr);
-    compressed_data = std::make_shared<OwnedMutableBuffer>(max_compressed_size,
-        allocator_);
+    int64_t max_compressed_size =
+        compressor_->MaxCompressedLen(uncompressed_size, uncompressed_ptr);
+    compressed_data =
+        std::make_shared<OwnedMutableBuffer>(max_compressed_size, allocator_);
     compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr,
         max_compressed_size, compressed_data->mutable_data());
   }
@@ -142,10 +142,10 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
   col_meta->__isset.meta_data = true;
   col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
   col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
-  std::unique_ptr<PageWriter> pager(new SerializedPageWriter(sink_,
-      properties_->compression(column_descr->path()), col_meta, allocator_));
+  std::unique_ptr<PageWriter> pager(new SerializedPageWriter(
+      sink_, properties_->compression(column_descr->path()), col_meta, allocator_));
   current_column_writer_ =
-      ColumnWriter::Make(column_descr, std::move(pager), num_rows_, allocator_);
+      ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_);
   return current_column_writer_.get();
 }
 
@@ -214,9 +214,8 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
   auto rgm_size = row_group_metadata_.size();
   row_group_metadata_.resize(rgm_size + 1);
   format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
-  std::unique_ptr<RowGroupWriter::Contents> contents(
-      new RowGroupSerializer(num_rows, &schema_, sink_.get(),
-                             rg_metadata, properties().get()));
+  std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
+      num_rows, &schema_, sink_.get(), rg_metadata, properties_.get()));
   row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
   return row_group_writer_.get();
 }