You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/16 20:51:39 UTC

[GitHub] [arrow] wjones127 commented on a diff in pull request #14191: ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer

wjones127 commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r1024423662


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};

Review Comment:
   Question: what does the `{0}` mean? Is it a default value?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =

Review Comment:
   Nit: should we call this `length` or `mini_block_length` instead?
   ```suggestion
       const uint32_t length =
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);

Review Comment:
   Should we compute `num_miniblocks` first and then use it as the length of `bit_widths` instead? Or is there a reason we might want extra slots? (If the latter, that's worth a comment, IMO.)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder

Review Comment:
   For future readers, I'd appreciate more comments explaining the encoding. Before getting into the code, it would be nice to know:
   
    * What is a block?
    * What is a miniblock?
    * Where is the full specification of this encoding published (e.g., is there a link we can reference)?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));

Review Comment:
   Perhaps you could add a comment describing what `num_bytes` represents in the context of the encoding? IIUC, each value is represented by a diff that takes up exactly `num_bytes` bytes in a bitmap. Is that right?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the miniblock
+    // with zeroes so that its length is the number of values in a full miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();

Review Comment:
   ```suggestion
     const ArrayData& data = *values.data();
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the miniblock
+    // with zeroes so that its length is the number of values in a full miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();

Review Comment:
   ```suggestion
     const ArrayData& data = *values.data();
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the miniblock
+    // with zeroes so that its length is the number of values in a full miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));

Review Comment:
   To be more consistent with the line below (and possibly more efficient?)
   ```suggestion
       Put(data.GetValues<int32_t>(1), static_cast<int>(data.length));
   ```



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1276,5 +1276,64 @@ TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) {
   ASSERT_THROW(MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BINARY_PACKED encode/decode tests.
+
+template <typename Type>
+class TestDeltaBitPackEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BINARY_PACKED, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BINARY_PACKED, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<Int32Type, Int64Type> TestDeltaBitPackEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaBitPackEncoding, TestDeltaBitPackEncodingTypes);
+
+TYPED_TEST(TestDeltaBitPackEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));

Review Comment:
   Would it make sense to test the edge cases of an empty array or one that's all repeats?
   
   ```suggestion
     ASSERT_NO_FATAL_FAILURE(this->Execute(25000, 200));
     ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
     ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 2000));
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};

Review Comment:
   Also nit: could we use the term `length` instead of `values`? When I see the name `values` I think it should be a pointer into an array of values, not the number of values.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),

Review Comment:
   `(values_per_block + 3) * sizeof(T)` seems like some odd magic that could use a comment justifying it.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the miniblock
+    // with zeroes so that its length is the number of values in a full miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);
+  }
+
+  bit_writer_.Flush();
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  bit_writer_.Clear();
+  bit_width_data = NULL;
+}
+
+template <typename DType>
+std::shared_ptr<Buffer> DeltaBitPackEncoder<DType>::FlushValues() {
+  if (values_current_block_ > 0) {
+    FlushBlock();
+  }
+
+  std::shared_ptr<Buffer> bit_buffer;
+  PARQUET_ASSIGN_OR_THROW(bit_buffer, sink_.Finish());
+  sink_.Reset();
+
+  if (!bit_writer_.PutVlqInt(values_per_block_) ||
+      !bit_writer_.PutVlqInt(mini_blocks_per_block_) ||
+      !bit_writer_.PutVlqInt(total_value_count_) ||
+      !bit_writer_.PutZigZagVlqInt(first_value_)) {
+    throw ParquetException("header writing error");
+  }
+  bit_writer_.Flush();
+
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+  PARQUET_THROW_NOT_OK(sink_.Append(bit_buffer->mutable_data(), bit_buffer->size()));
+  std::shared_ptr<Buffer> buffer;
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true));
+  return buffer;
+}
+
+template <>
+void DeltaBitPackEncoder<Int32Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int32_t>(1), static_cast<int>(values.length()));
+  } else {
+    PutSpaced(data.GetValues<int32_t>(1), static_cast<int>(data.length),
+              data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+}
+
+template <>
+void DeltaBitPackEncoder<Int64Type>::Put(const ::arrow::Array& values) {
+  const auto& data = *values.data();
+  if (values.null_count() == 0) {
+    Put(data.GetValues<int64_t>(1), static_cast<int>(values.length()));

Review Comment:
   ```suggestion
       Put(data.GetValues<int64_t>(1), static_cast<int>(data.length));
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2060,6 +2062,220 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+constexpr uint32_t kValuesPerBlock = 128;
+constexpr uint32_t kMiniBlocksPerBlock = 4;
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  using TypedEncoder<DType>::Put;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool,
+                               const uint32_t values_per_block = kValuesPerBlock,
+                               const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(values_per_block),
+        mini_blocks_per_block_(mini_blocks_per_block),
+        values_per_mini_block_(values_per_block / mini_blocks_per_block),
+        deltas_(values_per_block, ::arrow::stl::allocator<T>(pool)),
+        bits_buffer_(AllocateBuffer(pool, (values_per_block + 3) * sizeof(T))),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {
+    if (values_per_mini_block_ % 32 != 0) {
+      throw ParquetException(
+          "the number of values in a miniblock must be multiple of 32, but it's " +
+          std::to_string(values_per_mini_block_));
+    }
+  }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+  void FlushBlock();
+
+ private:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_{0};
+  uint32_t total_value_count_{0};
+  T first_value_{0};
+  T current_value_{0};
+  ArrowPoolVector<T> deltas_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::BufferBuilder sink_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int idx = 0;
+  if (total_value_count_ == 0) {
+    current_value_ = src[0];
+    first_value_ = current_value_;
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+}
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::FlushBlock() {
+  if (values_current_block_ == 0) {
+    return;
+  }
+
+  const T min_delta =
+      *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_);
+  bit_writer_.PutZigZagVlqInt(min_delta);
+
+  std::vector<uint8_t, ::arrow::stl::allocator<uint8_t>> bit_widths(
+      mini_blocks_per_block_, 0);
+  uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_);
+  DCHECK(bit_width_data != nullptr);
+
+  uint32_t num_miniblocks = std::min(
+      static_cast<uint32_t>(std::ceil(static_cast<float>(values_current_block_) /
+                                      static_cast<float>(values_per_mini_block_))),
+      mini_blocks_per_block_);
+  for (uint32_t i = 0; i < num_miniblocks; i++) {
+    const uint32_t values_current_mini_block =
+        std::min(values_per_mini_block_, values_current_block_);
+
+    const uint32_t start = i * values_per_mini_block_;
+    const T max_delta = *std::max_element(
+        deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block);
+
+    const T max_delta_diff = SafeSignedSubtract(max_delta, min_delta);
+    int num_bits;
+    if constexpr (std::is_same<T, int64_t>::value) {
+      num_bits = bit_util::NumRequiredBits(max_delta_diff);
+    } else {
+      num_bits = bit_util::NumRequiredBits(static_cast<uint32_t>(max_delta_diff));
+    }
+    const int num_bytes = static_cast<int>(bit_util::BytesForBits(num_bits));
+    bit_widths[i] = num_bits;
+
+    for (uint32_t j = start; j < start + values_current_mini_block; j++) {
+      const T value = SafeSignedSubtract(deltas_[j], min_delta);
+      bit_writer_.PutAligned<T>(value, num_bytes);
+    }
+    // If there are not enough values to fill the last miniblock, we pad the miniblock
+    // with zeroes so that its length is the number of values in a full miniblock
+    // multiplied by the bit width.
+    for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) {
+      bit_writer_.PutAligned<T>(0, num_bytes);
+    }
+    values_current_block_ -= values_current_mini_block;
+  }
+  DCHECK_EQ(values_current_block_, 0);
+
+  for (uint32_t i = 0; i < mini_blocks_per_block_; i++) {
+    T val = bit_util::ToLittleEndian(bit_widths[i]);
+    memcpy(bit_width_data + i, &val, 1);

Review Comment:
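   Perhaps I don't understand, but does this not work? `bit_widths[i]` is a single byte, so `ToLittleEndian` is a no-op on it. Widening it into a multi-byte `T` and then `memcpy`-ing the first byte would copy the most significant byte (i.e. 0) on big-endian platforms, so direct assignment is also more portable.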
   
   ```suggestion
       bit_width_data[i] = bit_util::ToLittleEndian(bit_widths[i]);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org