You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/08/02 23:38:32 UTC
[2/2] parquet-cpp git commit: PARQUET-671: performance improvements
for rle/bit-packed decoding
PARQUET-671: performance improvements for rle/bit-packed decoding
Testing on my own data shows an order-of-magnitude improvement.
I separated the commits for clarity; each one gives an incremental improvement.
The motivation for the last commit (allowing NULL for def_levels/rep_levels) is to work around Spark, which doesn't seem to be able to generate columns without definition levels, even when a column is specified as "not nullable".
Author: Eric Daniel <ed...@comscore.com>
Closes #140 from edani/decode-perf and squashes the following commits:
eec0855 [Eric Daniel] Ran "make format"
0568de6 [Eric Daniel] Only check num. of repetition levels when def_levels is set
5f54e1c [Eric Daniel] Added benchmarks for dictionary decoding
087945b [Eric Daniel] Style fixes from code review
906be73 [Eric Daniel] Allow the reader to skip rep/def decoding
04b7391 [Eric Daniel] Fast bit unpacking
bda5d84 [Eric Daniel] The bit reader can decode in batches
3f10378 [Eric Daniel] Improve decoding of repeated values in the dict encoding
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/38f0ffd5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/38f0ffd5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/38f0ffd5
Branch: refs/heads/master
Commit: 38f0ffd5adc37db948991a2eb6409e2727721463
Parents: 616305c
Author: Eric Daniel <ed...@comscore.com>
Authored: Tue Aug 2 16:38:24 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Tue Aug 2 16:38:24 2016 -0700
----------------------------------------------------------------------
src/parquet/column/levels.cc | 5 +-
src/parquet/column/reader.h | 10 +-
src/parquet/encodings/dictionary-encoding.h | 20 +-
src/parquet/encodings/encoding-benchmark.cc | 72 +-
src/parquet/encodings/plain-encoding.h | 6 +-
src/parquet/util/bit-stream-utils.h | 4 +
src/parquet/util/bit-stream-utils.inline.h | 102 +-
src/parquet/util/bpacking.h | 3323 ++++++++++++++++++++++
src/parquet/util/buffer.h | 2 +-
src/parquet/util/rle-encoding.h | 72 +-
10 files changed, 3549 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/column/levels.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.cc b/src/parquet/column/levels.cc
index 9b2d901..3e7b9df 100644
--- a/src/parquet/column/levels.cc
+++ b/src/parquet/column/levels.cc
@@ -133,10 +133,7 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) {
if (encoding_ == Encoding::RLE) {
num_decoded = rle_decoder_->GetBatch(levels, num_values);
} else {
- for (int i = 0; i < num_values; ++i) {
- if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { break; }
- ++num_decoded;
- }
+ num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
}
num_values_remaining_ -= num_decoded;
return num_decoded;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 5153698..25df2b4 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -115,6 +115,10 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
// may be less than the number of repetition and definition levels. With
// nested data this is almost certainly true.
//
+ // Set def_levels or rep_levels to nullptr if you want to skip reading them.
+ // This is only safe if you know through some other source that there are no
+ // undefined values.
+ //
// To fully exhaust a row group, you must read batches until the number of
// values read reaches the number of stored values according to the metadata.
//
@@ -171,7 +175,7 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
int64_t values_to_read = 0;
// If the field is required and non-repeated, there are no definition levels
- if (descr_->max_definition_level() > 0) {
+ if (descr_->max_definition_level() > 0 && def_levels) {
num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
@@ -184,9 +188,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
}
// Not present for non-repeated fields
- if (descr_->max_repetition_level() > 0) {
+ if (descr_->max_repetition_level() > 0 && rep_levels) {
num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
- if (num_def_levels != num_rep_levels) {
+ if (def_levels && num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
}
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 7d6785e..8e121ee 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -64,22 +64,15 @@ class DictionaryDecoder : public Decoder<Type> {
virtual int Decode(T* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- buffer[i] = dictionary_[index()];
- }
+ int decoded_values = idx_decoder_.GetBatchWithDict(dictionary_, buffer, max_values);
+ if (decoded_values != max_values) { ParquetException::EofException(); }
+ num_values_ -= max_values;
return max_values;
}
private:
using Decoder<Type>::num_values_;
- int index() {
- int idx = 0;
- if (!idx_decoder_.Get(&idx)) ParquetException::EofException();
- --num_values_;
- return idx;
- }
-
// Only one is set.
Vector<T> dictionary_;
@@ -177,7 +170,12 @@ class DictEncoderBase {
/// Returns a conservative estimate of the number of bytes needed to encode the buffered
/// indices. Used to size the buffer passed to WriteIndices().
int EstimatedDataEncodedSize() {
- return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size());
+ // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
+ // reserve
+ // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
+ // but not reserving them would cause the encoder to fail.
+ return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()) +
+ RleEncoder::MinBufferSize(bit_width());
}
/// The minimum bit width required to encode the currently buffered indices.
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc
index f9265bd..43348d8 100644
--- a/src/parquet/encodings/encoding-benchmark.cc
+++ b/src/parquet/encodings/encoding-benchmark.cc
@@ -17,12 +17,23 @@
#include "benchmark/benchmark.h"
-#include "parquet/encodings/plain-encoding.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/file/reader-internal.h"
+#include "parquet/util/mem-pool.h"
namespace parquet {
+using format::ColumnChunk;
+using schema::PrimitiveNode;
+
namespace benchmark {
+std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
+ auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
+ return std::make_shared<ColumnDescriptor>(
+ node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED);
+}
+
static void BM_PlainEncodingBoolean(::benchmark::State& state) {
std::vector<bool> values(state.range_x(), 64);
PlainEncoder<BooleanType> encoder(nullptr);
@@ -86,6 +97,65 @@ static void BM_PlainDecodingInt64(::benchmark::State& state) {
BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536);
+template <typename Type>
+static void DecodeDict(
+ std::vector<typename Type::c_type>& values, ::benchmark::State& state) {
+ typedef typename Type::c_type T;
+ int num_values = values.size();
+
+ MemPool pool;
+ MemoryAllocator* allocator = default_allocator();
+ std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);
+ std::shared_ptr<OwnedMutableBuffer> dict_buffer =
+ std::make_shared<OwnedMutableBuffer>();
+ auto indices = std::make_shared<OwnedMutableBuffer>();
+
+ DictEncoder<T> encoder(&pool, allocator, descr->type_length());
+ for (int i = 0; i < num_values; ++i) {
+ encoder.Put(values[i]);
+ }
+
+ dict_buffer->Resize(encoder.dict_encoded_size());
+ encoder.WriteDict(dict_buffer->mutable_data());
+ indices->Resize(encoder.EstimatedDataEncodedSize());
+ int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size());
+ indices->Resize(actual_bytes);
+
+ while (state.KeepRunning()) {
+ PlainDecoder<Type> dict_decoder(descr.get());
+ dict_decoder.SetData(encoder.num_entries(), dict_buffer->data(), dict_buffer->size());
+ DictionaryDecoder<Type> decoder(descr.get());
+ decoder.SetDict(&dict_decoder);
+ decoder.SetData(num_values, indices->data(), indices->size());
+ decoder.Decode(values.data(), num_values);
+ }
+
+ state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(T));
+}
+
+static void BM_DictDecodingInt64_repeats(::benchmark::State& state) {
+ typedef Int64Type Type;
+ typedef typename Type::c_type T;
+
+ std::vector<T> values(state.range_x(), 64);
+ DecodeDict<Type>(values, state);
+}
+
+BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536);
+
+static void BM_DictDecodingInt64_literals(::benchmark::State& state) {
+ typedef Int64Type Type;
+ typedef typename Type::c_type T;
+
+ std::vector<T> values(state.range_x());
+ for (size_t i = 0; i < values.size(); ++i) {
+ values[i] = i;
+ }
+ DecodeDict<Type>(values, state);
+}
+
+BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536);
+
} // namespace benchmark
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 71ae740..c169688 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -142,10 +142,8 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> {
virtual int Decode(bool* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
- bool val;
- for (int i = 0; i < max_values; ++i) {
- if (!bit_reader_.GetValue(1, &val)) { ParquetException::EofException(); }
- buffer[i] = val;
+ if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) {
+ ParquetException::EofException();
}
num_values_ -= max_values;
return max_values;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h
index dd0c9e2..f7d09a9 100644
--- a/src/parquet/util/bit-stream-utils.h
+++ b/src/parquet/util/bit-stream-utils.h
@@ -122,6 +122,10 @@ class BitReader {
template <typename T>
bool GetValue(int num_bits, T* v);
+ /// Get a number of values from the buffer. Return the number of values actually read.
+ template <typename T>
+ int GetBatch(int num_bits, T* v, int batch_size);
+
/// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T
/// needs to be a little-endian native type and big enough to store
/// 'num_bytes'. The value is assumed to be byte-aligned so the stream will
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.inline.h b/src/parquet/util/bit-stream-utils.inline.h
index 02c0e25..5cdf76b 100644
--- a/src/parquet/util/bit-stream-utils.inline.h
+++ b/src/parquet/util/bit-stream-utils.inline.h
@@ -20,7 +20,10 @@
#ifndef PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H
#define PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H
+#include <algorithm>
+
#include "parquet/util/bit-stream-utils.h"
+#include "parquet/util/bpacking.h"
namespace parquet {
@@ -86,34 +89,97 @@ inline bool BitWriter::PutVlqInt(uint32_t v) {
}
template <typename T>
+inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
+ int* bit_offset, int* byte_offset, uint64_t* buffered_values) {
+ *v = BitUtil::TrailingBits(*buffered_values, *bit_offset + num_bits) >> *bit_offset;
+
+ *bit_offset += num_bits;
+ if (*bit_offset >= 64) {
+ *byte_offset += 8;
+ *bit_offset -= 64;
+
+ int bytes_remaining = max_bytes - *byte_offset;
+ if (LIKELY(bytes_remaining >= 8)) {
+ memcpy(buffered_values, buffer + *byte_offset, 8);
+ } else {
+ memcpy(buffered_values, buffer + *byte_offset, bytes_remaining);
+ }
+
+ // Read bits of v that crossed into new buffered_values_
+ *v |= BitUtil::TrailingBits(*buffered_values, *bit_offset)
+ << (num_bits - *bit_offset);
+ DCHECK_LE(*bit_offset, 64);
+ }
+}
+
+template <typename T>
inline bool BitReader::GetValue(int num_bits, T* v) {
+ return GetBatch(num_bits, v, 1) == 1;
+}
+
+template <typename T>
+inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
DCHECK(buffer_ != NULL);
// TODO: revisit this limit if necessary
DCHECK_LE(num_bits, 32);
DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));
- if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
-
- *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
-
- bit_offset_ += num_bits;
- if (bit_offset_ >= 64) {
- byte_offset_ += 8;
- bit_offset_ -= 64;
+ int bit_offset = bit_offset_;
+ int byte_offset = byte_offset_;
+ uint64_t buffered_values = buffered_values_;
+ int max_bytes = max_bytes_;
+ const uint8_t* buffer = buffer_;
+
+ uint64_t needed_bits = num_bits * batch_size;
+ uint64_t remaining_bits = (max_bytes - byte_offset) * 8 - bit_offset;
+ if (remaining_bits < needed_bits) { batch_size = remaining_bits / num_bits; }
+
+ int i = 0;
+ if (UNLIKELY(bit_offset != 0)) {
+ for (; i < batch_size && bit_offset != 0; ++i) {
+ GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
+ &buffered_values);
+ }
+ }
- int bytes_remaining = max_bytes_ - byte_offset_;
- if (LIKELY(bytes_remaining >= 8)) {
- memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
- } else {
- memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+ if (sizeof(T) == 4) {
+ int num_unpacked = unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
+ reinterpret_cast<uint32_t*>(v + i), batch_size - i, num_bits);
+ i += num_unpacked;
+ byte_offset += num_unpacked * num_bits / 8;
+ } else {
+ const int buffer_size = 1024;
+ static uint32_t unpack_buffer[buffer_size];
+ while (i < batch_size) {
+ int unpack_size = std::min(buffer_size, batch_size - i);
+ int num_unpacked = unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
+ unpack_buffer, unpack_size, num_bits);
+ if (num_unpacked == 0) { break; }
+ for (int k = 0; k < num_unpacked; ++k) {
+ v[i + k] = unpack_buffer[k];
+ }
+ i += num_unpacked;
+ byte_offset += num_unpacked * num_bits / 8;
}
+ }
- // Read bits of v that crossed into new buffered_values_
- *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
- << (num_bits - bit_offset_);
+ int bytes_remaining = max_bytes - byte_offset;
+ if (bytes_remaining >= 8) {
+ memcpy(&buffered_values, buffer + byte_offset, 8);
+ } else {
+ memcpy(&buffered_values, buffer + byte_offset, bytes_remaining);
}
- DCHECK_LE(bit_offset_, 64);
- return true;
+
+ for (; i < batch_size; ++i) {
+ GetValue_(
+ num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset, &buffered_values);
+ }
+
+ bit_offset_ = bit_offset;
+ byte_offset_ = byte_offset;
+ buffered_values_ = buffered_values;
+
+ return batch_size;
}
template <typename T>