You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/17 00:12:15 UTC

parquet-cpp git commit: PARQUET-515: Add "SetData" to LevelDecoder

Repository: parquet-cpp
Updated Branches:
  refs/heads/master b71e826f0 -> 6df383666


PARQUET-515: Add "SetData" to LevelDecoder

This PR implements a SetData interface for the LevelDecoder class, similar to the existing value decoders.
This PR also adds a test for PARQUET-523.

Author: Deepak Majeti <de...@hp.com>

Closes #51 from majetideepak/PARQUET-515 and squashes the following commits:

dde3654 [Deepak Majeti] fixed headers order
c26db08 [Deepak Majeti] rebased with upstream
1420bbf [Deepak Majeti] PARQUET-515


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/6df38366
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/6df38366
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/6df38366

Branch: refs/heads/master
Commit: 6df383666533b8c3273d791a3870e462097842f9
Parents: b71e826
Author: Deepak Majeti <de...@hp.com>
Authored: Tue Feb 16 15:12:09 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Tue Feb 16 15:12:09 2016 -0800

----------------------------------------------------------------------
 src/parquet/column/column-reader-test.cc |  58 ++++++++++-
 src/parquet/column/levels-test.cc        | 139 +++++++++++++++++++-------
 src/parquet/column/levels.h              |  33 ++++--
 src/parquet/column/reader.cc             |  18 ++--
 src/parquet/util/bit-stream-utils.h      |   2 +
 5 files changed, 192 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6df38366/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 0abdf79..c2c0aa3 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -181,6 +181,62 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
   ASSERT_EQ(0, batch_actual);
   ASSERT_EQ(0, values_read);
 }
-} // namespace test
 
+TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
+  vector<int32_t> values[2] = {{1, 2, 3, 4, 5},
+    {6, 7, 8, 9, 10}};
+  vector<int16_t> def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1},
+    {2, 2, 1, 2, 1, 1, 2, 1, 2, 1}};
+  vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
+    {0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};
+
+  std::vector<uint8_t> buffer[4];  // one encoding buffer per data page
+  std::shared_ptr<DataPage> page;
+
+  for (int i = 0; i < 4; i++) {  // 4 pages alternating between the 2 data sets
+    page = MakeDataPage<Type::INT32>(values[i % 2],
+        def_levels[i % 2], 2, rep_levels[i % 2], 1, &buffer[i]);
+    pages_.push_back(page);
+  }
+
+  NodePtr type = schema::Int32("a", Repetition::REPEATED);
+  ColumnDescriptor descr(type, 2, 1);
+  InitReader(&descr);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+
+  size_t values_read = 0;
+  size_t batch_actual = 0;
+
+  vector<int32_t> vresult(3, -1);
+  vector<int16_t> dresult(5, -1);
+  vector<int16_t> rresult(5, -1);
+
+  for (int i = 0; i < 4; i++) {  // two ReadBatch(5) calls consume each page
+    batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
+        &vresult[0], &values_read);
+    ASSERT_EQ(5, batch_actual);
+    ASSERT_EQ(3, values_read);
+
+    ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3)));
+    ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5)));
+    ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5)));
+
+    batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
+        &vresult[0], &values_read);
+    ASSERT_EQ(5, batch_actual);
+    ASSERT_EQ(2, values_read);
+
+    ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5)));
+    ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10)));
+    ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10)));
+  }
+  // End of stream: pass nullptr for every output buffer to check that no
+  // writes happen; this must not segfault or dump core.
+  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
+      nullptr, &values_read);
+  ASSERT_EQ(0, batch_actual);
+  ASSERT_EQ(0, values_read);
+}
+} // namespace test
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6df38366/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 6061d23..62188db 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -16,8 +16,8 @@
 // under the License.
 
 #include <cstdint>
-#include <string>
 #include <vector>
+#include <string>
 
 #include <gtest/gtest.h>
 
