Posted to commits@arrow.apache.org by ap...@apache.org on 2022/12/14 16:57:37 UTC

[arrow] branch master updated: ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer (#14191)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b3d4afc79 ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer (#14191)
1b3d4afc79 is described below

commit 1b3d4afc796e785717f73c867157f4c7075007dd
Author: Rok Mihevc <ro...@mihevc.org>
AuthorDate: Wed Dec 14 17:57:28 2022 +0100

    ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer (#14191)
    
    This adds a DELTA_BINARY_PACKED encoder to the Parquet C++ writer.
    
    Lead-authored-by: Rok Mihevc <ro...@mihevc.org>
    Co-authored-by: Will Jones <wi...@gmail.com>
    Co-authored-by: Antoine Pitrou <pi...@free.fr>
    Co-authored-by: Gang Wu <us...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/util/bit_stream_utils.h      |  10 +-
 cpp/src/arrow/util/rle_encoding_test.cc    |  34 ++++
 cpp/src/parquet/column_writer_test.cc      |  15 +-
 cpp/src/parquet/encoding.cc                | 306 ++++++++++++++++++++++++++++-
 cpp/src/parquet/encoding_test.cc           | 132 +++++++++++++
 docs/source/cpp/parquet.rst                |   2 +-
 python/pyarrow/tests/parquet/test_basic.py |  17 +-
 7 files changed, 496 insertions(+), 20 deletions(-)

diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h
index 2f70c28650..dc9b41793c 100644
--- a/cpp/src/arrow/util/bit_stream_utils.h
+++ b/cpp/src/arrow/util/bit_stream_utils.h
@@ -203,9 +203,10 @@ class BitReader {
 };
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
-  // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
-  DCHECK_LE(num_bits, 32);
-  DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  DCHECK_LE(num_bits, 64);
+  if (num_bits < 64) {
+    DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
+  }
 
   if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
     return false;
@@ -220,7 +221,8 @@ inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
     buffered_values_ = 0;
     byte_offset_ += 8;
     bit_offset_ -= 64;
-    buffered_values_ = v >> (num_bits - bit_offset_);
+    buffered_values_ =
+        (num_bits - bit_offset_ == 64) ? 0 : (v >> (num_bits - bit_offset_));
   }
   DCHECK_LT(bit_offset_, 64);
   return true;
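
For context, the change above raises PutValue's width limit from 32 to 64 bits. Two edge cases appear at exactly 64 bits: in C++, shifting a 64-bit value by 64 is undefined behaviour, so the check on the unused high bits must be skipped, and the refill path must treat a shift of (num_bits - bit_offset_) equal to 64 as "nothing left over". The following standalone sketch (not the Arrow implementation; the helper name is illustrative) isolates that guarded-shift pattern.

    // Guarded right shift: a shift count equal to the width of the type is
    // undefined behaviour in C++, so a count of 64 is treated as "all bits
    // consumed" and yields zero remaining bits.
    #include <cstdint>
    #include <iostream>

    uint64_t RemainingBits(uint64_t v, int consumed_bits) {
      return (consumed_bits == 64) ? 0 : (v >> consumed_bits);
    }

    int main() {
      const uint64_t v = UINT64_MAX;
      std::cout << RemainingBits(v, 64) << "\n";  // 0: the value fit exactly
      std::cout << RemainingBits(v, 32) << "\n";  // upper 32 bits remain
      return 0;
    }
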
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc
index 52f355daf2..01d1ffd767 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -173,6 +173,40 @@ TEST(BitArray, TestMixed) {
   }
 }
 
+// Writes up to 'num_vals' values with width 'bit_width' and reads them back.
+static void TestPutValue(int bit_width, uint64_t num_vals) {
+  // The max value representable in `bit_width` bits.
+  const uint64_t max = std::numeric_limits<uint64_t>::max() >> (64 - bit_width);
+  num_vals = std::min(num_vals, max);
+  int len = static_cast<int>(bit_util::BytesForBits(bit_width * num_vals));
+  EXPECT_GT(len, 0);
+
+  std::vector<uint8_t> buffer(len);
+  bit_util::BitWriter writer(buffer.data(), len);
+  for (uint64_t i = max - num_vals; i < max; i++) {
+    bool result = writer.PutValue(i, bit_width);
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), len);
+
+  bit_util::BitReader reader(buffer.data(), len);
+  for (uint64_t i = max - num_vals; i < max; i++) {
+    int64_t val = 0;
+    bool result = reader.GetValue(bit_width, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i);
+  }
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+TEST(BitUtil, RoundTripIntValues) {
+  for (int width = 1; width < 64; width++) {
+    TestPutValue(width, 1);
+    TestPutValue(width, 1024);
+  }
+}
+
 // Validates encoding of values by encoding and decoding them.  If
 // expected_encoding != NULL, also validates that the encoded buffer is
 // exactly 'expected_encoding'.
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 2cd21628b3..0da7826483 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -400,7 +400,8 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
 
 TYPED_TEST_SUITE(TestPrimitiveWriter, TestTypes);
 
-using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
+using TestValuesWriterInt32Type = TestPrimitiveWriter<Int32Type>;
+using TestValuesWriterInt64Type = TestPrimitiveWriter<Int64Type>;
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
   this->TestRequiredWithEncoding(Encoding::PLAIN);
@@ -418,11 +419,17 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
 TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
   this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
 }
+*/
+
+TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
 
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
 }
 
+/*
 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
 }
@@ -430,11 +437,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
 TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
   this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
 }
+*/
 
 TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
   this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
 }
-*/
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
   this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
@@ -647,7 +654,7 @@ TEST(TestWriter, NullValuesBuffer) {
 
 // PARQUET-719
 // Test case for NULL values
-TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
+TEST_F(TestValuesWriterInt32Type, OptionalNullValueChunk) {
   this->SetUpSchema(Repetition::OPTIONAL);
 
   this->GenerateData(LARGE_SIZE);
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 44f762d711..4923870e9e 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -2060,6 +2060,285 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+/// DeltaBitPackEncoder is an encoder for the Delta Binary Packed format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// The output consists of a header followed by blocks of delta-encoded, bit-packed values.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// The encoder reserves space at the start of its internal buffer for the header and
+/// writes the header only when FlushValues is called, just before returning the buffer.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+  // Maximum possible header size
+  static constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+  static constexpr uint32_t kValuesPerBlock = 128;
+  static constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+ public:
+  using T = typename DType::c_type;
+  using UT = std::make_unsigned_t<T>;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(
+            AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be a multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be a multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+    if (values_per_block % mini_blocks_per_block != 0) {
+      throw ParquetException(
+          "the number of values per block % number of miniblocks per block must be 0, "
+          "but it's " +
+          std::to_string(values_per_block % mini_blocks_per_block));
+    }
+    // Reserve enough space at the beginning of the buffer for largest possible header.
+    PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize));
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  UT first_value_{0};
+  UT current_value_{0};
+  ArrowPoolVector<UT> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    UT value = static_cast<UT>(src[idx]);
+    // Calculate deltas. The possible overflow is handled by use of unsigned integers
+    // making subtraction operations well defined and correct even in case of overflow.
+    // Encoded integers will wrap back around on decoding.
+    // See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    deltas_[values_current_block_] = value - current_value_;
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const UT min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(static_cast<T>(min_delta));
+
+  // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write
+  // bit widths of miniblocks as they become known during the encoding.
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  const uint32_t num_miniblocks =
+      static_cast<uint32_t>(std::ceil(static_cast<double>(values_current_block_) /
+                                      static_cast<double>(values_per_mini_block_)));
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const UT max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    // The minimum number of bits required to write any of the values in the deltas_ vector.
+    // See overflow comment above.
+    const auto bit_width = bit_width_data[i] =
+        bit_util::NumRequiredBits(max_delta - min_delta);
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      // See overflow comment above.
+      const UT value = deltas_[j] - min_delta;
+      bit_writer_.PutValue(value, bit_width);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutValue(0, bit_width);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+
+  // If, in the last block, fewer than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes holding the bit widths of the unneeded
+  // miniblocks are still present. Their value should be zero, but readers must accept
+  // arbitrary values as well.
+  for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = 0;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true));
+
+  uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {};
+  bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_));
+  if (!header_writer.PutVlqInt(values_per_block_) ||
+      !header_writer.PutVlqInt(mini_blocks_per_block_) ||
+      !header_writer.PutVlqInt(total_value_count_) ||
+      !header_writer.PutZigZagVlqInt(static_cast<T>(first_value_))) {
+    throw ParquetException("header writing error");
+  }
+  header_writer.Flush();
+
+  // We reserved enough space at the beginning of the buffer for the largest possible
+  // header, and the data was written immediately after it. We now write the header
+  // immediately before the end of the reserved space.
+  const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written();
+  std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_,
+              header_writer.bytes_written());
+
+  // Excess bytes at the beginning are sliced off and ignored.
+  return SliceBuffer(buffer, offset_bytes);
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::INT32) {
+    throw ParquetException("Expected Int32Array, got ", values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const ::arrow::ArrayData& data = *values.data();
+  if (values.type_id() != ::arrow::Type::INT64) {
+    throw ParquetException("Expected Int64Array, got ", values.type()->ToString());
+  }
+  if (data.length > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Array cannot be longer than ",
+                           std::numeric_limits<int32_t>::max());
+  }
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
+  } else {
+    PutSpaced(data.GetValues<int64_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::PutSpaced(const T* src, int num_values,
+                                           const uint8_t* valid_bits,
+                                           int64_t valid_bits_offset) {
+  if (valid_bits != NULLPTR) {
+    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                 this->memory_pool()));
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
+    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+        src, num_values, valid_bits, valid_bits_offset, data);
+    Put(data, num_valid_values);
+  } else {
+    Put(src, num_values);
+  }
+}
+
 // ----------------------------------------------------------------------
 // DeltaBitPackDecoder
 
@@ -2067,6 +2346,7 @@ template <typename DType>
 class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
  public:
   typedef typename DType::c_type T;
+  using UT = std::make_unsigned_t<T>;
 
   explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                                MemoryPool* pool = ::arrow::default_memory_pool())
@@ -2141,6 +2421,11 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
     if (values_per_block_ == 0) {
       throw ParquetException("cannot have zero value per block");
     }
+    if (values_per_block_ % 128 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be a multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
     if (mini_blocks_per_block_ == 0) {
       throw ParquetException("cannot have zero miniblock per block");
     }
@@ -2210,10 +2495,9 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
       }
       for (int j = 0; j < values_decode; ++j) {
         // Addition between min_delta, packed int and last_value should be treated as
-        // unsigned addtion. Overflow is as expected.
-        uint64_t delta =
-            static_cast<uint64_t>(min_delta_) + static_cast<uint64_t>(buffer[i + j]);
-        buffer[i + j] = static_cast<T>(delta + static_cast<uint64_t>(last_value_));
+        // unsigned addition. Overflow is as expected.
+        buffer[i + j] = static_cast<UT>(min_delta_) + static_cast<UT>(buffer[i + j]) +
+                        static_cast<UT>(last_value_);
         last_value_ = buffer[i + j];
       }
       values_current_mini_block_ -= values_decode;
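
Both the encoder (which computes deltas as unsigned subtractions, per the overflow comment in Put above) and the decoder hunk just above rely on modular unsigned arithmetic: a delta that overflows the signed range still round-trips, because the unsigned addition on decode wraps the same way the unsigned subtraction did on encode. Below is a minimal standalone sketch of that property, assuming two's-complement narrowing conversions (which holds on the platforms Arrow targets); it is illustrative only.

    // Round trip of a delta that overflows int32_t, using modular uint32_t
    // arithmetic as the encoder/decoder comments describe.
    #include <cassert>
    #include <cstdint>
    #include <limits>

    int main() {
      const int32_t prev = std::numeric_limits<int32_t>::min();
      const int32_t next = std::numeric_limits<int32_t>::max();

      // Encode: the true delta (next - prev) does not fit in int32_t, but the
      // unsigned subtraction simply wraps modulo 2^32.
      const uint32_t delta =
          static_cast<uint32_t>(next) - static_cast<uint32_t>(prev);

      // Decode: the unsigned addition wraps the same way, recovering the value.
      const int32_t decoded =
          static_cast<int32_t>(static_cast<uint32_t>(prev) + delta);
      assert(decoded == next);
      return 0;
    }
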
@@ -2760,6 +3044,17 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
         throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
         break;
     }
+  } else if (encoding == Encoding::DELTA_BINARY_PACKED) {
+    switch (type_num) {
+      case Type::INT32:
+        return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int32Type>(descr, pool));
+      case Type::INT64:
+        return std::unique_ptr<Encoder>(new DeltaBitPackEncoder<Int64Type>(descr, pool));
+      default:
+        throw ParquetException(
+            "DELTA_BINARY_PACKED encoder only supports INT32 and INT64");
+        break;
+    }
   } else {
     ParquetException::NYI("Selected encoding is not supported");
   }
@@ -2807,7 +3102,8 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
       case Type::INT64:
         return std::make_unique<DeltaBitPackDecoder<Int64Type>>(descr);
       default:
-        throw ParquetException("DELTA_BINARY_PACKED only supports INT32 and INT64");
+        throw ParquetException(
+            "DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
         break;
     }
   } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
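
To make the three encoding steps described in the DeltaBitPackEncoder header comment earlier in this diff concrete, here is a hedged, standalone walk-through on a tiny input (illustrative only; it does not use the encoder internals and assumes two's-complement casts). For the values 7, 5, 3, 1, 2, 3, 4, 5: the first value 7 goes into the header, the deltas are -2, -2, -2, 1, 1, 1, 1, the frame of reference (min delta) is -2, the adjusted deltas are 0, 0, 0, 3, 3, 3, 3, and the miniblock bit width is 2.

    // Standalone sketch of steps 1-3 (deltas, frame of reference, bit width)
    // for a single miniblock; illustrative only.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
      // The first value (7) would be stored in the page header.
      const std::vector<int32_t> values = {7, 5, 3, 1, 2, 3, 4, 5};

      // Step 1: deltas between consecutive values, in unsigned arithmetic so
      // that overflow wraps instead of being undefined.
      std::vector<uint32_t> deltas;
      for (size_t i = 1; i < values.size(); ++i) {
        deltas.push_back(static_cast<uint32_t>(values[i]) -
                         static_cast<uint32_t>(values[i - 1]));
      }

      // Step 2: frame of reference = minimum delta, compared as signed values.
      const int32_t min_delta = static_cast<int32_t>(*std::min_element(
          deltas.begin(), deltas.end(), [](uint32_t a, uint32_t b) {
            return static_cast<int32_t>(a) < static_cast<int32_t>(b);
          }));  // -2

      // Step 3: subtract min_delta so all adjusted deltas are non-negative;
      // the miniblock bit width is the bit length of the largest one.
      uint32_t max_adjusted = 0;
      for (uint32_t d : deltas) {
        max_adjusted = std::max(max_adjusted, d - static_cast<uint32_t>(min_delta));
      }
      int bit_width = 0;
      while ((max_adjusted >> bit_width) != 0) ++bit_width;  // 1 - (-2) = 3 -> 2 bits

      std::cout << "min_delta=" << min_delta << " bit_width=" << bit_width << "\n";
      return 0;
    }
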
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 7d42e3e8ce..f0a5f32c41 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -124,6 +124,12 @@ void GenerateData(int num_values, T* out, std::vector<uint8_t>* heap) {
                  std::numeric_limits<T>::max(), out);
 }
 
+template <typename T>
+void GenerateBoundData(int num_values, T* out, T min, T max, std::vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_numbers(num_values, 0, min, max, out);
+}
+
 template <>
 void GenerateData<bool>(int num_values, bool* out, std::vector<uint8_t>* heap) {
   // seed the prng so failure is deterministic
@@ -1276,5 +1282,131 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void InitBoundData(int nvalues, int repeats, c_type half_range) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(c_type));
+    output_bytes_.resize(num_values_ * sizeof(c_type));
+    draws_ = reinterpret_cast<c_type*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<c_type*>(output_bytes_.data());
+    GenerateBoundData<c_type>(nvalues, draws_, -half_range, half_range, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  void ExecuteBound(int nvalues, int repeats, c_type half_range) {
+    InitBoundData(nvalues, repeats, half_range);
+    CheckRoundtrip();
+  }
+
+  void ExecuteSpacedBound(int nvalues, int repeats, int64_t valid_bits_offset,
+                          double null_probability, c_type half_range) {
+    InitBoundData(nvalues, repeats, half_range);
+
+    int64_t size = num_values_ + valid_bits_offset;
+    auto rand = ::arrow::random::RandomArrayGenerator(1923);
+    const auto array = rand.UInt8(size, 0, 100, null_probability);
+    const auto valid_bits = array->null_bitmap_data();
+    CheckRoundtripSpaced(valid_bits, valid_bits_offset);
+  }
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::vector<uint8_t> input_bytes_;
+  std::vector<uint8_t> output_bytes_;
+};
+
+using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  using T = typename TypeParam::c_type;
+  int values_per_block = 128;
+  int values_per_mini_block = 32;
+
+  // Size a multiple of miniblock size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_mini_block * 10, 10));
+  // Size a multiple of block size
+  ASSERT_NO_FATAL_FAILURE(this->Execute(values_per_block * 10, 10));
+  // Size multiple of neither miniblock nor block size
+  ASSERT_NO_FATAL_FAILURE(
+      this->Execute((values_per_mini_block * values_per_block) + 1, 10));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(2000, 2000, 0));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1,
+      /*half_range*/ 0));
+
+  const int max_bitwidth = sizeof(T) * 8;
+  std::vector<int> bitwidths = {
+      1, 2, 3, 5, 8, 11, 16, max_bitwidth - 8, max_bitwidth - 1, max_bitwidth};
+  for (int bitwidth : bitwidths) {
+    T half_range =
+        std::numeric_limits<T>::max() >> static_cast<uint32_t>(max_bitwidth - bitwidth);
+
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteBound(25000, 200, half_range));
+    ASSERT_NO_FATAL_FAILURE(this->ExecuteSpacedBound(
+        /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64,
+        /*null_probability*/ 0.1,
+        /*half_range*/ half_range));
+  }
+}
+
 }  // namespace test
 }  // namespace parquet
diff --git a/docs/source/cpp/parquet.rst b/docs/source/cpp/parquet.rst
index 23a9657fd4..edc42d54cf 100644
--- a/docs/source/cpp/parquet.rst
+++ b/docs/source/cpp/parquet.rst
@@ -398,7 +398,7 @@ Encodings
 +--------------------------+----------+----------+---------+
 | BYTE_STREAM_SPLIT        | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
-| DELTA_BINARY_PACKED      | ✓        |          |         |
+| DELTA_BINARY_PACKED      | ✓        | ✓        |         |
 +--------------------------+----------+----------+---------+
 | DELTA_BYTE_ARRAY         | ✓        |          |         |
 +--------------------------+----------+----------+---------+
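
With the table above now marking DELTA_BINARY_PACKED as supported for writing, an integer column can request it through the writer properties. The sketch below reflects a plausible use of the Parquet C++ API and may need adjustment for a given Arrow version; the column path "ints", the output path, and the chunk size are illustrative, and dictionary encoding is disabled so the requested encoding is actually used.

    // Hedged usage sketch: write an Arrow table with DELTA_BINARY_PACKED
    // requested for the integer column "ints".
    #include <memory>

    #include "arrow/io/file.h"
    #include "arrow/memory_pool.h"
    #include "arrow/result.h"
    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"
    #include "parquet/properties.h"

    ::arrow::Status WriteWithDeltaBinaryPacked(
        const std::shared_ptr<::arrow::Table>& table) {
      parquet::WriterProperties::Builder builder;
      builder.disable_dictionary()->encoding(
          "ints", parquet::Encoding::DELTA_BINARY_PACKED);
      std::shared_ptr<parquet::WriterProperties> properties = builder.build();

      ARROW_ASSIGN_OR_RAISE(
          auto sink, ::arrow::io::FileOutputStream::Open("/tmp/ints.parquet"));
      return parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), sink,
                                        /*chunk_size=*/65536, properties);
    }
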
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 004bbd8d77..b2809e0f91 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -405,6 +405,13 @@ def test_column_encoding(use_legacy_dataset):
                      column_encoding="PLAIN",
                      use_legacy_dataset=use_legacy_dataset)
 
+    # Check "DELTA_BINARY_PACKED" for integer columns.
+    _check_roundtrip(mixed_table, expected=mixed_table,
+                     use_dictionary=False,
+                     column_encoding={'a': "PLAIN",
+                                      'b': "DELTA_BINARY_PACKED"},
+                     use_legacy_dataset=use_legacy_dataset)
+
     # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
     # This should throw an error as it only supports FLOAT and DOUBLE.
     with pytest.raises(IOError,
@@ -415,14 +422,12 @@ def test_column_encoding(use_legacy_dataset):
                          column_encoding={'b': "BYTE_STREAM_SPLIT"},
                          use_legacy_dataset=use_legacy_dataset)
 
-    # Try to pass "DELTA_BINARY_PACKED".
-    # This should throw an error as it is only supported for reading.
-    with pytest.raises(IOError,
-                       match="Not yet implemented: Selected encoding is"
-                             " not supported."):
+    # Try to use "DELTA_BINARY_PACKED" encoding on a float column.
+    # This should throw an error as only integers are supported.
+    with pytest.raises(OSError):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
-                         column_encoding={'b': "DELTA_BINARY_PACKED"},
+                         column_encoding={'a': "DELTA_BINARY_PACKED"},
                          use_legacy_dataset=use_legacy_dataset)
 
     # Try to pass "RLE_DICTIONARY".