You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/01/04 15:04:42 UTC
[arrow] branch master updated: GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value (#15124)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 53d73f8f97 GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value (#15124)
53d73f8f97 is described below
commit 53d73f8f97516443cdcf98f71c0cbc527dae7dc4
Author: mwish <an...@qq.com>
AuthorDate: Wed Jan 4 23:04:35 2023 +0800
GH-15052: [C++][Parquet] Fix DELTA_BINARY_PACKED decoder when reading only one value (#15124)
This patch tries to fix issue https://github.com/apache/arrow/issues/15052; the problem is described in https://github.com/apache/arrow/issues/15052#issuecomment-1367486164
When reading only one value at a time, DeltaBitPackDecoder never calls `InitBlock`, so the block stays uninitialized and every subsequent read returns `last_value_` again.
The problem appears to have been introduced in https://github.com/apache/arrow/pull/10627 and https://github.com/amol-/arrow/commit/d982bedcf5e03d44c01949b192da54a8c1e525d8
I will add some tests tonight.
* Closes: #15052
Lead-authored-by: mwish <ma...@gmail.com>
Co-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: mwish <15...@qq.com>
Co-authored-by: Rok Mihevc <ro...@mihevc.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/parquet/encoding.cc | 19 +++++++++++++++++--
cpp/src/parquet/encoding_test.cc | 30 ++++++++++++++++++++----------
2 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index b761de69d3..b9472d72ae 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2459,7 +2459,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
ParquetException::EofException();
}
if (bit_width_data[i] > kMaxDeltaBitWidth) {
- throw ParquetException("delta bit width larger than integer bit width");
+ throw ParquetException("delta bit width " + std::to_string(bit_width_data[i]) +
+ " larger than integer bit width " +
+ std::to_string(kMaxDeltaBitWidth));
}
}
mini_block_idx_ = 0;
@@ -2479,7 +2481,20 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
if (ARROW_PREDICT_FALSE(!block_initialized_)) {
buffer[i++] = last_value_;
- if (ARROW_PREDICT_FALSE(i == max_values)) break;
+ DCHECK_EQ(i, 1); // we're at the beginning of the page
+ if (ARROW_PREDICT_FALSE(i == max_values)) {
+ // When block is uninitialized and i reaches max_values we have two
+ // different possibilities:
+ // 1. total_value_count_ == 1, which means that the page may have only
+ // one value (encoded in the header), and we should not initialize
+ // any block.
+ // 2. total_value_count_ != 1, which means we should initialize the
+ // incoming block for subsequent reads.
+ if (total_value_count_ != 1) {
+ InitBlock();
+ }
+ break;
+ }
InitBlock();
} else {
++mini_block_idx_;
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 422d48e396..b8363d29cd 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1290,7 +1290,8 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
public:
using c_type = typename Type::c_type;
static constexpr int TYPE = Type::type_num;
- static constexpr size_t ROUND_TRIP_TIMES = 3;
+ static constexpr size_t kNumRoundTrips = 3;
+ const std::vector<int> kReadBatchSizes = {1, 11};
void InitBoundData(int nvalues, int repeats, c_type half_range) {
num_values_ = nvalues * repeats;
@@ -1328,16 +1329,25 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
auto encoder =
MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
-
- for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+ auto read_batch_sizes = kReadBatchSizes;
+ read_batch_sizes.push_back(num_values_);
+ // Encode a number of times to exercise the flush logic
+ for (size_t i = 0; i < kNumRoundTrips; ++i) {
encoder->Put(draws_, num_values_);
encode_buffer_ = encoder->FlushValues();
-
- decoder->SetData(num_values_, encode_buffer_->data(),
- static_cast<int>(encode_buffer_->size()));
- int values_decoded = decoder->Decode(decode_buf_, num_values_);
- ASSERT_EQ(num_values_, values_decoded);
- ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+ // Exercise different batch sizes
+ for (const int read_batch_size : read_batch_sizes) {
+ decoder->SetData(num_values_, encode_buffer_->data(),
+ static_cast<int>(encode_buffer_->size()));
+
+ int values_decoded = 0;
+ while (values_decoded < num_values_) {
+ values_decoded +=
+ decoder->Decode(decode_buf_ + values_decoded, read_batch_size);
+ }
+ ASSERT_EQ(num_values_, values_decoded);
+ ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+ }
}
}
@@ -1353,7 +1363,7 @@ class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
}
}
- for (size_t i = 0; i < ROUND_TRIP_TIMES; ++i) {
+ for (size_t i = 0; i < kNumRoundTrips; ++i) {
encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
encode_buffer_ = encoder->FlushValues();
decoder->SetData(num_values_ - null_count, encode_buffer_->data(),