You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/15 04:04:43 UTC
parquet-cpp git commit: PARQUET-687: C++: Switch to PLAIN encoding if
dictionary grows too large
Repository: parquet-cpp
Updated Branches:
refs/heads/master 55604b297 -> c6f5ebe52
PARQUET-687: C++: Switch to PLAIN encoding if dictionary grows too large
Implemented dictionary fallback encoding
Added tests
Added a fast path to serialize data pages
Author: Deepak Majeti <de...@hpe.com>
Closes #157 from majetideepak/PARQUET-717 and squashes the following commits:
6f51df6 [Deepak Majeti] minor comment fix
c498aeb [Deepak Majeti] modify comment style
eac9114 [Deepak Majeti] clang format
da46033 [Deepak Majeti] added comments and fixed review suggestions
312bad8 [Deepak Majeti] minor changes
dd0cc7e [Deepak Majeti] Add all types to the test
54af38a [Deepak Majeti] clang format
84f360d [Deepak Majeti] added dictionary fallback support with tests
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c6f5ebe5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c6f5ebe5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c6f5ebe5
Branch: refs/heads/master
Commit: c6f5ebe5207c541b691f75cad0eb32e517522201
Parents: 55604b2
Author: Deepak Majeti <de...@hpe.com>
Authored: Thu Sep 15 00:04:35 2016 -0400
Committer: Wes McKinney <we...@apache.org>
Committed: Thu Sep 15 00:04:35 2016 -0400
----------------------------------------------------------------------
src/parquet/column/column-writer-test.cc | 51 +++++++++++++++++++--
src/parquet/column/page.h | 5 ++-
src/parquet/column/properties-test.cc | 2 +-
src/parquet/column/properties.h | 41 +++++++++++------
src/parquet/column/writer.cc | 60 ++++++++++++++++++-------
src/parquet/column/writer.h | 64 ++++++++++++++++++++++-----
src/parquet/file/file-metadata-test.cc | 8 ++--
src/parquet/file/metadata.cc | 15 ++++---
src/parquet/file/metadata.h | 2 +-
src/parquet/file/writer-internal.cc | 6 +--
src/parquet/file/writer-internal.h | 2 +-
11 files changed, 192 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index b3ca080..a87dc48 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -39,6 +39,8 @@ namespace test {
const int SMALL_SIZE = 100;
// Larger size to test some corner cases, only used in some specific cases.
const int LARGE_SIZE = 10000;
+// Very large size to test dictionary fallback.
+const int VERY_LARGE_SIZE = 400000;
template <typename TestType>
class TestPrimitiveWriter : public ::testing::Test {
@@ -74,10 +76,10 @@ class TestPrimitiveWriter : public ::testing::Test {
repetition_levels_out_.resize(SMALL_SIZE);
SetUpSchemaRequired();
- metadata_accessor_ =
- ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
}
+ // Parquet type enum of the TestType this typed fixture is instantiated with.
+ Type::type type_num() { return TestType::type_num; }
+
void BuildReader() {
auto buffer = sink_->GetBuffer();
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -130,7 +132,23 @@ class TestPrimitiveWriter : public ::testing::Test {
ASSERT_EQ(this->values_, this->values_out_);
}
- int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
+  // Number of values recorded in the column chunk metadata written so far.
+  int64_t metadata_num_values() const { return MakeMetadataAccessor()->num_values(); }
+
+  // Encodings recorded in the column chunk metadata, in the order they were added.
+  std::vector<Encoding::type> metadata_encodings() const {
+    return MakeMetadataAccessor()->encodings();
+  }
+
+  // Builds a fresh read-only accessor over thrift_metadata_.
+  // The accessor must be created lazily, on every call: ColumnChunkMetaData assumes
+  // the metadata buffer is complete (no changes may be made to it after the
+  // accessor is instantiated), so it cannot be created up front in SetUp().
+  std::unique_ptr<ColumnChunkMetaData> MakeMetadataAccessor() const {
+    return ColumnChunkMetaData::Make(
+        reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+  }
protected:
int64_t values_read_;
@@ -156,7 +174,6 @@ class TestPrimitiveWriter : public ::testing::Test {
NodePtr node_;
format::ColumnChunk thrift_metadata_;
std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
- std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
std::shared_ptr<ColumnDescriptor> schema_;
std::unique_ptr<InMemoryOutputStream> sink_;
std::shared_ptr<WriterProperties> writer_properties_;
@@ -334,5 +351,31 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
ASSERT_EQ(this->values_, this->values_out_);
}
+// Test case for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
+  this->GenerateData(VERY_LARGE_SIZE);
+
+  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Just read the first SMALL_SIZE rows to ensure we could read it back in
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+
+  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case.
+  // Dictionary encoding is not allowed for the boolean type, so that case has
+  // only 2 encodings (RLE, PLAIN).
+  // The size is asserted before indexing so that a metadata regression fails the
+  // test cleanly instead of reading past the end of the vector.
+  std::vector<Encoding::type> encodings = this->metadata_encodings();
+  if (this->type_num() != Type::BOOLEAN) {
+    ASSERT_EQ(3U, encodings.size());
+    ASSERT_EQ(Encoding::RLE, encodings[0]);
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[2]);
+  } else {
+    ASSERT_EQ(2U, encodings.size());
+    ASSERT_EQ(Encoding::RLE, encodings[0]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
+  }
+}
+
} // namespace test
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 13dec2c..c06d3de 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -171,7 +171,10 @@ class PageWriter {
public:
virtual ~PageWriter() {}
- virtual void Close() = 0;
+ // Called by the column writer once its column chunk is complete.
+ // has_dictionary: the column writer was configured for dictionary encoding
+ //   (PLAIN_DICTIONARY or RLE_DICTIONARY).
+ // fallback: dictionary encoding fell back to PLAIN because the dictionary
+ //   page size limit was reached.
+ virtual void Close(bool has_dictionary, bool fallback) = 0;
virtual int64_t WriteDataPage(const DataPage& page) = 0;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
index f1eeaf3..0d7314b 100644
--- a/src/parquet/column/properties-test.cc
+++ b/src/parquet/column/properties-test.cc
@@ -37,7 +37,7 @@ TEST(TestWriterProperties, Basics) {
std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
- ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
+ ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index c8f103b..91a4672 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -80,7 +80,8 @@ ReaderProperties PARQUET_EXPORT default_reader_properties();
static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
-static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
+// Default dictionary size at which a dictionary-encoded column chunk falls back
+// to PLAIN (compared against the dictionary encoder's dict_encoded_size()).
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
+// Default number of values TypedColumnWriter::WriteBatch hands to the encoder per
+// mini-batch; page and dictionary size limits are checked after each mini-batch.
+static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
@@ -96,7 +97,8 @@ class PARQUET_EXPORT WriterProperties {
Builder()
: allocator_(default_allocator()),
dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED),
- dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+ dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
+ write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
pagesize_(DEFAULT_PAGE_SIZE),
version_(DEFAULT_WRITER_VERSION),
created_by_(DEFAULT_CREATED_BY),
@@ -137,8 +139,13 @@ class PARQUET_EXPORT WriterProperties {
return this->enable_dictionary(path->ToDotString());
}
- Builder* dictionary_pagesize(int64_t dictionary_psize) {
- dictionary_pagesize_ = dictionary_psize;
+ // Sets the dictionary size at which dictionary encoding falls back to PLAIN
+ // (defaults to DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT).
+ Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
+ dictionary_pagesize_limit_ = dictionary_psize_limit;
+ return this;
+ }
+
+ // Sets how many values WriteBatch() forwards to the encoder at a time; page and
+ // dictionary size limits are checked after each such mini-batch.
+ // NOTE(review): the value is stored unvalidated; a non-positive batch size
+ // cannot be chunked — confirm callers never pass one.
+ Builder* write_batch_size(int64_t write_batch_size) {
+ write_batch_size_ = write_batch_size;
return this;
}
@@ -214,17 +221,18 @@ class PARQUET_EXPORT WriterProperties {
}
+ // Creates a WriterProperties object from the settings accumulated in this builder.
std::shared_ptr<WriterProperties> build() {
- return std::shared_ptr<WriterProperties>(
- new WriterProperties(allocator_, dictionary_enabled_default_,
- dictionary_enabled_, dictionary_pagesize_, pagesize_, version_, created_by_,
- default_encoding_, encodings_, default_codec_, codecs_));
+ return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+ dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_,
+ write_batch_size_, pagesize_, version_, created_by_, default_encoding_,
+ encodings_, default_codec_, codecs_));
}
private:
MemoryAllocator* allocator_;
bool dictionary_enabled_default_;
std::unordered_map<std::string, bool> dictionary_enabled_;
- int64_t dictionary_pagesize_;
+ int64_t dictionary_pagesize_limit_;
+ int64_t write_batch_size_;
int64_t pagesize_;
ParquetVersion::type version_;
std::string created_by_;
@@ -246,7 +254,9 @@ class PARQUET_EXPORT WriterProperties {
return dictionary_enabled_default_;
}
- inline int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
+ // Dictionary size at which dictionary encoding falls back to PLAIN.
+ inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
+
+ // Number of values per mini-batch used by TypedColumnWriter::WriteBatch.
+ inline int64_t write_batch_size() const { return write_batch_size_; }
inline int64_t data_pagesize() const { return pagesize_; }
@@ -286,14 +296,16 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
std::unordered_map<std::string, bool> dictionary_enabled,
- int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
- const std::string& created_by, Encoding::type default_encoding,
+ int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
+ ParquetVersion::type version, const std::string& created_by,
+ Encoding::type default_encoding,
std::unordered_map<std::string, Encoding::type> encodings,
Compression::type default_codec, const ColumnCodecs& codecs)
: allocator_(allocator),
dictionary_enabled_default_(dictionary_enabled_default),
dictionary_enabled_(dictionary_enabled),
- dictionary_pagesize_(dictionary_pagesize),
+ dictionary_pagesize_limit_(dictionary_pagesize),
+ write_batch_size_(write_batch_size),
pagesize_(pagesize),
parquet_version_(version),
parquet_created_by_(created_by),
@@ -304,7 +316,8 @@ class PARQUET_EXPORT WriterProperties {
MemoryAllocator* allocator_;
bool dictionary_enabled_default_;
std::unordered_map<std::string, bool> dictionary_enabled_;
- int64_t dictionary_pagesize_;
+ int64_t dictionary_pagesize_limit_;
+ int64_t write_batch_size_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index da4b17c..1fbea62 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -47,7 +47,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
num_buffered_encoded_values_(0),
num_rows_(0),
total_bytes_written_(0),
- closed_(false) {
+ closed_(false),
+ fallback_(false) {
InitSinks();
}
@@ -118,7 +119,13 @@ void ColumnWriter::AddDataPage() {
DataPage page(
uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
- data_pages_.push_back(std::move(page));
+ // Write the page to OutputStream eagerly if there is no dictionary or
+ // if dictionary encoding has fallen back to PLAIN
+ if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
+ data_pages_.push_back(std::move(page));
+ } else { // Eagerly write pages
+ WriteDataPage(page);
+ }
// Re-initialize the sinks as GetBuffer made them invalid.
InitSinks();
@@ -134,52 +141,71 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
+// Flushes everything still buffered for this column chunk and finalizes the page
+// writer. The flush happens only on the first call; later calls skip it.
+// Throws ParquetException if the number of rows written differs from
+// expected_rows_. Returns the total number of bytes written.
int64_t ColumnWriter::Close() {
if (!closed_) {
closed_ = true;
- if (has_dictionary_) { WriteDictionaryPage(); }
- // Write all outstanding data to a new page
- if (num_buffered_values_ > 0) { AddDataPage(); }
+ // After a fallback, the dictionary page was already written by
+ // CheckDictionarySizeLimit(), so it must not be written again here.
+ if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
+
+ // Cut a final page from any buffered values, then write all buffered pages
+ // after the dictionary page.
+ FlushBufferedDataPages();
- for (size_t i = 0; i < data_pages_.size(); i++) {
- WriteDataPage(data_pages_[i]);
- }
+ // Record which encodings were actually used in the chunk metadata.
+ pager_->Close(has_dictionary_, fallback_);
}
+ // Checked on every call, after the data has already been written out.
if (num_rows_ != expected_rows_) {
throw ParquetException(
- "Less then the number of expected rows written in"
+ "Less than the number of expected rows written in"
" the current column chunk");
}
- pager_->Close();
-
return total_bytes_written_;
}
+// Cuts a page from any values still held by the encoder (via AddDataPage), then
+// serializes every data page that was buffered while dictionary encoding was
+// active. The buffer is emptied afterwards so no page is written twice.
+void ColumnWriter::FlushBufferedDataPages() {
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+  for (const DataPage& buffered_page : data_pages_) {
+    WriteDataPage(buffered_page);
+  }
+  data_pages_.clear();
+}
+
// ----------------------------------------------------------------------
// TypedColumnWriter
+// Constructs a writer for a single column chunk. Dictionary mode (has_dictionary)
+// is enabled exactly when `encoding` is PLAIN_DICTIONARY or RLE_DICTIONARY; any
+// encoding other than those and PLAIN is rejected via ParquetException::NYI.
template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
+TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
const WriterProperties* properties)
- : ColumnWriter(schema, std::move(pager), expected_rows,
+ : ColumnWriter(descr, std::move(pager), expected_rows,
(encoding == Encoding::PLAIN_DICTIONARY ||
encoding == Encoding::RLE_DICTIONARY),
encoding, properties) {
switch (encoding) {
case Encoding::PLAIN:
- current_encoder_ = std::unique_ptr<EncoderType>(
- new PlainEncoder<Type>(schema, properties->allocator()));
+ current_encoder_.reset(new PlainEncoder<Type>(descr, properties->allocator()));
break;
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
+ // The dictionary encoder draws from this writer's pool_.
- current_encoder_ = std::unique_ptr<EncoderType>(
- new DictEncoder<Type>(schema, &pool_, properties->allocator()));
+ current_encoder_.reset(
+ new DictEncoder<Type>(descr, &pool_, properties->allocator()));
break;
default:
ParquetException::NYI("Selected encoding is not supported");
}
}
+// A column chunk gets at most one dictionary page. When the dictionary grows to
+// the configured limit, the dictionary page and all buffered pages of dictionary
+// indices are serialized, and every later value is encoded as PLAIN.
+// Precondition: current_encoder_ is a DictEncoder (the visible caller,
+// WriteMiniBatch, checks has_dictionary_ && !fallback_ before calling).
+template <typename Type>
+void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
+  auto* dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+  if (dict_encoder->dict_encoded_size() < properties_->dictionary_pagesize_limit()) {
+    return;  // Dictionary still fits; keep dictionary encoding.
+  }
+
+  WriteDictionaryPage();
+  // Serialize the buffered dictionary indices while the dictionary encoder is
+  // still the current encoder.
+  FlushBufferedDataPages();
+  fallback_ = true;
+  // Only PLAIN encoding is supported for fallback in V1.
+  current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator()));
+}
+
template <typename Type>
void TypedColumnWriter<Type>::WriteDictionaryPage() {
auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index c88ead1..6a6ee5f 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -33,6 +33,7 @@
namespace parquet {
+// Not used by TypedColumnWriter::WriteBatch, which chunks by
+// WriterProperties::write_batch_size() (default DEFAULT_WRITE_BATCH_SIZE).
+// Defined in terms of that default so the two values cannot drift apart.
+static constexpr int WRITE_BATCH_SIZE = static_cast<int>(DEFAULT_WRITE_BATCH_SIZE);
class PARQUET_EXPORT ColumnWriter {
public:
ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
@@ -56,9 +57,21 @@ class PARQUET_EXPORT ColumnWriter {
protected:
virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+ // Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;
+ // Checks if the Dictionary Page size limit is reached
+ // If the limit is reached, the Dictionary and Data Pages are serialized
+ // The encoding is switched to PLAIN
+
+ virtual void CheckDictionarySizeLimit() = 0;
+
+ // Adds Data Pages to an in memory buffer in dictionary encoding mode
+ // Serializes the Data Pages in other encoding modes
void AddDataPage();
+
+ // Serializes Data Pages
void WriteDataPage(const DataPage& page);
// Write multiple definition levels
@@ -70,6 +83,9 @@ class PARQUET_EXPORT ColumnWriter {
std::shared_ptr<Buffer> RleEncodeLevels(
const std::shared_ptr<Buffer>& buffer, int16_t max_level);
+ // Serialize the buffered Data Pages
+ void FlushBufferedDataPages();
+
const ColumnDescriptor* descr_;
std::unique_ptr<PageWriter> pager_;
@@ -100,16 +116,22 @@ class PARQUET_EXPORT ColumnWriter {
// Total number of rows written with this ColumnWriter
int num_rows_;
+ // Records the total number of bytes written by the serializer
int total_bytes_written_;
+
+ // Flag to check if the Writer has been closed
bool closed_;
+ // Flag to infer if dictionary encoding has fallen back to PLAIN
+ bool fallback_;
+
std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
+ std::vector<DataPage> data_pages_;
+
private:
void InitSinks();
-
- std::vector<DataPage> data_pages_;
};
// API to write values to a single column. This is the main client facing API.
@@ -131,26 +153,23 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
return current_encoder_->FlushValues();
}
void WriteDictionaryPage() override;
+ void CheckDictionarySizeLimit() override;
private:
+ void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const T* values);
+
typedef Encoder<DType> EncoderType;
// Write values to a temporary buffer before they are encoded into pages
void WriteValues(int64_t num_values, const T* values);
-
- // Map of encoding type to the respective encoder object. For example, a
- // column chunk's data pages may include both dictionary-encoded and
- // plain-encoded data.
- std::unordered_map<int, std::shared_ptr<EncoderType>> encoders_;
-
std::unique_ptr<EncoderType> current_encoder_;
};
template <typename DType>
-inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
+inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
int64_t values_to_write = 0;
-
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
for (int64_t i = 0; i < num_values; ++i) {
@@ -178,7 +197,7 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
}
if (num_rows_ > expected_rows_) {
- throw ParquetException("More rows were written in the column chunk then expected");
+ throw ParquetException("More rows were written in the column chunk than expected");
}
WriteValues(values_to_write, values);
@@ -189,6 +208,29 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
AddDataPage();
}
+ if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+}
+
+template <typename DType>
+inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes a large
+  // number of values, the chunking ensures AddDataPage() is called at a reasonable
+  // page size.
+  const int64_t write_batch_size = properties_->write_batch_size();
+  if (write_batch_size <= 0) {
+    // Chunking by a non-positive size is impossible (division by zero / no progress).
+    throw ParquetException("write_batch_size must be positive");
+  }
+  const auto max_def_level = descr_->max_definition_level();
+
+  // `values` holds only the non-null entries, so its cursor advances by the number
+  // of values each mini-batch consumed, not by the number of levels.
+  int64_t value_offset = 0;
+  for (int64_t offset = 0; offset < num_values; offset += write_batch_size) {
+    const int64_t remaining = num_values - offset;
+    const int64_t batch_size =
+        remaining < write_batch_size ? remaining : write_batch_size;
+    // Level arrays may be nullptr (the tests pass nullptr for required columns);
+    // forming &levels[offset] from a null pointer is undefined behavior.
+    const int16_t* batch_def_levels = def_levels ? def_levels + offset : nullptr;
+    const int16_t* batch_rep_levels = rep_levels ? rep_levels + offset : nullptr;
+    WriteMiniBatch(
+        batch_size, batch_def_levels, batch_rep_levels, values + value_offset);
+
+    // In Parquet a value is present only where the definition level equals the
+    // column's maximum definition level.
+    // NOTE(review): assumes WriteMiniBatch counts values_to_write the same way
+    // (its counting loop is elided from this diff) — confirm.
+    int64_t batch_values = batch_size;
+    if (max_def_level > 0) {
+      batch_values = 0;
+      for (int64_t i = 0; i < batch_size; ++i) {
+        if (batch_def_levels[i] == max_def_level) { ++batch_values; }
+      }
+    }
+    value_offset += batch_values;
+  }
}
template <typename DType>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 852072c..a7f83c5 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -64,8 +64,8 @@ TEST(Metadata, TestBuildAccess) {
// column metadata
col1_builder->SetStatistics(stats_int);
col2_builder->SetStatistics(stats_float);
- col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
- col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
+ col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
+ col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
rg1_builder->Finish(1024);
// rowgroup2 metadata
@@ -74,8 +74,8 @@ TEST(Metadata, TestBuildAccess) {
// column metadata
col1_builder->SetStatistics(stats_int);
col2_builder->SetStatistics(stats_float);
- col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
- col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
+ col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
+ col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
rg2_builder->Finish(1024);
// Read the metadata
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index bc0f7b9..00ce990 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -353,7 +353,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
void Finish(int64_t num_values, int64_t dictionary_page_offset,
int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
- int64_t uncompressed_size, bool dictionary_fallback = false) {
+ int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
if (dictionary_page_offset > 0) {
column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
} else {
@@ -368,16 +368,18 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
std::vector<format::Encoding::type> thrift_encodings;
thrift_encodings.push_back(ToThrift(Encoding::RLE));
- if (properties_->dictionary_enabled(column_->path())) {
+ if (has_dictionary) {
thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
// add the encoding only if it is unique
if (properties_->version() == ParquetVersion::PARQUET_2_0) {
thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
}
- }
- if (!properties_->dictionary_enabled(column_->path()) || dictionary_fallback) {
+ } else { // Dictionary not enabled
thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
}
+ // Only PLAIN encoding is supported for fallback in V1
+ // TODO(majetideepak): Use user specified encoding for V2
+ if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); }
column_chunk_->meta_data.__set_encodings(thrift_encodings);
}
@@ -410,9 +412,10 @@ void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
+// Commits the column chunk metadata; all work is delegated to impl_.
+// has_dictionary / dictionary_fallback select which encodings are recorded.
void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset,
- int64_t compressed_size, int64_t uncompressed_size, bool dictionary_fallback) {
+ int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
+ bool dictionary_fallback) {
impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
- compressed_size, uncompressed_size, dictionary_fallback);
+ compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
}
const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 1d96621..0ef6fa0 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -149,7 +149,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// commit the metadata
+// has_dictionary: the chunk was written with dictionary encoding, so the
+//   dictionary page encoding is recorded instead of the column's configured one.
+// dictionary_fallback: dictionary encoding fell back partway through the chunk,
+//   so PLAIN is additionally recorded in the chunk's encodings.
void Finish(int64_t num_values, int64_t dictonary_page_offset,
int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
- int64_t uncompressed_size, bool dictionary_fallback);
+ int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback);
private:
explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index fb05f13..2d396b7 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -46,11 +46,9 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
compressor_ = Codec::Create(codec);
}
-void SerializedPageWriter::Close() {
- // index_page_offset = 0 since they are not supported
- // TODO: Remove default fallback = 'false' when implemented
+void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
+ // Finishes the chunk metadata with the accumulated offsets and sizes.
+ // index_page_offset = 0 since index pages are not supported.
metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
- total_compressed_size_, total_uncompressed_size_, false);
+ total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback);
}
std::shared_ptr<Buffer> SerializedPageWriter::Compress(
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6f5ebe5/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 645d4bf..e6364e9 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -44,7 +44,7 @@ class SerializedPageWriter : public PageWriter {
int64_t WriteDictionaryPage(const DictionaryPage& page) override;
- void Close() override;
+ // Finishes the column chunk metadata (metadata_->Finish) using the writer's
+ // dictionary / fallback flags.
+ void Close(bool has_dictionary, bool fallback) override;
private:
OutputStream* sink_;