@@ -28,97 +28,164 @@ using std::string;
 
 namespace parquet_cpp {
 
-int GenerateLevels(int min_repeat_factor, int max_repeat_factor,
+void GenerateLevels(int min_repeat_factor, int max_repeat_factor,
     int max_level, std::vector<int16_t>& input_levels) {
   int total_count = 0;
   // for each repetition count upto max_repeat_factor
   for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
-    // repeat count increase by a factor of 2 for every iteration
+    // repeat count increases by a factor of 2 for every iteration
     int repeat_count = (1 << repeat);
     // generate levels for repetition count upto the maximum level
     int value = 0;
     int bwidth = 0;
     while (value <= max_level) {
       for (int i = 0; i < repeat_count; i++) {
-        input_levels[total_count++] = value;
+        input_levels.push_back(value);
       }
       value = (2 << bwidth) - 1;
       bwidth++;
     }
   }
-  return total_count;
 }
 
-void VerifyLevelsEncoding(Encoding::type encoding, int max_level,
-    std::vector<int16_t>& input_levels) {
+void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
+    const int16_t* input_levels, std::vector<uint8_t>& bytes) {
   LevelEncoder encoder;
-  LevelDecoder decoder;
   int levels_count = 0;
-  std::vector<int16_t> output_levels;
-  std::vector<uint8_t> bytes;
-  int num_levels = input_levels.size();
-  output_levels.resize(num_levels);
   bytes.resize(2 * num_levels);
-  ASSERT_EQ(num_levels, output_levels.size());
   ASSERT_EQ(2 * num_levels, bytes.size());
-  // start encoding and decoding
+  // encode levels
   if (encoding == Encoding::RLE) {
     // leave space to write the rle length value
     encoder.Init(encoding, max_level, num_levels,
         bytes.data() + sizeof(uint32_t), bytes.size());
 
-    levels_count = encoder.Encode(num_levels, input_levels.data());
+    levels_count = encoder.Encode(num_levels, input_levels);
     (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();
-
   } else {
     encoder.Init(encoding, max_level, num_levels,
         bytes.data(), bytes.size());
-    levels_count = encoder.Encode(num_levels, input_levels.data());
+    levels_count = encoder.Encode(num_levels, input_levels);
   }
-
   ASSERT_EQ(num_levels, levels_count);
+}
 
-  decoder.Init(encoding, max_level, num_levels, bytes.data());
-  levels_count = decoder.Decode(num_levels, output_levels.data());
+void VerifyDecodingLevels(Encoding::type encoding, int max_level,
+    std::vector<int16_t>& input_levels, std::vector<uint8_t>& bytes) {
+  LevelDecoder decoder;
+  int levels_count = 0;
+  std::vector<int16_t> output_levels;
+  int num_levels = input_levels.size();
 
-  ASSERT_EQ(num_levels, levels_count);
+  output_levels.resize(num_levels);
+  ASSERT_EQ(num_levels, output_levels.size());
 
-  for (int i = 0; i < num_levels; i++) {
-    EXPECT_EQ(input_levels[i], output_levels[i]);
+  // Decode levels and test with multiple decode calls
+  decoder.SetData(encoding, max_level, num_levels, bytes.data());
+  int decode_count = 4;
+  int num_inner_levels = num_levels / decode_count;
+  // Try multiple decoding on a single SetData call
+  for (int ct = 0; ct < decode_count; ct++) {
+    int offset = ct * num_inner_levels;
+    levels_count = decoder.Decode(num_inner_levels, output_levels.data());
+    ASSERT_EQ(num_inner_levels, levels_count);
+    for (int i = 0; i < num_inner_levels; i++) {
+      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
+    }
   }
+  // check the remaining levels
+  int num_levels_completed = decode_count * num_inner_levels;
+  int num_remaining_levels = num_levels - num_levels_completed;
+  if (num_remaining_levels > 0) {
+    levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
+    ASSERT_EQ(num_remaining_levels, levels_count);
+    for (int i = 0; i < num_remaining_levels; i++) {
+      EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
+    }
+  }
+  // Once every level is consumed, Decode must return 0
+  ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
 }
 
-TEST(TestLevels, TestEncodeDecodeLevels) {
-  // test levels with maximum bit-width from 1 to 8
-  // increase the repetition count for each iteration by a factor of 2
+void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level,
+    std::vector<int16_t>& input_levels, std::vector<std::vector<uint8_t>>& bytes) {
+  LevelDecoder decoder;
+  int levels_count = 0;
+  std::vector<int16_t> output_levels;
 
+  // Decode levels and test with multiple SetData calls
+  int setdata_count = bytes.size();
+  int num_levels = input_levels.size() / setdata_count;
+  output_levels.resize(num_levels);
+  // Try multiple SetData
+  for (int ct = 0; ct < setdata_count; ct++) {
+    int offset = ct * num_levels;
+    ASSERT_EQ(num_levels, output_levels.size());
+    decoder.SetData(encoding, max_level, num_levels, bytes[ct].data());
+    levels_count = decoder.Decode(num_levels, output_levels.data());
+    ASSERT_EQ(num_levels, levels_count);
+    for (int i = 0; i < num_levels; i++) {
+      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
+    }
+  }
+}
+
+// Test levels with maximum bit-width from 1 to 8
+// increase the repetition count for each iteration by a factor of 2
+TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
   int min_repeat_factor = 0;
   int max_repeat_factor = 7; // 128
   int max_bit_width = 8;
   std::vector<int16_t> input_levels;
-  Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
+  std::vector<uint8_t> bytes;
+  Encoding::type encodings[2] = {Encoding::RLE,
+      Encoding::BIT_PACKED};
 
   // for each encoding
   for (int encode = 0; encode < 2; encode++) {
     Encoding::type encoding = encodings[encode];
     // BIT_PACKED requires a sequence of atleast 8
     if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;
-
     // for each maximum bit-width
     for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
-      int num_levels_per_width = ((2 << max_repeat_factor) - (1 << min_repeat_factor));
-      int num_levels = (bit_width + 1) * num_levels_per_width;
-      input_levels.resize(num_levels);
-      ASSERT_EQ(num_levels, input_levels.size());
-
       // find the maximum level for the current bit_width
       int max_level = (1 << bit_width) - 1;
       // Generate levels
-      int total_count = GenerateLevels(min_repeat_factor, max_repeat_factor,
+      GenerateLevels(min_repeat_factor, max_repeat_factor,
           max_level, input_levels);
-      ASSERT_EQ(num_levels, total_count);
-      VerifyLevelsEncoding(encoding, max_level, input_levels);
+      EncodeLevels(encoding, max_level, input_levels.size(), input_levels.data(), bytes);
+      VerifyDecodingLevels(encoding, max_level, input_levels, bytes);
+      input_levels.clear();
+    }
+  }
+}
+
+// Test multiple decoder SetData calls
+TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
+  int min_repeat_factor = 3;
+  int max_repeat_factor = 7; // 128
+  int bit_width = 8;
+  int max_level = (1 << bit_width) - 1;
+  std::vector<int16_t> input_levels;
+  std::vector<std::vector<uint8_t>> bytes;
+  Encoding::type encodings[2] = {Encoding::RLE,
+      Encoding::BIT_PACKED};
+  GenerateLevels(min_repeat_factor, max_repeat_factor,
+      max_level, input_levels);
+  int num_levels = input_levels.size();
+  int setdata_factor = 8;
+  int split_level_size = num_levels / setdata_factor;
+  bytes.resize(setdata_factor);
+
+  // for each encoding
+  for (int encode = 0; encode < 2; encode++) {
+    Encoding::type encoding = encodings[encode];
+    for (int rf = 0; rf < setdata_factor; rf++) {
+      int offset = rf * split_level_size;
+      EncodeLevels(encoding, max_level, split_level_size,
+          input_levels.data() + offset, bytes[rf]);
     }
+    VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6df38366/src/parquet/column/levels.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h
index 18fd0bb..a026604 100644
--- a/src/parquet/column/levels.h
+++ b/src/parquet/column/levels.h
@@ -19,6 +19,7 @@
 #define PARQUET_COLUMN_LEVELS_H
 
 #include <memory>
+#include <algorithm>
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
@@ -96,25 +97,35 @@ class LevelEncoder {
 
 class LevelDecoder {
  public:
-  LevelDecoder() {}
+  LevelDecoder() : bit_width_(0), num_values_remaining_(0), encoding_(Encoding::PLAIN) {}
 
-  // Initialize the LevelDecoder and return the number of bytes consumed
-  size_t Init(Encoding::type encoding, int16_t max_level,
+  // Point the decoder at a new buffer of encoded levels, reusing any existing
+  // decoder object, and return the number of bytes consumed
+  size_t SetData(Encoding::type encoding, int16_t max_level,
       int num_buffered_values, const uint8_t* data) {
     uint32_t num_bytes = 0;
     uint32_t total_bytes = 0;
-    bit_width_ = BitUtil::Log2(max_level + 1);
     encoding_ = encoding;
+    num_values_remaining_ = num_buffered_values;  // upper bound for Decode()
+    bit_width_ = BitUtil::Log2(max_level + 1);
     switch (encoding) {
       case Encoding::RLE: {
         num_bytes = *reinterpret_cast<const uint32_t*>(data);
         const uint8_t* decoder_data = data + sizeof(uint32_t);
-        rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
+        if (!rle_decoder_) {  // create on first use, otherwise Reset()
+          rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
+        } else {
+          rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
+        }
         return sizeof(uint32_t) + num_bytes;
       }
       case Encoding::BIT_PACKED: {
         num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
-        bit_packed_decoder_.reset(new BitReader(data, num_bytes));
+        if (!bit_packed_decoder_) {  // create on first use, otherwise Reset()
+          bit_packed_decoder_.reset(new BitReader(data, num_bytes));
+        } else {
+          bit_packed_decoder_->Reset(data, num_bytes);
+        }
         return num_bytes;
       }
       default:
@@ -126,30 +137,30 @@ class LevelDecoder {
   // Decodes a batch of levels into an array and returns the number of levels decoded
   size_t Decode(size_t batch_size, int16_t* levels) {
     size_t num_decoded = 0;
-    if (!rle_decoder_ && !bit_packed_decoder_) {
-      throw ParquetException("Level decoders are not initialized.");
-    }
 
+    size_t num_values = std::min(num_values_remaining_, batch_size);
     if (encoding_ == Encoding::RLE) {
-      for (size_t i = 0; i < batch_size; ++i) {
+      for (size_t i = 0; i < num_values; ++i) {
         if (!rle_decoder_->Get(levels + i)) {
           break;
         }
         ++num_decoded;
       }
     } else {
-      for (size_t i = 0; i < batch_size; ++i) {
+      for (size_t i = 0; i < num_values; ++i) {
         if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) {
           break;
         }
         ++num_decoded;
       }
     }
+    num_values_remaining_ -= num_decoded;
     return num_decoded;
   }
 
  private:
   int bit_width_;
+  size_t num_values_remaining_;  // 0 until SetData(); caps Decode()
   Encoding::type encoding_;
   std::unique_ptr<RleDecoder> rle_decoder_;
   std::unique_ptr<BitReader> bit_packed_decoder_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6df38366/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index 4ba0616..4011347 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -97,15 +97,13 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
       // the page size to determine the number of bytes in the encoded data.
       size_t data_size = page->size();
 
-      int16_t max_definition_level = descr_->max_definition_level();
-      int16_t max_repetition_level = descr_->max_repetition_level();
       //Data page Layout: Repetition Levels - Definition Levels - encoded values.
       //Levels are encoded as rle or bit-packed.
       //Init repetition levels
-      if (max_repetition_level > 0) {
-        size_t rep_levels_bytes = repetition_level_decoder_.Init(
-            page->repetition_level_encoding(),
-            max_repetition_level, num_buffered_values_, buffer);
+      if (descr_->max_repetition_level() > 0) {
+        size_t rep_levels_bytes = repetition_level_decoder_.SetData(
+            page->repetition_level_encoding(), descr_->max_repetition_level(),
+            num_buffered_values_, buffer);  // returns the bytes consumed
         buffer += rep_levels_bytes;
         data_size -= rep_levels_bytes;
       }
@@ -113,10 +111,10 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
       //if the initial value is invalid
 
       //Init definition levels
-      if (max_definition_level > 0) {
-        size_t def_levels_bytes = definition_level_decoder_.Init(
-            page->definition_level_encoding(),
-            max_definition_level, num_buffered_values_, buffer);
+      if (descr_->max_definition_level() > 0) {
+        size_t def_levels_bytes = definition_level_decoder_.SetData(
+            page->definition_level_encoding(), descr_->max_definition_level(),
+            num_buffered_values_, buffer);  // returns the bytes consumed
         buffer += def_levels_bytes;
         data_size -= def_levels_bytes;
       }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6df38366/src/parquet/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h
index 3e8f95c..b93b90e 100644
--- a/src/parquet/util/bit-stream-utils.h
+++ b/src/parquet/util/bit-stream-utils.h
@@ -118,6 +118,8 @@ class BitReader {
     max_bytes_ = buffer_len;
     byte_offset_ = 0;
     bit_offset_ = 0;
+    int num_bytes = std::min(8, max_bytes_ - byte_offset_);  // never past end
+    memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes);  // load new bytes
   }
 
   /// Gets the next value from the buffer.  Returns true if 'v' could be read or false if