You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/01/04 15:04:42 UTC

[arrow] branch master updated: GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value (#15124)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 53d73f8f97 GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value (#15124)
53d73f8f97 is described below

commit 53d73f8f97516443cdcf98f71c0cbc527dae7dc4
Author: mwish <an...@qq.com>
AuthorDate: Wed Jan 4 23:04:35 2023 +0800

    GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value (#15124)
    
    This patch tries to fix https://github.com/apache/arrow/issues/15052 . The problem is described here: https://github.com/apache/arrow/issues/15052#issuecomment-1367486164
    
    When reading 1 value, DeltaBitPackDecoder does not call `InitBlock`, causing it to always read `last_value_`.
    
    It seems the problem was introduced in https://github.com/apache/arrow/pull/10627 and https://github.com/amol-/arrow/commit/d982bedcf5e03d44c01949b192da54a8c1e525d8
    
    I will add some tests tonight
    * Closes: #15052
    
    Lead-authored-by: mwish <ma...@gmail.com>
    Co-authored-by: Antoine Pitrou <an...@python.org>
    Co-authored-by: mwish <15...@qq.com>
    Co-authored-by: Rok Mihevc <ro...@mihevc.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/parquet/encoding.cc      | 19 +++++++++++++++++--
 cpp/src/parquet/encoding_test.cc | 30 ++++++++++++++++++++----------
 2 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index b761de69d3..b9472d72ae 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2459,7 +2459,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
         ParquetException::EofException();
       }
       if (bit_width_data[i] > kMaxDeltaBitWidth) {
-        throw ParquetException("delta bit width larger than integer bit width");
+        throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) +
+                               " larger than integer bit width " +
+                               std::to_string(kMaxDeltaBitWidth));
       }
     }
     mini_block_idx_ = 0;
@@ -2479,7 +2481,20 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
       if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
         if (ARROW_PREDICT_FALSE(!block_initialized_)) {
           buffer[i++] = last_value_;
-          if (ARROW_PREDICT_FALSE(i == max_values)) break;
+          DCHECK_EQ(i, 1);  // we're at the beginning of the page
+          if (ARROW_PREDICT_FALSE(i == max_values)) {
+            // When block is uninitialized and i reaches max_values we have two
+            // different possibilities:
+            // 1. total_value_count_ == 1, which means that the page may have only
+            // one value (encoded in the header), and we should not initialize
+            // any block.
+            // 2. total_value_count_ != 1, which means we should initialize the
+            // incoming block for subsequent reads.
+            if (total_value_count_ != 1) {
+              InitBlock();
+            }
+            break;
+          }
           InitBlock();
         } else {
           ++mini_block_idx_;
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 422d48e396..b8363d29cd 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1290,7 +1290,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
  public:
   using c_type = typename Type::c_type;
   static constexpr int TYPE = Type::type_num;
-  static constexpr size_t ROUND_TRIP_TIMES = 3;
+  static constexpr size_t kNumRoundTrips = 3;
+  const std::vector<int> kReadBatchSizes = {1, 11};
 
   void InitBoundData(int nvalues, int repeats, c_type half_range) {
     num_values_ = nvalues * repeats;
@@ -1328,16 +1329,25 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
     auto encoder =
         MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
     auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
-
-    for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+    auto read_batch_sizes = kReadBatchSizes;
+    read_batch_sizes.push_back(num_values_);
+    // Encode a number of times to exercise the flush logic
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
       encoder->Put(draws_, num_values_);
       encode_buffer_ = encoder->FlushValues();
-
-      decoder->SetData(num_values_, encode_buffer_->data(),
-                       static_cast<int>(encode_buffer_->size()));
-      int values_decoded = decoder->Decode(decode_buf_, num_values_);
-      ASSERT_EQ(num_values_, values_decoded);
-      ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+      // Exercise different batch sizes
+      for (const int read_batch_size : read_batch_sizes) {
+        decoder->SetData(num_values_, encode_buffer_->data(),
+                         static_cast<int>(encode_buffer_->size()));
+
+        int values_decoded = 0;
+        while (values_decoded < num_values_) {
+          values_decoded +=
+              decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
+        }
+        ASSERT_EQ(num_values_, values_decoded);
+        ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+      }
     }
   }
 
@@ -1353,7 +1363,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
       }
     }
 
-    for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+    for (size_t i = 0; i < kNumRoundTrips; ++i) {
       encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
       encode_buffer_ = encoder->FlushValues();
       decoder->SetData(num_values_ - null_count, encode_buffer_->data(),