You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/06/22 07:37:04 UTC
parquet-cpp git commit: PARQUET-636: Expose selection for different
encodings
Repository: parquet-cpp
Updated Branches:
refs/heads/master f334a8b97 -> 53475c71a
PARQUET-636: Expose selection for different encodings
For now, only PLAIN encoding is supported. The other encodings will be implemented in separate PRs to avoid huge changesets.
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #122 from xhochy/parquet-636 and squashes the following commits:
b98a575 [Uwe L. Korn] Lint fixes
37f1b7b [Uwe L. Korn] Add comment to describe the column default/specific vars
7ef8b12 [Uwe L. Korn] PARQUET-636: Expose selection for different encodings
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/53475c71
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/53475c71
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/53475c71
Branch: refs/heads/master
Commit: 53475c71a17005f13c101ab341c8aee96da4636d
Parents: f334a8b
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Wed Jun 22 00:36:57 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Wed Jun 22 00:36:57 2016 -0700
----------------------------------------------------------------------
src/parquet/column/column-io-benchmark.cc | 2 +-
src/parquet/column/column-writer-test.cc | 59 ++++++++++++++++++++------
src/parquet/column/properties.h | 57 +++++++++++++++++++------
src/parquet/column/writer.cc | 33 ++++++++------
src/parquet/column/writer.h | 6 ++-
src/parquet/file/writer-internal.cc | 19 ++++-----
6 files changed, 126 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
index 3bc2582..10272b2 100644
--- a/src/parquet/column/column-io-benchmark.cc
+++ b/src/parquet/column/column-io-benchmark.cc
@@ -35,7 +35,7 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
return std::unique_ptr<Int64Writer>(
- new Int64Writer(schema, std::move(pager), output_size));
+ new Int64Writer(schema, std::move(pager), output_size, Encoding::PLAIN));
}
std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 5d89c1a..f78ab5a 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -84,12 +84,12 @@ class TestPrimitiveWriter : public ::testing::Test {
}
std::unique_ptr<TypedColumnWriter<TestType>> BuildWriter(
- int64_t output_size = SMALL_SIZE) {
+ int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
sink_.reset(new InMemoryOutputStream());
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
- return std::unique_ptr<TypedColumnWriter<TestType>>(
- new TypedColumnWriter<TestType>(schema_.get(), std::move(pager), output_size));
+ return std::unique_ptr<TypedColumnWriter<TestType>>(new TypedColumnWriter<TestType>(
+ schema_.get(), std::move(pager), output_size, encoding));
}
void SyncValuesOut();
@@ -100,6 +100,20 @@ class TestPrimitiveWriter : public ::testing::Test {
SyncValuesOut();
}
+ void TestRequiredWithEncoding(Encoding::type encoding) {
+ this->GenerateData(SMALL_SIZE);
+
+ // Test case 1: required and non-repeated, so no definition or repetition levels
+ std::unique_ptr<TypedColumnWriter<TestType>> writer =
+ this->BuildWriter(SMALL_SIZE, encoding);
+ writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+ writer->Close();
+
+ this->ReadColumn();
+ ASSERT_EQ(SMALL_SIZE, this->values_read_);
+ ASSERT_EQ(this->values_, this->values_out_);
+ }
+
protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
@@ -172,18 +186,39 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
-TYPED_TEST(TestPrimitiveWriter, Required) {
- this->GenerateData(SMALL_SIZE);
+TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
+ this->TestRequiredWithEncoding(Encoding::PLAIN);
+}
- // Test case 1: required and non-repeated, so no definition or repetition levels
- std::unique_ptr<TypedColumnWriter<TypeParam>> writer = this->BuildWriter();
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
- writer->Close();
+/*
+TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
+ this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
+}
- this->ReadColumn();
- ASSERT_EQ(SMALL_SIZE, this->values_read_);
- ASSERT_EQ(this->values_, this->values_out_);
+TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
+ this->TestRequiredWithEncoding(Encoding::RLE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
+ this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
+ this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
}
+*/
TYPED_TEST(TestPrimitiveWriter, Optional) {
// Optional and non-repeated, with definition levels
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index ee74290..4296229 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -22,10 +22,10 @@
#include <string>
#include <unordered_map>
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
#include "parquet/types.h"
#include "parquet/schema/types.h"
+#include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet {
@@ -79,10 +79,10 @@ ReaderProperties default_reader_properties();
static int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
static bool DEFAULT_IS_DICTIONARY_ENABLED = true;
+static Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
-static constexpr Compression::type DEFAULT_COMPRESSION_TYPE =
- Compression::UNCOMPRESSED;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
using ColumnCodecs = std::unordered_map<std::string, Compression::type>;
@@ -94,6 +94,7 @@ class WriterProperties {
: allocator_(default_allocator()),
dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+ default_encoding_(DEFAULT_ENCODING),
pagesize_(DEFAULT_PAGE_SIZE),
version_(DEFAULT_WRITER_VERSION),
default_codec_(DEFAULT_COMPRESSION_TYPE) {}
@@ -124,6 +125,21 @@ class WriterProperties {
return this;
}
+ Builder* encoding(
+ const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
+ return encoding(path->ToDotString(), encoding_type);
+ }
+
+ Builder* encoding(const std::string& column_path, Encoding::type encoding_type) {
+ encodings_[column_path] = encoding_type;
+ return this;
+ }
+
+ Builder* encoding(Encoding::type encoding_type) {
+ default_encoding_ = encoding_type;
+ return this;
+ }
+
Builder* version(ParquetVersion::type version) {
version_ = version;
return this;
@@ -139,14 +155,14 @@ class WriterProperties {
return this;
}
- Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
- Compression::type codec) {
+ Builder* compression(
+ const std::shared_ptr<schema::ColumnPath>& path, Compression::type codec) {
return this->compression(path->ToDotString(), codec);
}
std::shared_ptr<WriterProperties> build() {
- return std::shared_ptr<WriterProperties>(new WriterProperties(
- allocator_, dictionary_enabled_, dictionary_pagesize_,
+ return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+ dictionary_enabled_, dictionary_pagesize_, default_encoding_, encodings_,
pagesize_, version_, default_codec_, codecs_));
}
@@ -154,8 +170,14 @@ class WriterProperties {
MemoryAllocator* allocator_;
bool dictionary_enabled_;
int64_t dictionary_pagesize_;
+ // Default encoding. This will be used for all columns that do not have
+ // a specific encoding set as part of encodings_
+ Encoding::type default_encoding_;
+ std::unordered_map<std::string, Encoding::type> encodings_;
int64_t pagesize_;
ParquetVersion::type version_;
+ // Default compression codec. This will be used for all columns that do
+ // not have a specific codec set as part of codecs_
Compression::type default_codec_;
ColumnCodecs codecs_;
};
@@ -170,20 +192,29 @@ class WriterProperties {
ParquetVersion::type version() const { return parquet_version_; }
+ Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
+ auto it = encodings_.find(path->ToDotString());
+ if (it != encodings_.end()) { return it->second; }
+ return default_encoding_;
+ }
+
Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
auto it = codecs_.find(path->ToDotString());
- if (it != codecs_.end())
- return it->second;
+ if (it != codecs_.end()) return it->second;
return default_codec_;
}
private:
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
- int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
- Compression::type default_codec, const ColumnCodecs& codecs)
+ int64_t dictionary_pagesize, Encoding::type default_encoding,
+ const std::unordered_map<std::string, Encoding::type>& encodings, int64_t pagesize,
+ ParquetVersion::type version, Compression::type default_codec,
+ const ColumnCodecs& codecs)
: allocator_(allocator),
dictionary_enabled_(dictionary_enabled),
dictionary_pagesize_(dictionary_pagesize),
+ default_encoding_(default_encoding),
+ encodings_(encodings),
pagesize_(pagesize),
parquet_version_(version),
default_codec_(default_codec),
@@ -195,6 +226,8 @@ class WriterProperties {
MemoryAllocator* allocator_;
bool dictionary_enabled_;
int64_t dictionary_pagesize_;
+ Encoding::type default_encoding_;
+ std::unordered_map<std::string, Encoding::type> encodings_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
Compression::type default_codec_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 8856f51..482265e 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -128,11 +128,17 @@ int64_t ColumnWriter::Close() {
template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, MemoryAllocator* allocator)
+ std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
+ MemoryAllocator* allocator)
: ColumnWriter(schema, std::move(pager), expected_rows, allocator) {
- // TODO(PARQUET-590) Get decoder type from WriterProperties
- current_encoder_ =
- std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
+ switch (encoding) {
+ case Encoding::PLAIN:
+ current_encoder_ =
+ std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
+ break;
+ default:
+ ParquetException::NYI("Selected encoding is not supported");
+ }
}
// ----------------------------------------------------------------------
@@ -140,32 +146,33 @@ TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
std::shared_ptr<ColumnWriter> ColumnWriter::Make(const ColumnDescriptor* descr,
std::unique_ptr<PageWriter> pager, int64_t expected_rows,
- MemoryAllocator* allocator) {
+ const WriterProperties* properties) {
+ Encoding::type encoding = properties->encoding(descr->path());
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<BoolWriter>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::INT32:
return std::make_shared<Int32Writer>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::INT64:
return std::make_shared<Int64Writer>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::INT96:
return std::make_shared<Int96Writer>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::FLOAT:
return std::make_shared<FloatWriter>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::DOUBLE:
return std::make_shared<DoubleWriter>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::BYTE_ARRAY:
return std::make_shared<ByteArrayWriter>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FixedLenByteArrayWriter>(
- descr, std::move(pager), expected_rows, allocator);
+ descr, std::move(pager), expected_rows, encoding, properties->allocator());
default:
ParquetException::NYI("type reader not implemented");
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 24f647a..93c66a4 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -20,6 +20,7 @@
#include "parquet/column/levels.h"
#include "parquet/column/page.h"
+#include "parquet/column/properties.h"
#include "parquet/encodings/encoder.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
@@ -35,7 +36,7 @@ class ColumnWriter {
static std::shared_ptr<ColumnWriter> Make(const ColumnDescriptor*,
std::unique_ptr<PageWriter>, int64_t expected_rows,
- MemoryAllocator* allocator = default_allocator());
+ const WriterProperties* properties);
Type::type type() const { return descr_->physical_type(); }
@@ -103,7 +104,8 @@ class TypedColumnWriter : public ColumnWriter {
typedef typename DType::c_type T;
TypedColumnWriter(const ColumnDescriptor* schema, std::unique_ptr<PageWriter> pager,
- int64_t expected_rows, MemoryAllocator* allocator = default_allocator());
+ int64_t expected_rows, Encoding::type encoding,
+ MemoryAllocator* allocator = default_allocator());
// Write a batch of repetition levels, definition levels, and values to the
// column.
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/53475c71/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index a90f28b..e0b1f66 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -78,10 +78,10 @@ int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values
std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
if (compressor_) {
const uint8_t* uncompressed_ptr = uncompressed_data->data();
- int64_t max_compressed_size = compressor_->MaxCompressedLen(
- uncompressed_size, uncompressed_ptr);
- compressed_data = std::make_shared<OwnedMutableBuffer>(max_compressed_size,
- allocator_);
+ int64_t max_compressed_size =
+ compressor_->MaxCompressedLen(uncompressed_size, uncompressed_ptr);
+ compressed_data =
+ std::make_shared<OwnedMutableBuffer>(max_compressed_size, allocator_);
compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr,
max_compressed_size, compressed_data->mutable_data());
}
@@ -142,10 +142,10 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
col_meta->__isset.meta_data = true;
col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
- std::unique_ptr<PageWriter> pager(new SerializedPageWriter(sink_,
- properties_->compression(column_descr->path()), col_meta, allocator_));
+ std::unique_ptr<PageWriter> pager(new SerializedPageWriter(
+ sink_, properties_->compression(column_descr->path()), col_meta, allocator_));
current_column_writer_ =
- ColumnWriter::Make(column_descr, std::move(pager), num_rows_, allocator_);
+ ColumnWriter::Make(column_descr, std::move(pager), num_rows_, properties_);
return current_column_writer_.get();
}
@@ -214,9 +214,8 @@ RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
auto rgm_size = row_group_metadata_.size();
row_group_metadata_.resize(rgm_size + 1);
format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
- std::unique_ptr<RowGroupWriter::Contents> contents(
- new RowGroupSerializer(num_rows, &schema_, sink_.get(),
- rg_metadata, properties().get()));
+ std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
+ num_rows, &schema_, sink_.get(), rg_metadata, properties_.get()));
row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
return row_group_writer_.get();
}