You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ro...@apache.org on 2022/12/22 15:15:33 UTC
[arrow] branch master updated: ARROW-18437: [C++][Parquet] Fix encoder for DELTA_BINARY_PACKED when flushing more than once (#14959)
This is an automated email from the ASF dual-hosted git repository.
rok pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ff1002056e ARROW-18437: [C++][Parquet] Fix encoder for DELTA_BINARY_PACKED when flushing more than once (#14959)
ff1002056e is described below
commit ff1002056ec394811734fe00565b29e4ba15ec22
Author: mwish <15...@qq.com>
AuthorDate: Thu Dec 22 23:15:25 2022 +0800
ARROW-18437: [C++][Parquet] Fix encoder for DELTA_BINARY_PACKED when flushing more than once (#14959)
When `FlushValues` is called more than once, the `DELTA_BINARY_PACKED` output is corrupted, because the encoder did not reset its state (the cached value count and the reserved header space) after a flush.
I'll add some tests this weekend.
Authored-by: mwish <ma...@gmail.com>
Signed-off-by: Rok Mihevc <ro...@mihevc.org>
---
cpp/src/parquet/encoding.cc | 7 ++++++-
cpp/src/parquet/encoding_test.cc | 37 +++++++++++++++++++++----------------
2 files changed, 27 insertions(+), 17 deletions(-)
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 4923870e9e..9761dfd301 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2186,7 +2186,7 @@ void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
while (idx < num_values) {
UT value = static_cast<UT>(src[idx]);
// Calculate deltas. The possible overflow is handled by use of unsigned integers
- // making subtraction operations well defined and correct even in case of overflow.
+ // making subtraction operations well-defined and correct even in case of overflow.
// Encoded integers will wrap back around on decoding.
// See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
deltas_[values_current_block_] = value - current_value_;
@@ -2282,6 +2282,11 @@ std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
header_writer.bytes_written());
+ // Reset counter of cached values
+ total_value_count_ = 0;
+ // Reserve enough space at the beginning of the buffer for largest possible header.
+ PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+
// Excess bytes at the beginning are sliced off and ignored.
return SliceBuffer(buffer, offset_bytes);
}
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index f0a5f32c41..3b4cafab82 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1290,6 +1290,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
public:
using c_type = typename Type::c_type;
static constexpr int TYPE = Type::type_num;
+ static constexpr size_t ROUND_TRIP_TIMES = 3;
void InitBoundData(int nvalues, int repeats, c_type half_range) {
num_values_ = nvalues * repeats;
@@ -1328,14 +1329,16 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
- encoder->Put(draws_, num_values_);
- encode_buffer_ = encoder->FlushValues();
+ for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+ encoder->Put(draws_, num_values_);
+ encode_buffer_ = encoder->FlushValues();
- decoder->SetData(num_values_, encode_buffer_->data(),
- static_cast<int>(encode_buffer_->size()));
- int values_decoded = decoder->Decode(decode_buf_, num_values_);
- ASSERT_EQ(num_values_, values_decoded);
- ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ int values_decoded = decoder->Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+ }
}
void CheckRoundtripSpaced(const uint8_t* valid_bits,
@@ -1350,15 +1353,17 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
}
}
- encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
- encode_buffer_ = encoder->FlushValues();
- decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
- static_cast<int>(encode_buffer_->size()));
- auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
- valid_bits, valid_bits_offset);
- ASSERT_EQ(num_values_, values_decoded);
- ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
- valid_bits, valid_bits_offset));
+ for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+ encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+ encode_buffer_ = encoder->FlushValues();
+ decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+ auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+ valid_bits, valid_bits_offset);
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(
+ decode_buf_, draws_, num_values_, valid_bits, valid_bits_offset));
+ }
}
protected: