You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/12/14 16:57:37 UTC
[arrow] branch master updated: ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer (#14191)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1b3d4afc79 ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer (#14191)
1b3d4afc79 is described below
commit 1b3d4afc796e785717f73c867157f4c7075007dd
Author: Rok Mihevc <ro...@mihevc.org>
AuthorDate: Wed Dec 14 17:57:28 2022 +0100
ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer (#14191)
This is to add DELTA_BINARY_PACKED encoder.
Lead-authored-by: Rok Mihevc <ro...@mihevc.org>
Co-authored-by: Will Jones <wi...@gmail.com>
Co-authored-by: Antoine Pitrou <pi...@free.fr>
Co-authored-by: Gang Wu <us...@gmail.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/util/bit_stream_utils.h | 10 +-
cpp/src/arrow/util/rle_encoding_test.cc | 34 ++++
cpp/src/parquet/column_writer_test.cc | 15 +-
cpp/src/parquet/encoding.cc | 306 ++++++++++++++++++++++++++++-
cpp/src/parquet/encoding_test.cc | 132 +++++++++++++
docs/source/cpp/parquet.rst | 2 +-
python/pyarrow/tests/parquet/test_basic.py | 17 +-
7 files changed, 496 insertions(+), 20 deletions(-)
diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h
index 2f70c28650..dc9b41793c 100644
--- a/cpp/src/arrow/util/bit_stream_utils.h
+++ b/cpp/src/arrow/util/bit_stream_utils.h
@@ -203,9 +203,10 @@ class BitReader {
};
inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
- // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
- DCHECK_LE(num_bits, 32);
- DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+ DCHECK_LE(num_bits, 64);
+ if (num_bits < 64) {
+ DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+ }
if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
return false;
@@ -220,7 +221,8 @@ inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
buffered_values_ = 0;
byte_offset_ += 8;
bit_offset_ -= 64;
- buffered_values_ = v >> (num_bits - bit_offset_);
+ buffered_values_ =
+ (num_bits - bit_offset_ == 64) ? 0 : (v >> (num_bits - bit_offset_));
}
DCHECK_LT(bit_offset_, 64);
return true;
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc
index 52f355daf2..01d1ffd767 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -173,6 +173,40 @@ TEST(BitArray, TestMixed) {
}
}
+// Writes up to `num_vals` values of width `bit_width` with BitWriter::PutValue and
+// checks that BitReader::GetValue reads the very same values back.
+static void TestPutValue(int bit_width, uint64_t num_vals) {
+  // Largest value representable in `bit_width` bits. The test writes the `num_vals`
+  // values immediately below it, i.e. the half-open range [max - num_vals, max).
+  const uint64_t max = std::numeric_limits<uint64_t>::max() >> (64 - bit_width);
+  num_vals = std::min(num_vals, max);
+  const uint64_t first = max - num_vals;
+  const int len = static_cast<int>(bit_util::BytesForBits(bit_width * num_vals));
+  EXPECT_GT(len, 0);
+
+  std::vector<uint8_t> buffer(len);
+
+  bit_util::BitWriter writer(buffer.data(), len);
+  for (uint64_t v = first; v < max; ++v) {
+    EXPECT_TRUE(writer.PutValue(v, bit_width));
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), len);
+
+  bit_util::BitReader reader(buffer.data(), len);
+  for (uint64_t expected = first; expected < max; ++expected) {
+    int64_t actual = 0;
+    EXPECT_TRUE(reader.GetValue(bit_width, &actual));
+    // Widths below 64 keep every value within int64 range; compare as unsigned.
+    EXPECT_EQ(static_cast<uint64_t>(actual), expected);
+  }
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+// Round-trips a single value and a batch of 1024 values for every bit width 1..63.
+TEST(BitUtil, RoundTripIntValues) {
+  for (int width = 1; width < 64; ++width) {
+    for (const uint64_t count : {uint64_t{1}, uint64_t{1024}}) {
+      TestPutValue(width, count);
+    }
+  }
+}
+
// Validates encoding of values by encoding and decoding them. If
// expected_encoding != NULL, also validates that the encoded buffer is
// exactly 'expected_encoding'.
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 2cd21628b3..0da7826483 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -400,7 +400,8 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes);
-using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
+using TestValuesWriterInt32Type = TestPrimitiveWriter<Int32Type>;
+using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
@@ -418,11 +419,17 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
}
+*/
+
+// DELTA_BINARY_PACKED is only implemented for INT32 and INT64 columns (MakeEncoder
+// throws for other physical types), so it is exercised with per-type fixtures instead
+// of the TestPrimitiveWriter typed suite.
+TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
}
+/*
TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
}
@@ -430,11 +437,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
}
+*/
TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
}
-*/
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
@@ -647,7 +654,7 @@ TEST(TestWriter, NullValuesBuffer) {
// PARQUET-719
// Test case for NULL values
-TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
+TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) {
this->SetUpSchema(Repetition::OPTIONAL);
this->GenerateData(LARGE_SIZE);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 44f762d711..4923870e9e 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2060,6 +2060,285 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
}
};
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the DELTA_BINARY_PACKED format as per the
+/// Parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// The encoded data consists of a header followed by blocks of delta-encoded,
+/// bit-packed values.
+///
+/// Format
+///   [header] [block 1] [block 2] ... [block N]
+///
+/// Header
+///   [block size] [number of mini blocks per block] [total value count] [first value]
+///
+/// Block
+///   [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Space for the largest possible header is set aside at the start of the internal
+/// buffer; the header itself is only written by FlushValues, right before the buffer
+/// is returned.
+///
+/// To encode a block, we:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in
+///    the block, use the last element of the previous block or, for the first block,
+///    the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block) and
+///    subtract it from all deltas in the block, so that all values are non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int, followed by
+///    the bit widths of the mini blocks and the delta values (minus the min delta)
+///    bit packed per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+  // Maximum possible header size: three VLQ-encoded uint32 fields (at most 5 bytes
+  // each) plus the zigzag VLQ-encoded first value (at most 10 bytes), rounded up.
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  /// \param[in] descr column descriptor forwarded to EncoderImpl
+  /// \param[in] pool memory pool used for all internal buffers
+  /// \param[in] values_per_block number of values per block; must be a non-zero
+  ///     multiple of 128
+  /// \param[in] mini_blocks_per_block number of miniblocks per block; must be
+  ///     non-zero, divide values_per_block, and give miniblocks whose size is a
+  ///     multiple of 32
+  /// \throws ParquetException if the block layout is invalid
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        // Validation happens here, before the division and the allocations below.
+        values_per_mini_block_(
+            ValidatedValuesPerMiniBlock(values_per_block, mini_blocks_per_block)),
+        deltas_(values_per_block, ::arrow::stl::allocator<UT>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, MaxEncodedBlockSize(values_per_block, mini_blocks_per_block))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    // Reserve enough space at the beginning of the buffer for largest possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  /// Validate the block layout and return the number of values per miniblock.
+  ///
+  /// Called from the member-initializer list, so invalid parameters are rejected
+  /// before they are used as a divisor or to size deltas_ and bits_buffer_.
+  static uint32_t ValidatedValuesPerMiniBlock(uint32_t values_per_block,
+                                              uint32_t mini_blocks_per_block) {
+    if (values_per_block == 0) {
+      throw ParquetException("cannot have zero value per block");
+    }
+    if (values_per_block % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block));
+    }
+    if (mini_blocks_per_block == 0) {
+      throw ParquetException("cannot have zero miniblock per block");
+    }
+    const uint32_t values_per_mini_block = values_per_block / mini_blocks_per_block;
+    if (values_per_mini_block % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    return values_per_mini_block;
+  }
+
+  /// Upper bound, in bytes, of one block as written by FlushBlock.
+  ///
+  /// The bound covers three parts:
+  /// - the zigzag VLQ min delta;
+  /// - one bit-width byte per miniblock;
+  /// - at most values_per_block values (miniblock padding included), each packed at
+  ///   up to the full bit width of T.
+  static int64_t MaxEncodedBlockSize(uint32_t values_per_block,
+                                     uint32_t mini_blocks_per_block) {
+    // Each VLQ byte carries 7 payload bits.
+    constexpr int64_t kMaxZigZagVlqBytes = (sizeof(T) * 8 + 6) / 7;
+    return kMaxZigZagVlqBytes + static_cast<int64_t>(mini_blocks_per_block) +
+           static_cast<int64_t>(values_per_block) * static_cast<int64_t>(sizeof(T));
+  }
+
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+/// Append `num_values` values, delta-encoding each one against its predecessor.
+///
+/// When no value has been counted yet (total_value_count_ == 0), src[0] becomes the
+/// header's first value and produces no delta. Every other value adds one delta to
+/// the current block, and the block is flushed as soon as it is full.
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int start = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = static_cast<UT>(src[0]);
+    current_value_ = first_value_;
+    start = 1;
+  }
+  total_value_count_ += num_values;
+
+  for (int i = start; i < num_values; ++i) {
+    const UT next = static_cast<UT>(src[i]);
+    // Unsigned subtraction wraps modulo 2^N, so the delta is well defined even when the
+    // signed difference would overflow; the decoder wraps back around the same way.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_++] = next - current_value_;
+    current_value_ = next;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+/// Encode the buffered deltas as one block and append it to sink_.
+///
+/// Writes the block's min delta (zigzag VLQ), one bit-width byte per miniblock, and
+/// then each miniblock's (delta - min_delta) values bit-packed at that width. Leaves
+/// values_current_block_ at zero.
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  // Deltas are buffered as wrapped unsigned values, but the frame of reference has to be
+  // chosen on their signed (two's complement) interpretation. An unsigned comparison
+  // would rank a small negative delta above every positive one, so a block mixing both
+  // would be packed at the full bit width of T.
+  const auto signed_less = [](UT lhs, UT rhs) {
+    return static_cast<T>(lhs) < static_cast<T>(rhs);
+  };
+  const auto block_begin = deltas_.begin();
+
+  // Frame of reference: the smallest (signed) delta of the block. Every delta is
+  // >= min_delta, so (delta - min_delta) computed in unsigned arithmetic is the exact,
+  // non-negative distance and never wraps.
+  const UT min_delta =
+      *std::min_element(block_begin, block_begin + values_current_block_, signed_less);
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Reserve one byte per miniblock for the bit widths; each one is filled in below once
+  // the corresponding miniblock's width is known.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  if (bit_width_data == nullptr) {
+    throw ParquetException("DELTA_BINARY_PACKED encoder: block buffer too small");
+  }
+
+  // Ceiling division: a partially filled block (only the last block flushed by
+  // FlushValues can be one) still occupies whole miniblocks.
+  const uint32_t num_miniblocks =
+      (values_current_block_ + values_per_mini_block_ - 1) / values_per_mini_block_;
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+    const uint32_t start = i * values_per_mini_block_;
+    const auto mini_block_begin = block_begin + start;
+    const auto mini_block_end = mini_block_begin + values_current_mini_block;
+
+    const UT max_delta = *std::max_element(mini_block_begin, mini_block_end, signed_less);
+
+    // Minimum number of bits needed for every (delta - min_delta) in this miniblock.
+    const auto bit_width = bit_width_data[i] =
+        bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (auto it = mini_block_begin; it != mini_block_end; ++it) {
+      const UT value = *it - min_delta;
+      bit_writer_.PutValue(value, bit_width);
+    }
+    // If there are not enough values to fill the last mini block, pad it with zeroes
+    // so that its length is the number of values in a full mini block multiplied by
+    // the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutValue(0, bit_width);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+
+  // If, in the last block, fewer than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present; their value should be zero, but readers must accept
+  // arbitrary values as well.
+  for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = 0;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+}
+
+/// Finish the current page and return its encoded bytes (header followed by blocks).
+///
+/// Flushes any partially filled block, writes the header into the space reserved at
+/// the front of the buffer, and resets the per-page state so the encoder can be reused
+/// for the next page.
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer, sizeof(header_buffer));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for the largest possible
+  // header, and data was written immediately after it. Write the header so that it
+  // ends exactly where the reserved space ends.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer,
+              header_writer.bytes_written());
+
+  // Finish() emptied sink_, so start a fresh page: the next Put captures a new first
+  // value (it does so whenever total_value_count_ is zero), and header space has to be
+  // reserved again. Without this, a second call would report a cumulative value count
+  // and write its header over the first bytes of its own data.
+  total_value_count_ = 0;
+  first_value_ = 0;
+  current_value_ = 0;
+  PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+
+  // Excess bytes at the beginning are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+/// Append an Arrow INT32 array, routing through PutSpaced when it contains nulls.
+///
+/// \throws ParquetException if the array is not INT32 or is longer than INT32_MAX
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::INT32) {
+    throw ParquetException("Expected Int32Array, got ", values.type()->ToString());
+  }
+  // The typed Put/PutSpaced overloads take an `int` count.
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+/// Append an Arrow INT64 array, routing through PutSpaced when it contains nulls.
+///
+/// \throws ParquetException if the array is not INT64 or is longer than INT32_MAX
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::INT64) {
+    throw ParquetException("Expected Int64Array, got ", values.type()->ToString());
+  }
+  // The typed Put/PutSpaced overloads take an `int` count.
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+/// Append only the slots of `src` whose validity bit is set.
+///
+/// With no bitmap (valid_bits == NULLPTR) every slot is treated as valid. Otherwise
+/// the valid values are first compacted into a scratch buffer from this encoder's
+/// memory pool and then encoded as a dense run.
+template <typename DType>
+void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
+                                           const uint8_t* valid_bits,
+                                           int64_t valid_bits_offset) {
+  if (valid_bits == NULLPTR) {
+    Put(src, num_values);
+    return;
+  }
+
+  PARQUET_ASSIGN_OR_THROW(
+      auto compacted,
+      ::arrow::AllocateBuffer(num_values * sizeof(T), this->memory_pool()));
+  T* dense = reinterpret_cast<T*>(compacted->mutable_data());
+  const int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+      src, num_values, valid_bits, valid_bits_offset, dense);
+  Put(dense, num_valid_values);
+}
+
// ----------------------------------------------------------------------
// DeltaBitPackDecoder
@@ -2067,6 +2346,7 @@ template <typename DType>
class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
public:
typedef typename DType::c_type T;
+ using UT = std::make_unsigned_t<T>;
explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
MemoryPool* pool = ::arrow::default_memory_pool())
@@ -2141,6 +2421,11 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
if (values_per_block_ == 0) {
throw ParquetException("cannot have zero value per block");
}
+ if (values_per_block_ % 128 != 0) {
+ throw ParquetException(
+ "the number of values in a block must be multiple of 128, but it's " +
+ std::to_string(values_per_block_));
+ }
if (mini_blocks_per_block_ == 0) {
throw ParquetException("cannot have zero miniblock per block");
}
@@ -2210,10 +2495,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
}
for (int j = 0; j < values_decode; ++j) {
// Addition between min_delta, packed int and last_value should be treated as
- // unsigned addtion. Overflow is as expected.
- uint64_t delta =
- static_cast<uint64_t>(min_delta_) + static_cast<uint64_t>(buffer[i + j]);
- buffer[i + j] = static_cast<T>(delta + static_cast<uint64_t>(last_value_));
+ // unsigned addition. Overflow is as expected.
+ buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
+ static_cast<UT>(last_value_);
last_value_ = buffer[i + j];
}
values_current_mini_block_ -= values_decode;
@@ -2760,6 +3044,17 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
break;
}
+ } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
+ switch (type_num) {
+ case Type::INT32:
+ return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int32Type>(descr, pool));
+ case Type::INT64:
+ return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int64Type>(descr, pool));
+ default:
+ throw ParquetException(
+ "DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
+ break;
+ }
} else {
ParquetException::NYI("Selected encoding is not supported");
}
@@ -2807,7 +3102,8 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
case Type::INT64:
return std::make_unique<DeltaBitPackDecoder<Int64Type>>(descr);
default:
- throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
+ throw ParquetException(
+ "DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
break;
}
} else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 7d42e3e8ce..f0a5f32c41 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -124,6 +124,12 @@ void GenerateData(int num_values, T* out, std::vector<uint8_t>* heap) {
std::numeric_limits<T>::max(), out);
}
+// Fills `out` with `num_values` pseudo-random values between `min` and `max`.
+// `heap` is unused; it only keeps the signature parallel to GenerateData.
+template <typename T>
+void GenerateBoundData(int num_values, T* out, T min, T max, std::vector<uint8_t>* heap) {
+  // A fixed seed keeps any failure reproducible.
+  constexpr int kSeed = 0;
+  random_numbers(num_values, kSeed, min, max, out);
+}
+
template <>
void GenerateData<bool>(int num_values, bool* out, std::vector<uint8_t>* heap) {
// seed the prng so failure is deterministic
@@ -1276,5 +1282,131 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
}
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+// Round-trip fixture for DELTA_BINARY_PACKED: encodes draws_ and checks that decoding
+// reproduces it, optionally through a validity bitmap.
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  // Generates `nvalues` values between -half_range and half_range into draws_, then
+  // appends (repeats - 1) more copies of that sequence. Also sizes decode_buf_.
+  void InitBoundData(int nvalues, int repeats, c_type half_range) {
+    num_values_ = nvalues * repeats;
+    const size_t num_bytes = num_values_ * sizeof(c_type);
+    input_bytes_.resize(num_bytes);
+    output_bytes_.resize(num_bytes);
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GenerateBoundData<c_type>(nvalues, draws_, -half_range, half_range, &data_buffer_);
+
+    // Repeat the generated prefix to get runs of identical values.
+    for (int copy = 1; copy < repeats; ++copy) {
+      c_type* dest = draws_ + nvalues * copy;
+      for (int i = 0; i < nvalues; ++i) {
+        dest[i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteBound(int nvalues, int repeats, c_type half_range) {
+    InitBoundData(nvalues, repeats, half_range);
+    CheckRoundtrip();
+  }
+
+  void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset,
+                          double null_probability, c_type half_range) {
+    InitBoundData(nvalues, repeats, half_range);
+
+    // Only the null bitmap of this random array is used, as the validity bitmap.
+    auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array =
+        rand.UInt8(num_values_ + valid_bits_offset, 0, 100, null_probability);
+    CheckRoundtripSpaced(array->null_bitmap_data(), valid_bits_offset);
+  }
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+    const int encoded_size = static_cast<int>(encode_buffer_->size());
+    decoder->SetData(num_values_, encode_buffer_->data(), encoded_size);
+
+    const int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    int null_count = 0;
+    for (int i = 0; i < num_values_; ++i) {
+      null_count += bit_util::GetBit(valid_bits, valid_bits_offset + i) ? 0 : 1;
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    // Only the non-null values were encoded.
+    const int encoded_size = static_cast<int>(encode_buffer_->size());
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(), encoded_size);
+
+    const auto values_decoded = decoder->DecodeSpaced(
+        decode_buf_, num_values_, null_count, valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  using T = typename TypeParam::c_type;
+  constexpr int kValuesPerBlock = 128;
+  constexpr int kValuesPerMiniBlock = 32;
+
+  // Sizes that are a multiple of the miniblock size, a multiple of the block size,
+  // a multiple of neither, and the empty case; then a spaced run with nulls.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(kValuesPerMiniBlock * 10, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(kValuesPerBlock * 10, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute((kValuesPerMiniBlock * kValuesPerBlock) + 1, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+
+  // A zero half range makes every value zero, hence every delta zero.
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1,
+      /*half_range*/ 0));
+
+  // Value ranges scaled to a selection of bit widths, up to the full width of T.
+  constexpr int kMaxBitWidth = static_cast<int>(sizeof(T) * 8);
+  for (const int bit_width : {1, 2, 3, 5, 8, 11, 16, kMaxBitWidth - 8, kMaxBitWidth - 1,
+                              kMaxBitWidth}) {
+    const T half_range =
+        std::numeric_limits<T>::max() >> static_cast<uint32_t>(kMaxBitWidth - bit_width);
+
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+        /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+        /*null_probability*/ 0.1,
+        /*half_range*/ half_range));
+  }
+}
+
} // namespace test
} // namespace parquet
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 23a9657fd4..edc42d54cf 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -398,7 +398,7 @@ Encodings
+--------------------------+----------+----------+---------+
| BYTE_STREAM_SPLIT | ✓ | ✓ | |
+--------------------------+----------+----------+---------+
-| DELTA_BINARY_PACKED | ✓ | | |
+| DELTA_BINARY_PACKED | ✓ | ✓ | |
+--------------------------+----------+----------+---------+
| DELTA_BYTE_ARRAY | ✓ | | |
+--------------------------+----------+----------+---------+
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 004bbd8d77..b2809e0f91 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -405,6 +405,13 @@ def test_column_encoding(use_legacy_dataset):
column_encoding="PLAIN",
use_legacy_dataset=use_legacy_dataset)
+ # Check "DELTA_BINARY_PACKED" for integer columns.
+ _check_roundtrip(mixed_table, expected=mixed_table,
+ use_dictionary=False,
+ column_encoding={'a': "PLAIN",
+ 'b': "DELTA_BINARY_PACKED"},
+ use_legacy_dataset=use_legacy_dataset)
+
# Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
# This should throw an error as it is only supports FLOAT and DOUBLE.
with pytest.raises(IOError,
@@ -415,14 +422,12 @@ def test_column_encoding(use_legacy_dataset):
column_encoding={'b': "BYTE_STREAM_SPLIT"},
use_legacy_dataset=use_legacy_dataset)
- # Try to pass "DELTA_BINARY_PACKED".
- # This should throw an error as it is only supported for reading.
- with pytest.raises(IOError,
- match="Not yet implemented: Selected encoding is"
- " not supported."):
+ # Try to use "DELTA_BINARY_PACKED" encoding on a float column.
+ # This should throw an error as only integers are supported.
+ with pytest.raises(OSError):
_check_roundtrip(mixed_table, expected=mixed_table,
use_dictionary=False,
- column_encoding={'b': "DELTA_BINARY_PACKED"},
+ column_encoding={'a': "DELTA_BINARY_PACKED"},
use_legacy_dataset=use_legacy_dataset)
# Try to pass "RLE_DICTIONARY".