You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2022/12/22 15:15:33 UTC

[arrow] branch master updated: ARROW-18437: [C++][Parquet] Fix encoder for DELTA_BINARY_PACKED when flushing more than once (#14959)

This is an automated email from the ASF dual-hosted git repository.

rok pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ff1002056e ARROW-18437: [C++][Parquet] Fix encoder for DELTA_BINARY_PACKED when flushing more than once (#14959)
ff1002056e is described below

commit ff1002056ec394811734fe00565b29e4ba15ec22
Author: mwish <15...@qq.com>
AuthorDate: Thu Dec 22 23:15:25 2022 +0800

    ARROW-18437: [C++][Parquet] Fix encoder for DELTA_BINARY_PACKED when flushing more than once (#14959)
    
    When the encoder is flushed more than once, the `DELTA_BINARY_PACKED` output becomes corrupt, because the encoder did not reset its context after a flush.
    
    I'll add some tests this weekend.
    
    Authored-by: mwish <ma...@gmail.com>
    Signed-off-by: Rok Mihevc <ro...@mihevc.org>
---
 cpp/src/parquet/encoding.cc      |  7 ++++++-
 cpp/src/parquet/encoding_test.cc | 37 +++++++++++++++++++++----------------
 2 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 4923870e9e..9761dfd301 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2186,7 +2186,7 @@ void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
   while (idx < num_values) {
     UT value = static_cast<UT>(src[idx]);
     // Calculate deltas. The possible overflow is handled by use of unsigned integers
-    // making subtraction operations well defined and correct even in case of overflow.
+    // making subtraction operations well-defined and correct even in case of overflow.
     // Encoded integers will wrap back around on decoding.
     // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
     deltas_[values_current_block_] = value - current_value_;
@@ -2282,6 +2282,11 @@ std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
   std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
               header_writer.bytes_written());
 
+  // Reset counter of cached values
+  total_value_count_ = 0;
+  // Reserve enough space at the beginning of the buffer for largest possible header.
+  PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+
   // Excess bytes at the beginning are sliced off and ignored.
   return SliceBuffer(buffer, offset_bytes);
 }
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index f0a5f32c41..3b4cafab82 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1290,6 +1290,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
  public:
   using c_type = typename Type::c_type;
   static constexpr int TYPE = Type::type_num;
+  static constexpr size_t ROUND_TRIP_TIMES = 3;
 
   void InitBoundData(int nvalues, int repeats, c_type half_range) {
     num_values_ = nvalues * repeats;
@@ -1328,14 +1329,16 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
         MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
     auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
 
-    encoder->Put(draws_, num_values_);
-    encode_buffer_ = encoder->FlushValues();
+    for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+      encoder->Put(draws_, num_values_);
+      encode_buffer_ = encoder->FlushValues();
 
-    decoder->SetData(num_values_, encode_buffer_->data(),
-                     static_cast<int>(encode_buffer_->size()));
-    int values_decoded = decoder->Decode(decode_buf_, num_values_);
-    ASSERT_EQ(num_values_, values_decoded);
-    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+      decoder->SetData(num_values_, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      int values_decoded = decoder->Decode(decode_buf_, num_values_);
+      ASSERT_EQ(num_values_, values_decoded);
+      ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+    }
   }
 
   void CheckRoundtripSpaced(const uint8_t* valid_bits,
@@ -1350,15 +1353,17 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
       }
     }
 
-    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
-    encode_buffer_ = encoder->FlushValues();
-    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
-                     static_cast<int>(encode_buffer_->size()));
-    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
-                                                valid_bits, valid_bits_offset);
-    ASSERT_EQ(num_values_, values_decoded);
-    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
-                                                        valid_bits, valid_bits_offset));
+    for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+      encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+      encode_buffer_ = encoder->FlushValues();
+      decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                       static_cast<int>(encode_buffer_->size()));
+      auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                  valid_bits, valid_bits_offset);
+      ASSERT_EQ(num_values_, values_decoded);
+      ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(
+          decode_buf_, draws_, num_values_, valid_bits, valid_bits_offset));
+    }
   }
 
  protected: