You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/08/02 23:38:32 UTC

[2/2] parquet-cpp git commit: PARQUET-671: performance improvements for rle/bit-packed decoding

PARQUET-671: performance improvements for rle/bit-packed decoding

Testing on my own data shows an order-of-magnitude improvement.

I separated the commits for clarity; each one gives an incremental improvement.

The motivation for the last commit (allowing NULL for def_levels/rep_levels) is a workaround for Spark, which doesn't seem to be able to generate columns without def_levels, even when a column is specified as "not nullable".

Author: Eric Daniel <ed...@comscore.com>

Closes #140 from edani/decode-perf and squashes the following commits:

eec0855 [Eric Daniel] Ran "make format"
0568de6 [Eric Daniel] Only check num. of repetition levels when def_levels is set
5f54e1c [Eric Daniel] Added benchmarks for dictionary decoding
087945b [Eric Daniel] Style fixes from code review
906be73 [Eric Daniel] Allow the reader to skip rep/def decoding
04b7391 [Eric Daniel] Fast bit unpacking
bda5d84 [Eric Daniel] The bit reader can decode in batches
3f10378 [Eric Daniel] Improve decoding of repeated values in the dict encoding


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/38f0ffd5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/38f0ffd5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/38f0ffd5

Branch: refs/heads/master
Commit: 38f0ffd5adc37db948991a2eb6409e2727721463
Parents: 616305c
Author: Eric Daniel <ed...@comscore.com>
Authored: Tue Aug 2 16:38:24 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Tue Aug 2 16:38:24 2016 -0700

----------------------------------------------------------------------
 src/parquet/column/levels.cc                |    5 +-
 src/parquet/column/reader.h                 |   10 +-
 src/parquet/encodings/dictionary-encoding.h |   20 +-
 src/parquet/encodings/encoding-benchmark.cc |   72 +-
 src/parquet/encodings/plain-encoding.h      |    6 +-
 src/parquet/util/bit-stream-utils.h         |    4 +
 src/parquet/util/bit-stream-utils.inline.h  |  102 +-
 src/parquet/util/bpacking.h                 | 3323 ++++++++++++++++++++++
 src/parquet/util/buffer.h                   |    2 +-
 src/parquet/util/rle-encoding.h             |   72 +-
 10 files changed, 3549 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/column/levels.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.cc b/src/parquet/column/levels.cc
