You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/21 17:32:54 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #14191: ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer

pitrou commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1028282051


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),

Review Comment:
   Uh... cast to `double` instead? I know that, given these quantities are small, `float` ideally shouldn't incur any precision loss here, but still, let's avoid pointless micro-optimizations.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;

Review Comment:
   Why not put these into `DeltaBitPackEncoder`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);

Review Comment:
   `mini_blocks_per_block_` is typically 4, so the overhead of `arrow::stl::allocator` is probably overkill here.
   And/or, you can cache this container on the encoder instead of doing a temporary heap allocation every 128 values.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }

Review Comment:
   Can we also ensure that `values_per_block % mini_blocks_per_block == 0`?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +

Review Comment:
   Why does the check above use 32, when the error message says the value must be a multiple of 128?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);

Review Comment:
   You're using `SafeSignedSubtract` here... but what would happen if the subtraction overflows?
   
   Say `T` is `INT32`, you have current_value_ = -2147483647 (0x80000001) and value = 2147483647 (0x7fffffff)... then the computed delta is -2 (!).
   
   I see the Java implementation [claims that](https://github.com/apache/parquet-mr/blob/d057b39d93014fe40f5067ee4a33621e65c91552/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java#L86-L88):
   > The algorithm is correct because Java long is working as a modalar ring with base 2^64 and because of the plus and minus properties of a ring.
   
   The Rust Parquet reader also seems to take specific steps to handle such input properly:
   https://github.com/apache/arrow-rs/blob/6f8187fb34effc79b20a03cb1d0d164448fb5ba8/parquet/src/encodings/decoding.rs#L712-L717
   
   Worth adding a comment about this?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(

Review Comment:
   Is the call to `std::min` required or even ok?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);

Review Comment:
   You should add a comment here explaining that this call skips over the "slots" reserved for the miniblock bitwidths.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }

Review Comment:
   Perhaps this might be simplified:
   ```c++
   const int num_bits = bit_util::NumRequiredBits(static_cast<std::make_unsigned_t<T>>(max_delta_diff));
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);

Review Comment:
   I would say the call to `ToLittleEndian` is pedantic, given that bit widths are 8-bit quantities. :-)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);

Review Comment:
   Hmm... the encoded deltas are supposed to be bit-packed, but here you are aligning them over byte boundaries?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);

Review Comment:
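   (Same question as for the `PutAligned` call above: the padding is part of the bit-packed miniblock, but here it is aligned over byte boundaries?)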



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }

Review Comment:
   This seems to assume that the `bits_buffer_` is large enough, once reset, to write the header into... which I assume it is, but perhaps add a check just to be sure?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;

Review Comment:
   I think this can be spelled more concisely:
   ```suggestion
     PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/ true));
     return buffer;
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size()));

Review Comment:
   It's a bit of a pity to copy the generated blocks again here... Unfortunately, the header's byte length cannot be computed until `FlushValues` AFAIU, since `total_value_count_` is varlen-encoded. However, you could perhaps at least presize `sink_` here to avoid a pointless reallocation?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {

Review Comment:
   `override` here as well?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any of value in deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last mini block, we pad the mini block
+    // with zeroes so that its length is the number of values in a full mini block
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();

Review Comment:
   Perhaps
   ```suggestion
     PARQUET_ASSIGN_OR_THROW(auto bit_buffer, sink_.Finish(/*shrink_to_fit=*/ true));
   ```
   
   Also, note that `BufferBuilder::Finish` calls `Reset` implicitly, so the explicit `sink_.Reset()` call after it is redundant.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {

Review Comment:
   Why is this `virtual`? Did you mean to mark this `override`?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1));
+}

Review Comment:
   Also, currently `GenerateData` uses `std::numeric_limits<T>` for the random distribution of values, probably ensuring that the max bitwidth is always emitted?
   
   I think we want to improve this.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0.1));
+}

Review Comment:
   I think we would like to check with all data sizes between, say, 0 and 256 values, to exercise the blocking logic a bit more.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,265 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kMaxPageHeaderWriterSize = 32;
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
+/// as per the parquet spec. See:
+/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
+///
+/// Consists of a header followed by blocks of delta encoded values binary packed.
+///
+///  Format
+///    [header] [block 1] [block 2] ... [block N]
+///
+///  Header
+///    [block size] [number of mini blocks per block] [total value count] [first value]
+///
+///  Block
+///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
+///
+/// Sets aside bytes at the start of the internal buffer where the header will be written,
+/// and only writes the header when FlushValues is called before returning it.
+///
+/// To encode a block, we will:
+///
+/// 1. Compute the differences between consecutive elements. For the first element in the
+/// block, use the last element in the previous block or, in the case of the first block,
+/// use the first value of the whole sequence, stored in the header.
+///
+/// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract
+/// this min delta from all deltas in the block. This guarantees that all values are
+/// non-negative.
+///
+/// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the
+/// bit widths of the mini blocks and the delta values (minus the min delta) bit packed
+/// per mini block.
+///
+/// Supports only INT32 and INT64.
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(
+            pool, kMaxPageHeaderWriterSize + values_per_block * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a block must be multiple of 128, but it's " +
+          std::to_string(values_per_block_));
+    }
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  // If, in the last block, less than <number of miniblocks in a block> miniblocks are
+  // needed to store the values, the bytes storing the bit widths of the unneeded
+  // miniblocks are still present, their value should be zero, but readers must accept
+  // arbitrary values as well.
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    // The minimum number of bytes required to write any value in the deltas_ vector.
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);

Review Comment:
   If this is really an error, I would like the tests to be able to catch this!



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,66 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;

Review Comment:
   Style nit
   ```suggestion
   using TestDeltaBitPackEncodingTypes = ::testing::Types<Int32Type, Int64Type>;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org