You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2019/02/05 14:16:52 UTC
[arrow] branch master updated: PARQUET-1521: [C++] Use pure virtual
interfaces for parquet::TypedColumnWriter,
remove use of 'extern template class'
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7ce2655 PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class'
7ce2655 is described below
commit 7ce26553b8ce78085751e4a4ae603d4043abf337
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Feb 5 15:15:43 2019 +0100
PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class'
This follows corresponding work in TypedColumnReader. The public API is unchanged, as can be verified by the lack of changes to the unit tests.
Author: Wes McKinney <we...@apache.org>
Closes #3551 from wesm/PARQUET-1521 and squashes the following commits:
aa6687a9 <Wes McKinney> Fix clang warnings
33555044 <Wes McKinney> Print build warning level
b657ac93 <Wes McKinney> Fix parquet-column-io-benchmark
61204dec <Wes McKinney> Refactor TypedColumnWriter implementation to be based on pure virtual interface, remove use of extern template class
---
cpp/cmake_modules/SetupCxxFlags.cmake | 2 +
cpp/src/parquet/column-io-benchmark.cc | 26 +-
cpp/src/parquet/column_writer.cc | 450 ++++++++++++++++++++++-----------
cpp/src/parquet/column_writer.h | 208 +++------------
4 files changed, 350 insertions(+), 336 deletions(-)
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 44ca22f..43dab02 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -111,6 +111,8 @@ if (NOT BUILD_WARNING_LEVEL)
endif(NOT BUILD_WARNING_LEVEL)
string(TOUPPER ${BUILD_WARNING_LEVEL} BUILD_WARNING_LEVEL)
+message(STATUS "Arrow build warning level: ${BUILD_WARNING_LEVEL}")
+
if ("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
# Pre-checkin builds
if ("${COMPILER_FAMILY}" STREQUAL "msvc")
diff --git a/cpp/src/parquet/column-io-benchmark.cc b/cpp/src/parquet/column-io-benchmark.cc
index c648d56..762bcb7 100644
--- a/cpp/src/parquet/column-io-benchmark.cc
+++ b/cpp/src/parquet/column-io-benchmark.cc
@@ -30,14 +30,15 @@ using schema::PrimitiveNode;
namespace benchmark {
-std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
+std::shared_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
ColumnChunkMetaDataBuilder* metadata,
ColumnDescriptor* schema,
const WriterProperties* properties) {
std::unique_ptr<PageWriter> pager =
PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
- return std::unique_ptr<Int64Writer>(new Int64Writer(
- metadata, std::move(pager), false /*use_dictionary*/, Encoding::PLAIN, properties));
+ std::shared_ptr<ColumnWriter> writer =
+ ColumnWriter::Make(metadata, std::move(pager), properties);
+ return std::static_pointer_cast<Int64Writer>(writer);
}
std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
@@ -65,14 +66,17 @@ static void BM_WriteInt64Column(::benchmark::State& state) {
std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- WriterProperties::Builder builder;
- std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
+ std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
+ .compression(codec)
+ ->encoding(Encoding::PLAIN)
+ ->disable_dictionary()
+ ->build();
auto metadata = ColumnChunkMetaDataBuilder::Make(
properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
while (state.KeepRunning()) {
InMemoryOutputStream stream;
- std::unique_ptr<Int64Writer> writer = BuildWriter(
+ std::shared_ptr<Int64Writer> writer = BuildWriter(
state.range(0), &stream, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
values.data());
@@ -125,13 +129,17 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
std::vector<int16_t> definition_levels(state.range(0), 1);
std::vector<int16_t> repetition_levels(state.range(0), 0);
std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- WriterProperties::Builder builder;
- std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
+ std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
+ .compression(codec)
+ ->encoding(Encoding::PLAIN)
+ ->disable_dictionary()
+ ->build();
+
auto metadata = ColumnChunkMetaDataBuilder::Make(
properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
InMemoryOutputStream stream;
- std::unique_ptr<Int64Writer> writer = BuildWriter(
+ std::shared_ptr<Int64Writer> writer = BuildWriter(
state.range(0), &stream, metadata.get(), schema.get(), properties.get());
writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
values.data());
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 0919a3f..47a1256 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -353,57 +353,148 @@ std::shared_ptr<WriterProperties> default_writer_properties() {
return default_writer_properties;
}
-ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, bool has_dictionary,
- Encoding::type encoding, const WriterProperties* properties)
- : metadata_(metadata),
- descr_(metadata->descr()),
- pager_(std::move(pager)),
- has_dictionary_(has_dictionary),
- encoding_(encoding),
- properties_(properties),
- allocator_(properties->memory_pool()),
- num_buffered_values_(0),
- num_buffered_encoded_values_(0),
- rows_written_(0),
- total_bytes_written_(0),
- total_compressed_bytes_(0),
- closed_(false),
- fallback_(false) {
- definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
- repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
- definition_levels_rle_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- repetition_levels_rle_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- uncompressed_data_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- if (pager_->has_compressor()) {
- compressed_data_ =
+class ColumnWriterImpl {
+ public:
+ ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
+ std::unique_ptr<PageWriter> pager, const bool use_dictionary,
+ Encoding::type encoding, const WriterProperties* properties)
+ : metadata_(metadata),
+ descr_(metadata->descr()),
+ pager_(std::move(pager)),
+ has_dictionary_(use_dictionary),
+ encoding_(encoding),
+ properties_(properties),
+ allocator_(properties->memory_pool()),
+ num_buffered_values_(0),
+ num_buffered_encoded_values_(0),
+ rows_written_(0),
+ total_bytes_written_(0),
+ total_compressed_bytes_(0),
+ closed_(false),
+ fallback_(false) {
+ definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+ repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+ definition_levels_rle_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ repetition_levels_rle_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ uncompressed_data_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ if (pager_->has_compressor()) {
+ compressed_data_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ }
}
-}
-void ColumnWriter::InitSinks() {
- definition_levels_sink_->Clear();
- repetition_levels_sink_->Clear();
-}
+ virtual ~ColumnWriterImpl() = default;
-void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
- DCHECK(!closed_);
- definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
- sizeof(int16_t) * num_levels);
-}
+ int64_t Close();
-void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
- DCHECK(!closed_);
- repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
- sizeof(int16_t) * num_levels);
-}
+ protected:
+ virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+ // Serializes Dictionary Page if enabled
+ virtual void WriteDictionaryPage() = 0;
+
+ // Plain-encoded statistics of the current page
+ virtual EncodedStatistics GetPageStatistics() = 0;
+
+ // Plain-encoded statistics of the whole chunk
+ virtual EncodedStatistics GetChunkStatistics() = 0;
+
+ // Merges page statistics into chunk statistics, then resets the values
+ virtual void ResetPageStatistics() = 0;
+
+ // Adds Data Pages to an in memory buffer in dictionary encoding mode
+ // Serializes the Data Pages in other encoding modes
+ void AddDataPage();
+
+ // Serializes Data Pages
+ void WriteDataPage(const CompressedDataPage& page);
+
+ // Write multiple definition levels
+ void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
+ DCHECK(!closed_);
+ definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
+ sizeof(int16_t) * num_levels);
+ }
+
+ // Write multiple repetition levels
+ void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
+ DCHECK(!closed_);
+ repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
+ sizeof(int16_t) * num_levels);
+ }
+
+ // RLE encode the src_buffer into dest_buffer and return the encoded size
+ int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer,
+ int16_t max_level);
+
+ // Serialize the buffered Data Pages
+ void FlushBufferedDataPages();
+
+ ColumnChunkMetaDataBuilder* metadata_;
+ const ColumnDescriptor* descr_;
+
+ std::unique_ptr<PageWriter> pager_;
+
+ bool has_dictionary_;
+ Encoding::type encoding_;
+ const WriterProperties* properties_;
+
+ LevelEncoder level_encoder_;
+
+ ::arrow::MemoryPool* allocator_;
+
+ // The total number of values stored in the data page. This is the maximum of
+ // the number of encoded definition levels or encoded values. For
+ // non-repeated, required columns, this is equal to the number of encoded
+ // values. For repeated or optional values, there may be fewer data values
+ // than levels, and this tells you how many encoded levels there are in that
+ // case.
+ int64_t num_buffered_values_;
+
+ // The total number of stored values. For repeated or optional values, this
+ // number may be lower than num_buffered_values_.
+ int64_t num_buffered_encoded_values_;
+
+ // Total number of rows written with this ColumnWriter
+ int rows_written_;
+
+ // Records the total number of bytes written by the serializer
+ int64_t total_bytes_written_;
+
+ // Records the current number of compressed bytes in a column
+ int64_t total_compressed_bytes_;
+
+ // Flag to check if the Writer has been closed
+ bool closed_;
+
+ // Flag to infer if dictionary encoding has fallen back to PLAIN
+ bool fallback_;
+
+ std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
+ std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
+
+ std::shared_ptr<ResizableBuffer> definition_levels_rle_;
+ std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
+
+ std::shared_ptr<ResizableBuffer> uncompressed_data_;
+ std::shared_ptr<ResizableBuffer> compressed_data_;
+
+ std::vector<CompressedDataPage> data_pages_;
+
+ private:
+ void InitSinks() {
+ definition_levels_sink_->Clear();
+ repetition_levels_sink_->Clear();
+ }
+};
// return the size of the encoded buffer
-int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
- ResizableBuffer* dest_buffer, int16_t max_level) {
+int64_t ColumnWriterImpl::RleEncodeLevels(const Buffer& src_buffer,
+ ResizableBuffer* dest_buffer,
+ int16_t max_level) {
// TODO: This only works with due to some RLE specifics
int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
static_cast<int>(num_buffered_values_)) +
@@ -425,7 +516,7 @@ int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
return encoded_size;
}
-void ColumnWriter::AddDataPage() {
+void ColumnWriterImpl::AddDataPage() {
int64_t definition_levels_rle_size = 0;
int64_t repetition_levels_rle_size = 0;
@@ -493,11 +584,11 @@ void ColumnWriter::AddDataPage() {
num_buffered_encoded_values_ = 0;
}
-void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
+void ColumnWriterImpl::WriteDataPage(const CompressedDataPage& page) {
total_bytes_written_ += pager_->WriteDataPage(page);
}
-int64_t ColumnWriter::Close() {
+int64_t ColumnWriterImpl::Close() {
if (!closed_) {
closed_ = true;
if (has_dictionary_ && !fallback_) {
@@ -525,7 +616,7 @@ int64_t ColumnWriter::Close() {
return total_bytes_written_;
}
-void ColumnWriter::FlushBufferedDataPages() {
+void ColumnWriterImpl::FlushBufferedDataPages() {
// Write all outstanding data to a new page
if (num_buffered_values_ > 0) {
AddDataPage();
@@ -540,47 +631,123 @@ void ColumnWriter::FlushBufferedDataPages() {
// ----------------------------------------------------------------------
// TypedColumnWriter
-template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager,
- const bool use_dictionary,
- Encoding::type encoding,
- const WriterProperties* properties)
- : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, properties) {
- current_encoder_ = MakeEncoder(Type::type_num, encoding, use_dictionary, descr_,
- properties->memory_pool());
-
- if (properties->statistics_enabled(descr_->path()) &&
- (SortOrder::UNKNOWN != descr_->sort_order())) {
- page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
- chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+template <typename DType>
+class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
+ public:
+ using T = typename DType::c_type;
+
+ TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
+ std::unique_ptr<PageWriter> pager, const bool use_dictionary,
+ Encoding::type encoding, const WriterProperties* properties)
+ : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
+ properties) {
+ current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
+ properties->memory_pool());
+
+ if (properties->statistics_enabled(descr_->path()) &&
+ (SortOrder::UNKNOWN != descr_->sort_order())) {
+ page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+ chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+ }
+ }
+
+ int64_t Close() override { return ColumnWriterImpl::Close(); }
+
+ void WriteBatch(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const T* values) override;
+
+ void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values) override;
+
+ int64_t EstimatedBufferedValueBytes() const override {
+ return current_encoder_->EstimatedDataEncodedSize();
}
-}
+
+ protected:
+ std::shared_ptr<Buffer> GetValuesBuffer() override {
+ return current_encoder_->FlushValues();
+ }
+ void WriteDictionaryPage() override;
+
+ // Checks if the Dictionary Page size limit is reached
+ // If the limit is reached, the Dictionary and Data Pages are serialized
+ // The encoding is switched to PLAIN
+ void CheckDictionarySizeLimit();
+
+ EncodedStatistics GetPageStatistics() override {
+ EncodedStatistics result;
+ if (page_statistics_) result = page_statistics_->Encode();
+ return result;
+ }
+
+ EncodedStatistics GetChunkStatistics() override {
+ EncodedStatistics result;
+ if (chunk_statistics_) result = chunk_statistics_->Encode();
+ return result;
+ }
+
+ void ResetPageStatistics() override;
+
+ Type::type type() const override { return descr_->physical_type(); }
+
+ const ColumnDescriptor* descr() const override { return descr_; }
+
+ int64_t rows_written() const override { return rows_written_; }
+
+ int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }
+
+ int64_t total_bytes_written() const override { return total_bytes_written_; }
+
+ const WriterProperties* properties() override { return properties_; }
+
+ private:
+ inline int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const T* values);
+
+ inline int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values,
+ int64_t* num_spaced_written);
+
+ // Write values to a temporary buffer before they are encoded into pages
+ void WriteValues(int64_t num_values, const T* values);
+ void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values);
+
+ using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
+ std::unique_ptr<Encoder> current_encoder_;
+
+ typedef TypedRowGroupStatistics<DType> TypedStats;
+ std::unique_ptr<TypedStats> page_statistics_;
+ std::unique_ptr<TypedStats> chunk_statistics_;
+};
// Only one Dictionary Page is written.
// Fallback to PLAIN if dictionary page limit is reached.
-template <typename Type>
-void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
+template <typename DType>
+void TypedColumnWriterImpl<DType>::CheckDictionarySizeLimit() {
// We have to dynamic cast here because TypedEncoder<Type> as some compilers
// don't want to cast through virtual inheritance
- auto dict_encoder = dynamic_cast<DictEncoder<Type>*>(current_encoder_.get());
+ auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
WriteDictionaryPage();
// Serialize the buffered Dictionary Indicies
FlushBufferedDataPages();
fallback_ = true;
// Only PLAIN encoding is supported for fallback in V1
- current_encoder_ = MakeEncoder(Type::type_num, Encoding::PLAIN, false, descr_,
+ current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
properties_->memory_pool());
encoding_ = Encoding::PLAIN;
}
}
-template <typename Type>
-void TypedColumnWriter<Type>::WriteDictionaryPage() {
+template <typename DType>
+void TypedColumnWriterImpl<DType>::WriteDictionaryPage() {
// We have to dynamic cast here because TypedEncoder<Type> as some compilers
// don't want to cast through virtual inheritance
- auto dict_encoder = dynamic_cast<DictEncoder<Type>*>(current_encoder_.get());
+ auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
DCHECK(dict_encoder);
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
@@ -591,22 +758,8 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
- EncodedStatistics result;
- if (page_statistics_) result = page_statistics_->Encode();
- return result;
-}
-
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
- EncodedStatistics result;
- if (chunk_statistics_) result = chunk_statistics_->Encode();
- return result;
-}
-
-template <typename Type>
-void TypedColumnWriter<Type>::ResetPageStatistics() {
+template <typename DType>
+void TypedColumnWriterImpl<DType>::ResetPageStatistics() {
if (chunk_statistics_ != nullptr) {
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
@@ -614,58 +767,13 @@ void TypedColumnWriter<Type>::ResetPageStatistics() {
}
// ----------------------------------------------------------------------
-// Dynamic column writer constructor
-
-std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager,
- const WriterProperties* properties) {
- const ColumnDescriptor* descr = metadata->descr();
- const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
- descr->physical_type() != Type::BOOLEAN;
- Encoding::type encoding = properties->encoding(descr->path());
- if (use_dictionary) {
- encoding = properties->dictionary_index_encoding();
- }
- switch (descr->physical_type()) {
- case Type::BOOLEAN:
- return std::make_shared<BoolWriter>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::INT32:
- return std::make_shared<Int32Writer>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::INT64:
- return std::make_shared<Int64Writer>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::INT96:
- return std::make_shared<Int96Writer>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::FLOAT:
- return std::make_shared<FloatWriter>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::DOUBLE:
- return std::make_shared<DoubleWriter>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), use_dictionary,
- encoding, properties);
- case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayWriter>(
- metadata, std::move(pager), use_dictionary, encoding, properties);
- default:
- ParquetException::NYI("type reader not implemented");
- }
- // Unreachable code, but supress compiler warning
- return std::shared_ptr<ColumnWriter>(nullptr);
-}
-
-// ----------------------------------------------------------------------
// Instantiate templated classes
template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
- const int16_t* def_levels,
- const int16_t* rep_levels,
- const T* values) {
+int64_t TypedColumnWriterImpl<DType>::WriteMiniBatch(int64_t num_values,
+ const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const T* values) {
int64_t values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
@@ -722,7 +830,7 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
}
template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
+int64_t TypedColumnWriterImpl<DType>::WriteMiniBatchSpaced(
int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values,
int64_t* num_spaced_written) {
@@ -793,8 +901,10 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
}
template <typename DType>
-void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values) {
+void TypedColumnWriterImpl<DType>::WriteBatch(int64_t num_values,
+ const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const T* values) {
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can be much above the limit.
// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -817,7 +927,7 @@ void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def
}
template <typename DType>
-void TypedColumnWriter<DType>::WriteBatchSpaced(
+void TypedColumnWriterImpl<DType>::WriteBatchSpaced(
int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels,
const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
// We check for DataPage limits only after we have inserted the values. If a user
@@ -845,27 +955,63 @@ void TypedColumnWriter<DType>::WriteBatchSpaced(
}
template <typename DType>
-void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
+void TypedColumnWriterImpl<DType>::WriteValues(int64_t num_values, const T* values) {
dynamic_cast<ValueEncoderType*>(current_encoder_.get())
->Put(values, static_cast<int>(num_values));
}
template <typename DType>
-void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- const T* values) {
+void TypedColumnWriterImpl<DType>::WriteValuesSpaced(int64_t num_values,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ const T* values) {
dynamic_cast<ValueEncoderType*>(current_encoder_.get())
->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
}
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
+// ----------------------------------------------------------------------
+// Dynamic column writer constructor
+
+std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
+ std::unique_ptr<PageWriter> pager,
+ const WriterProperties* properties) {
+ const ColumnDescriptor* descr = metadata->descr();
+ const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
+ descr->physical_type() != Type::BOOLEAN;
+ Encoding::type encoding = properties->encoding(descr->path());
+ if (use_dictionary) {
+ encoding = properties->dictionary_index_encoding();
+ }
+ switch (descr->physical_type()) {
+ case Type::BOOLEAN:
+ return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::INT32:
+ return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::INT64:
+ return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::INT96:
+ return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::FLOAT:
+ return std::make_shared<TypedColumnWriterImpl<FloatType>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::DOUBLE:
+ return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::BYTE_ARRAY:
+ return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
+ default:
+ ParquetException::NYI("type reader not implemented");
+ }
+ // Unreachable code, but supress compiler warning
+ return std::shared_ptr<ColumnWriter>(nullptr);
+}
} // namespace parquet
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 254bf0d..5b9efb4 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -105,147 +105,47 @@ class PARQUET_EXPORT PageWriter {
static constexpr int WRITE_BATCH_SIZE = 1000;
class PARQUET_EXPORT ColumnWriter {
public:
- ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
- bool has_dictionary, Encoding::type encoding,
- const WriterProperties* properties);
-
virtual ~ColumnWriter() = default;
static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
std::unique_ptr<PageWriter>,
const WriterProperties* properties);
- Type::type type() const { return descr_->physical_type(); }
-
- const ColumnDescriptor* descr() const { return descr_; }
-
- /**
- * Closes the ColumnWriter, commits any buffered values to pages.
- *
- * @return Total size of the column in bytes
- */
- int64_t Close();
-
- int64_t rows_written() const { return rows_written_; }
-
- // Only considers the size of the compressed pages + page header
- // Some values might be still buffered an not written to a page yet
- int64_t total_compressed_bytes() const { return total_compressed_bytes_; }
-
- int64_t total_bytes_written() const { return total_bytes_written_; }
-
- const WriterProperties* properties() { return properties_; }
-
- protected:
- virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
-
- // Serializes Dictionary Page if enabled
- virtual void WriteDictionaryPage() = 0;
-
- // Checks if the Dictionary Page size limit is reached
- // If the limit is reached, the Dictionary and Data Pages are serialized
- // The encoding is switched to PLAIN
-
- virtual void CheckDictionarySizeLimit() = 0;
+ /// \brief Closes the ColumnWriter, commits any buffered values to pages.
+ /// \return Total size of the column in bytes
+ virtual int64_t Close() = 0;
- // Plain-encoded statistics of the current page
- virtual EncodedStatistics GetPageStatistics() = 0;
+ /// \brief The physical Parquet type of the column
+ virtual Type::type type() const = 0;
- // Plain-encoded statistics of the whole chunk
- virtual EncodedStatistics GetChunkStatistics() = 0;
-
- // Merges page statistics into chunk statistics, then resets the values
- virtual void ResetPageStatistics() = 0;
-
- // Adds Data Pages to an in memory buffer in dictionary encoding mode
- // Serializes the Data Pages in other encoding modes
- void AddDataPage();
-
- // Serializes Data Pages
- void WriteDataPage(const CompressedDataPage& page);
-
- // Write multiple definition levels
- void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
-
- // Write multiple repetition levels
- void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
-
- // RLE encode the src_buffer into dest_buffer and return the encoded size
- int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer,
- int16_t max_level);
-
- // Serialize the buffered Data Pages
- void FlushBufferedDataPages();
-
- ColumnChunkMetaDataBuilder* metadata_;
- const ColumnDescriptor* descr_;
-
- std::unique_ptr<PageWriter> pager_;
-
- bool has_dictionary_;
- Encoding::type encoding_;
- const WriterProperties* properties_;
+ /// \brief The schema for the column
+ virtual const ColumnDescriptor* descr() const = 0;
- LevelEncoder level_encoder_;
+ /// \brief The number of rows written so far
+ virtual int64_t rows_written() const = 0;
- ::arrow::MemoryPool* allocator_;
+ /// \brief The total size of the compressed pages + page headers. Some values
+ /// might be still buffered an not written to a page yet
+ virtual int64_t total_compressed_bytes() const = 0;
- // The total number of values stored in the data page. This is the maximum of
- // the number of encoded definition levels or encoded values. For
- // non-repeated, required columns, this is equal to the number of encoded
- // values. For repeated or optional values, there may be fewer data values
- // than levels, and this tells you how many encoded levels there are in that
- // case.
- int64_t num_buffered_values_;
+ /// \brief The total number of bytes written as serialized data and
+ /// dictionary pages to the ColumnChunk so far
+ virtual int64_t total_bytes_written() const = 0;
- // The total number of stored values. For repeated or optional values, this
- // number may be lower than num_buffered_values_.
- int64_t num_buffered_encoded_values_;
-
- // Total number of rows written with this ColumnWriter
- int rows_written_;
-
- // Records the total number of bytes written by the serializer
- int64_t total_bytes_written_;
-
- // Records the current number of compressed bytes in a column
- int64_t total_compressed_bytes_;
-
- // Flag to check if the Writer has been closed
- bool closed_;
-
- // Flag to infer if dictionary encoding has fallen back to PLAIN
- bool fallback_;
-
- std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
- std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
-
- std::shared_ptr<ResizableBuffer> definition_levels_rle_;
- std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
-
- std::shared_ptr<ResizableBuffer> uncompressed_data_;
- std::shared_ptr<ResizableBuffer> compressed_data_;
-
- std::vector<CompressedDataPage> data_pages_;
-
- private:
- void InitSinks();
+ /// \brief The file-level writer properties
+ virtual const WriterProperties* properties() = 0;
};
// API to write values to a single column. This is the main client facing API.
template <typename DType>
-class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter {
+class TypedColumnWriter : public ColumnWriter {
public:
- typedef typename DType::c_type T;
-
- TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, const bool use_dictionary,
- Encoding::type encoding, const WriterProperties* properties);
+ using T = typename DType::c_type;
// Write a batch of repetition levels, definition levels, and values to the
// column.
- void WriteBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values);
+ virtual void WriteBatch(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const T* values) = 0;
/// Write a batch of repetition levels, definition levels, and values to the
/// column.
@@ -273,63 +173,21 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter {
/// @param values The values in the lowest nested level including
/// spacing for nulls on the lowest levels; input has the length
/// of the number of rows on the lowest nesting level.
- void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values);
+ virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values) = 0;
// Estimated size of the values that are not written to a page yet
- int64_t EstimatedBufferedValueBytes() const {
- return current_encoder_->EstimatedDataEncodedSize();
- }
-
- protected:
- std::shared_ptr<Buffer> GetValuesBuffer() override {
- return current_encoder_->FlushValues();
- }
- void WriteDictionaryPage() override;
- void CheckDictionarySizeLimit() override;
- EncodedStatistics GetPageStatistics() override;
- EncodedStatistics GetChunkStatistics() override;
- void ResetPageStatistics() override;
-
- private:
- int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values);
-
- int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values,
- int64_t* num_spaced_written);
-
- // Write values to a temporary buffer before they are encoded into pages
- void WriteValues(int64_t num_values, const T* values);
- void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values);
-
- using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
- std::unique_ptr<Encoder> current_encoder_;
-
- typedef TypedRowGroupStatistics<DType> TypedStats;
- std::unique_ptr<TypedStats> page_statistics_;
- std::unique_ptr<TypedStats> chunk_statistics_;
+ virtual int64_t EstimatedBufferedValueBytes() const = 0;
};
-typedef TypedColumnWriter<BooleanType> BoolWriter;
-typedef TypedColumnWriter<Int32Type> Int32Writer;
-typedef TypedColumnWriter<Int64Type> Int64Writer;
-typedef TypedColumnWriter<Int96Type> Int96Writer;
-typedef TypedColumnWriter<FloatType> FloatWriter;
-typedef TypedColumnWriter<DoubleType> DoubleWriter;
-typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
-typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
-
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<BooleanType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int32Type>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int64Type>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int96Type>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FloatType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<DoubleType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<ByteArrayType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FLBAType>;
+using BoolWriter = TypedColumnWriter<BooleanType>;
+using Int32Writer = TypedColumnWriter<Int32Type>;
+using Int64Writer = TypedColumnWriter<Int64Type>;
+using Int96Writer = TypedColumnWriter<Int96Type>;
+using FloatWriter = TypedColumnWriter<FloatType>;
+using DoubleWriter = TypedColumnWriter<DoubleType>;
+using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
+using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
} // namespace parquet