index 9b2d901..3e7b9df 100644
--- a/src/parquet/column/levels.cc
+++ b/src/parquet/column/levels.cc
@@ -133,10 +133,7 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) {
   if (encoding_ == Encoding::RLE) {
     num_decoded = rle_decoder_->GetBatch(levels, num_values);
   } else {
-    for (int i = 0; i < num_values; ++i) {
-      if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { break; }
-      ++num_decoded;
-    }
+    num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
   }
   num_values_remaining_ -= num_decoded;
   return num_decoded;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 5153698..25df2b4 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -115,6 +115,10 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   // may be less than the number of repetition and definition levels. With
   // nested data this is almost certainly true.
   //
+  // Set def_levels or rep_levels to nullptr if you want to skip reading them.
+  // This is only safe if you know through some other source that there are no
+  // undefined values.
+  //
   // To fully exhaust a row group, you must read batches until the number of
   // values read reaches the number of stored values according to the metadata.
   //
@@ -171,7 +175,7 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
   int64_t values_to_read = 0;
 
   // If the field is required and non-repeated, there are no definition levels
-  if (descr_->max_definition_level() > 0) {
+  if (descr_->max_definition_level() > 0 && def_levels) {
     num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
     // TODO(wesm): this tallying of values-to-decode can be performed with better
     // cache-efficiency if fused with the level decoding.
@@ -184,9 +188,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
   }
 
   // Not present for non-repeated fields
-  if (descr_->max_repetition_level() > 0) {
+  if (descr_->max_repetition_level() > 0 && rep_levels) {
     num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
-    if (num_def_levels != num_rep_levels) {
+    if (def_levels && num_def_levels != num_rep_levels) {
       throw ParquetException("Number of decoded rep / def levels did not match");
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 7d6785e..8e121ee 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -64,22 +64,15 @@ class DictionaryDecoder : public Decoder<Type> {
 
   virtual int Decode(T* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      buffer[i] = dictionary_[index()];
-    }
+    int decoded_values = idx_decoder_.GetBatchWithDict(dictionary_, buffer, max_values);
+    if (decoded_values != max_values) { ParquetException::EofException(); }
+    num_values_ -= max_values;
     return max_values;
   }
 
  private:
   using Decoder<Type>::num_values_;
 
-  int index() {
-    int idx = 0;
-    if (!idx_decoder_.Get(&idx)) ParquetException::EofException();
-    --num_values_;
-    return idx;
-  }
-
   // Only one is set.
   Vector<T> dictionary_;
 
@@ -177,7 +170,12 @@ class DictEncoderBase {
   /// Returns a conservative estimate of the number of bytes needed to encode the buffered
   /// indices. Used to size the buffer passed to WriteIndices().
   int EstimatedDataEncodedSize() {
-    return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size());
+    // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
+    // reserve
+    // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
+    // but not reserving them would cause the encoder to fail.
+    return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()) +
+           RleEncoder::MinBufferSize(bit_width());
   }
 
   /// The minimum bit width required to encode the currently buffered indices.

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc
index f9265bd..43348d8 100644
--- a/src/parquet/encodings/encoding-benchmark.cc
+++ b/src/parquet/encodings/encoding-benchmark.cc
@@ -17,12 +17,23 @@
 
 #include "benchmark/benchmark.h"
 
-#include "parquet/encodings/plain-encoding.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/file/reader-internal.h"
+#include "parquet/util/mem-pool.h"
 
 namespace parquet {
 
+using format::ColumnChunk;
+using schema::PrimitiveNode;
+
 namespace benchmark {
 
+std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
+  auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
+  return std::make_shared<ColumnDescriptor>(
+      node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED);
+}
+
 static void BM_PlainEncodingBoolean(::benchmark::State& state) {
   std::vector<bool> values(state.range_x(), 64);
   PlainEncoder<BooleanType> encoder(nullptr);
@@ -86,6 +97,65 @@ static void BM_PlainDecodingInt64(::benchmark::State& state) {
 
 BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536);
 
+template <typename Type>
+static void DecodeDict(
+    std::vector<typename Type::c_type>& values, ::benchmark::State& state) {
+  typedef typename Type::c_type T;
+  int num_values = values.size();
+
+  MemPool pool;
+  MemoryAllocator* allocator = default_allocator();
+  std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);
+  std::shared_ptr<OwnedMutableBuffer> dict_buffer =
+      std::make_shared<OwnedMutableBuffer>();
+  auto indices = std::make_shared<OwnedMutableBuffer>();
+
+  DictEncoder<T> encoder(&pool, allocator, descr->type_length());
+  for (int i = 0; i < num_values; ++i) {
+    encoder.Put(values[i]);
+  }
+
+  dict_buffer->Resize(encoder.dict_encoded_size());
+  encoder.WriteDict(dict_buffer->mutable_data());
+  indices->Resize(encoder.EstimatedDataEncodedSize());
+  int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size());
+  indices->Resize(actual_bytes);
+
+  while (state.KeepRunning()) {
+    PlainDecoder<Type> dict_decoder(descr.get());
+    dict_decoder.SetData(encoder.num_entries(), dict_buffer->data(), dict_buffer->size());
+    DictionaryDecoder<Type> decoder(descr.get());
+    decoder.SetDict(&dict_decoder);
+    decoder.SetData(num_values, indices->data(), indices->size());
+    decoder.Decode(values.data(), num_values);
+  }
+
+  state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(T));
+}
+
+static void BM_DictDecodingInt64_repeats(::benchmark::State& state) {
+  typedef Int64Type Type;
+  typedef typename Type::c_type T;
+
+  std::vector<T> values(state.range_x(), 64);
+  DecodeDict<Type>(values, state);
+}
+
+BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536);
+
+static void BM_DictDecodingInt64_literals(::benchmark::State& state) {
+  typedef Int64Type Type;
+  typedef typename Type::c_type T;
+
+  std::vector<T> values(state.range_x());
+  for (size_t i = 0; i < values.size(); ++i) {
+    values[i] = i;
+  }
+  DecodeDict<Type>(values, state);
+}
+
+BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536);
+
 }  // namespace benchmark
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 71ae740..c169688 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -142,10 +142,8 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> {
 
   virtual int Decode(bool* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
-    bool val;
-    for (int i = 0; i < max_values; ++i) {
-      if (!bit_reader_.GetValue(1, &val)) { ParquetException::EofException(); }
-      buffer[i] = val;
+    if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) {
+      ParquetException::EofException();
     }
     num_values_ -= max_values;
     return max_values;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h
index dd0c9e2..f7d09a9 100644
--- a/src/parquet/util/bit-stream-utils.h
+++ b/src/parquet/util/bit-stream-utils.h
@@ -122,6 +122,10 @@ class BitReader {
   template <typename T>
   bool GetValue(int num_bits, T* v);
 
+  /// Get a number of values from the buffer. Return the number of values actually read.
+  template <typename T>
+  int GetBatch(int num_bits, T* v, int batch_size);
+
   /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T
   /// needs to be a little-endian native type and big enough to store
   /// 'num_bytes'. The value is assumed to be byte-aligned so the stream will

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.inline.h b/src/parquet/util/bit-stream-utils.inline.h
index 02c0e25..5cdf76b 100644
--- a/src/parquet/util/bit-stream-utils.inline.h
+++ b/src/parquet/util/bit-stream-utils.inline.h
@@ -20,7 +20,10 @@
 #ifndef PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H
 #define PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H
 
+#include <algorithm>
+
 #include "parquet/util/bit-stream-utils.h"
+#include "parquet/util/bpacking.h"
 
 namespace parquet {
 
@@ -86,34 +89,97 @@ inline bool BitWriter::PutVlqInt(uint32_t v) {
 }
 
 template <typename T>
+inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
+    int* bit_offset, int* byte_offset, uint64_t* buffered_values) {
+  *v = BitUtil::TrailingBits(*buffered_values, *bit_offset + num_bits) >> *bit_offset;
+
+  *bit_offset += num_bits;
+  if (*bit_offset >= 64) {
+    *byte_offset += 8;
+    *bit_offset -= 64;
+
+    int bytes_remaining = max_bytes - *byte_offset;
+    if (LIKELY(bytes_remaining >= 8)) {
+      memcpy(buffered_values, buffer + *byte_offset, 8);
+    } else {
+      memcpy(buffered_values, buffer + *byte_offset, bytes_remaining);
+    }
+
+    // Read bits of v that crossed into new buffered_values_
+    *v |= BitUtil::TrailingBits(*buffered_values, *bit_offset)
+          << (num_bits - *bit_offset);
+    DCHECK_LE(*bit_offset, 64);
+  }
+}
+
+template <typename T>
 inline bool BitReader::GetValue(int num_bits, T* v) {
+  return GetBatch(num_bits, v, 1) == 1;
+}
+
+template <typename T>
+inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
   DCHECK(buffer_ != NULL);
   // TODO: revisit this limit if necessary
   DCHECK_LE(num_bits, 32);
   DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));
 
-  if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
-
-  *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
-
-  bit_offset_ += num_bits;
-  if (bit_offset_ >= 64) {
-    byte_offset_ += 8;
-    bit_offset_ -= 64;
+  int bit_offset = bit_offset_;
+  int byte_offset = byte_offset_;
+  uint64_t buffered_values = buffered_values_;
+  int max_bytes = max_bytes_;
+  const uint8_t* buffer = buffer_;
+
+  uint64_t needed_bits = num_bits * batch_size;
+  uint64_t remaining_bits = (max_bytes - byte_offset) * 8 - bit_offset;
+  if (remaining_bits < needed_bits) { batch_size = remaining_bits / num_bits; }
+
+  int i = 0;
+  if (UNLIKELY(bit_offset != 0)) {
+    for (; i < batch_size && bit_offset != 0; ++i) {
+      GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
+          &buffered_values);
+    }
+  }
 
-    int bytes_remaining = max_bytes_ - byte_offset_;
-    if (LIKELY(bytes_remaining >= 8)) {
-      memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
-    } else {
-      memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+  if (sizeof(T) == 4) {
+    int num_unpacked = unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
+        reinterpret_cast<uint32_t*>(v + i), batch_size - i, num_bits);
+    i += num_unpacked;
+    byte_offset += num_unpacked * num_bits / 8;
+  } else {
+    const int buffer_size = 1024;
+    static uint32_t unpack_buffer[buffer_size];
+    while (i < batch_size) {
+      int unpack_size = std::min(buffer_size, batch_size - i);
+      int num_unpacked = unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
+          unpack_buffer, unpack_size, num_bits);
+      if (num_unpacked == 0) { break; }
+      for (int k = 0; k < num_unpacked; ++k) {
+        v[i + k] = unpack_buffer[k];
+      }
+      i += num_unpacked;
+      byte_offset += num_unpacked * num_bits / 8;
     }
+  }
 
-    // Read bits of v that crossed into new buffered_values_
-    *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
-          << (num_bits - bit_offset_);
+  int bytes_remaining = max_bytes - byte_offset;
+  if (bytes_remaining >= 8) {
+    memcpy(&buffered_values, buffer + byte_offset, 8);
+  } else {
+    memcpy(&buffered_values, buffer + byte_offset, bytes_remaining);
   }
-  DCHECK_LE(bit_offset_, 64);
-  return true;
+
+  for (; i < batch_size; ++i) {
+    GetValue_(
+        num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset, &buffered_values);
+  }
+
+  bit_offset_ = bit_offset;
+  byte_offset_ = byte_offset;
+  buffered_values_ = buffered_values;
+
+  return batch_size;
 }
 
 template <typename T>