You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "pitrou (via GitHub)" <gi...@apache.org> on 2023/05/16 16:24:23 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #14341: GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to Parquet writer

pitrou commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1195344198


##########
cpp/src/parquet/encoding.cc:
##########
@@ -1238,25 +1240,35 @@ int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
   return max_values;
 }
 
-struct ArrowBinaryHelper {
+template <typename DType>
+struct ArrowBinaryHelper;
+
+template <>
+struct ArrowBinaryHelper<ByteArrayType> {
   explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
     this->out = out;
     this->builder = out->builder.get();
+    if (ARROW_PREDICT_FALSE(SubtractWithOverflow(::arrow::kBinaryMemoryLimit,
+                                                 this->builder->value_data_length(),
+                                                 &this->chunk_space_remaining))) {

Review Comment:
   I'm curious, in which circumstances can it overflow? These are all 64-bit integers.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,243 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(src.len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  throw ParquetException("Put not implemented for " + this->descr_->ToString());
+}
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          ::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  for (int i = 0; i < num_values; i++) {
+    // Convert to ByteArray, so we can pass to the suffix_encoder_.
+    const ByteArray value = src[i];
+    if (ARROW_PREDICT_FALSE(value.len >= static_cast<int32_t>(kMaxByteArraySize))) {
+      throw ParquetException("Parquet cannot store strings with size 2GB or more");
+    }
+
+    auto view = string_view{reinterpret_cast<const char*>(value.ptr),
+                            static_cast<uint32_t>(value.len)};
+    uint32_t j = 0;
+    const uint32_t common_length =
+        std::min(value.len, static_cast<uint32_t>(last_value_view.length()));
+    while (j < common_length) {
+      if (last_value_view[j] != view[j]) {
+        break;
+      }
+      j++;
+    }
+
+    last_value_view = view;
+    prefix_lengths[i] = j;
+    const auto suffix_length = static_cast<uint32_t>(value.len - j);
+
+    if (suffix_length == 0) {
+      suffix_encoder_.Put(&kEmpty, 1);

Review Comment:
   Why is this necessary exactly? I think `suffix` below would be valid; did you get a failure somewhere?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,243 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(src.len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  throw ParquetException("Put not implemented for " + this->descr_->ToString());
+}
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          ::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  for (int i = 0; i < num_values; i++) {
+    // Convert to ByteArray, so we can pass to the suffix_encoder_.
+    const ByteArray value = src[i];
+    if (ARROW_PREDICT_FALSE(value.len >= static_cast<int32_t>(kMaxByteArraySize))) {
+      throw ParquetException("Parquet cannot store strings with size 2GB or more");
+    }
+
+    auto view = string_view{reinterpret_cast<const char*>(value.ptr),
+                            static_cast<uint32_t>(value.len)};

Review Comment:
   Ok, let's instead add a conversion operator to `ByteArray`?
   ```c++
   struct ByteArray {
     explicit operator std::string_view() const { return std::string_view{reinterpret_cast<const char*>(ptr), len}; }
   ...
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,243 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(src.len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  throw ParquetException("Put not implemented for " + this->descr_->ToString());
+}
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          ::arrow::stl::allocator<int32_t>(pool_));

Review Comment:
   This will potentially allocate and deallocate a large array each time `Put` is called.
   
   It might be more efficient to work in chunks, by creating a small static pool vector in `DeltaByteArrayEncoder`.
   
   (even better might be to chunk in lockstep with the prefix encoder, but more involved)



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3196,6 +3471,45 @@ class DeltaByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+  using Base::DeltaByteArrayDecoderImpl;
+
+  int Decode(ByteArray* buffer, int max_values) override {
+    return GetInternal(buffer, max_values);
+  }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+                                  virtual public FLBADecoder {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+  using Base::DeltaByteArrayDecoderImpl;
+  using Base::pool_;
+
+  int Decode(ByteArray* buffer, int max_values) {
+    return GetInternal(buffer, max_values);
+  }
+  int Decode(FixedLenByteArray* buffer, int max_values) override {
+    int decoded_values_size = max_values;
+    if (MultiplyWithOverflow(decoded_values_size,
+                             descr_->type_length() * sizeof(ByteArray),
+                             &decoded_values_size)) {
+      throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
+    }
+    ArrowPoolVector<uint8_t> decode_values(decoded_values_size,
+                                           ::arrow::stl::allocator<uint8_t>(pool_));
+    auto decode_buf = reinterpret_cast<ByteArray*>(decode_values.data());

Review Comment:
   Why not use a `vector<ByteArray>` directly?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3409,6 +3723,16 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
       default:
         throw ParquetException("RLE only supports BOOLEAN");
     }
+  } else if (encoding == Encoding::DELTA_BYTE_ARRAY) {
+    switch (type_num) {
+      case Type::BYTE_ARRAY:
+        return std::make_unique<DeltaByteArrayEncoder<ByteArrayType>>(descr, pool);
+      case Type::FIXED_LEN_BYTE_ARRAY:
+        return std::make_unique<DeltaByteArrayEncoder<FLBAType>>(descr, pool);

Review Comment:
   I'm not sure why you decided to add FLBA support. I would probably keep things simple, though why not?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType, FLBAType> TestDeltaByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));

Review Comment:
   The problem with purely random data is that, statistically, consecutive values share almost no common prefixes.
   I think you want to override `InitData` and generate some more interesting data, for example by having a random generator for prefix and suffix lengths.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType, FLBAType> TestDeltaByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0));
+
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *result);
+  };
+
+  ::arrow::random::RandomArrayGenerator rag(42);
+  auto values = rag.String(0, min_length, max_length, null_probability);

Review Comment:
   Same remark here: pure random generation will not give us very interesting data.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,243 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(src.len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  throw ParquetException("Put not implemented for " + this->descr_->ToString());

Review Comment:
   Why is this necessary? We should never instantiate a `DeltaByteArrayEncoder` for an unsupported DType.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3196,6 +3471,45 @@ class DeltaByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+  using Base::DeltaByteArrayDecoderImpl;
+
+  int Decode(ByteArray* buffer, int max_values) override {
+    return GetInternal(buffer, max_values);
+  }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+                                  virtual public FLBADecoder {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+  using Base::DeltaByteArrayDecoderImpl;
+  using Base::pool_;
+
+  int Decode(ByteArray* buffer, int max_values) {
+    return GetInternal(buffer, max_values);
+  }
+  int Decode(FixedLenByteArray* buffer, int max_values) override {
+    int decoded_values_size = max_values;
+    if (MultiplyWithOverflow(decoded_values_size,
+                             descr_->type_length() * sizeof(ByteArray),

Review Comment:
   What are you trying to compute here exactly?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3196,6 +3471,45 @@ class DeltaByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+  using Base::DeltaByteArrayDecoderImpl;
+
+  int Decode(ByteArray* buffer, int max_values) override {
+    return GetInternal(buffer, max_values);
+  }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+                                  virtual public FLBADecoder {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+  using Base::DeltaByteArrayDecoderImpl;
+  using Base::pool_;
+
+  int Decode(ByteArray* buffer, int max_values) {
+    return GetInternal(buffer, max_values);
+  }
+  int Decode(FixedLenByteArray* buffer, int max_values) override {
+    int decoded_values_size = max_values;
+    if (MultiplyWithOverflow(decoded_values_size,
+                             descr_->type_length() * sizeof(ByteArray),
+                             &decoded_values_size)) {
+      throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY");
+    }
+    ArrowPoolVector<uint8_t> decode_values(decoded_values_size,
+                                           ::arrow::stl::allocator<uint8_t>(pool_));
+    auto decode_buf = reinterpret_cast<ByteArray*>(decode_values.data());
+
+    max_values = GetInternal(decode_buf, max_values);
+    for (int i = 0; i < max_values; i++) {
+      buffer[i].ptr = decode_buf->ptr + i * descr_->type_length();

Review Comment:
   Hmm, how is this supposed to work? `decode_buf` is an array of `ByteArray` structures, not some string data. `decode_buf->ptr` is equal to `decode_buf[0].ptr`. Also this assumes that all strings have the expected length...



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());

Review Comment:
   What is false? Please add the parameter name.
   ```suggestion
           MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, /*use_dictionary=*/false, descr_.get());
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3037,12 +3083,243 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
-class DeltaByteArrayDecoder : public DecoderImpl,
-                              virtual public TypedDecoder<ByteArrayType> {
+/// Delta Byte Array encoding also known as incremental encoding or front compression:
+/// for each element in a sequence of strings, store the prefix length of the previous
+/// entry plus the suffix.
+///
+/// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+/// followed by the suffixes encoded as delta length byte arrays
+/// (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr,
                                  MemoryPool* pool = ::arrow::default_memory_pool())
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool),
+        last_value_(""),
+        kEmpty(ByteArray(0, reinterpret_cast<const uint8_t*>(""))) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+ protected:
+  template <typename ArrayType>
+  void PutBinaryArray(const ArrayType& array) {
+    auto previous_len = static_cast<uint32_t>(last_value_.size());
+    std::string_view last_value_view = last_value_;
+
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
+        *array.data(),
+        [&](::std::string_view view) {
+          if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) {
+            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
+          }
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray src{view};
+
+          uint32_t j = 0;
+          const uint32_t common_length = std::min(previous_len, src.len);
+          while (j < common_length) {
+            if (last_value_view[j] != view[j]) {
+              break;
+            }
+            j++;
+          }
+          previous_len = src.len;
+          prefix_length_encoder_.Put({static_cast<int32_t>(j)}, 1);
+
+          last_value_view = view;
+          const auto suffix_length = static_cast<uint32_t>(src.len - j);
+          if (suffix_length == 0) {
+            suffix_encoder_.Put(&kEmpty, 1);
+            return Status::OK();
+          }
+          const uint8_t* suffix_ptr = src.ptr + j;
+          // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+          const ByteArray suffix(suffix_length, suffix_ptr);
+          suffix_encoder_.Put(&suffix, 1);
+
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+    last_value_ = last_value_view;
+  }
+
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  std::string last_value_;
+  const ByteArray kEmpty;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  throw ParquetException("Put not implemented for " + this->descr_->ToString());
+}
+
+template <>
+void DeltaByteArrayEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values,
+                                          ::arrow::stl::allocator<int32_t>(pool_));
+  std::string_view last_value_view = last_value_;
+
+  for (int i = 0; i < num_values; i++) {
+    // Convert to ByteArray, so we can pass to the suffix_encoder_.
+    const ByteArray value = src[i];
+    if (ARROW_PREDICT_FALSE(value.len >= static_cast<int32_t>(kMaxByteArraySize))) {
+      throw ParquetException("Parquet cannot store strings with size 2GB or more");
+    }
+
+    auto view = string_view{reinterpret_cast<const char*>(value.ptr),
+                            static_cast<uint32_t>(value.len)};
+    uint32_t j = 0;
+    const uint32_t common_length =
+        std::min(value.len, static_cast<uint32_t>(last_value_view.length()));
+    while (j < common_length) {
+      if (last_value_view[j] != view[j]) {
+        break;
+      }
+      j++;
+    }
+
+    last_value_view = view;
+    prefix_lengths[i] = j;
+    const auto suffix_length = static_cast<uint32_t>(value.len - j);
+
+    if (suffix_length == 0) {
+      suffix_encoder_.Put(&kEmpty, 1);
+      continue;
+    }
+    const uint8_t* suffix_ptr = value.ptr + j;
+    // Convert to ByteArray, so it can be passed to the suffix_encoder_.
+    const ByteArray suffix(suffix_length, suffix_ptr);
+    suffix_encoder_.Put(&suffix, 1);
+  }
+  prefix_length_encoder_.Put(prefix_lengths.data(), num_values);
+  last_value_ = last_value_view;
+}
+
+template <>
+void DeltaByteArrayEncoder<FLBAType>::Put(const FLBA* src, int num_values) {

Review Comment:
   The duplication here should be quite easily avoided...



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3196,6 +3471,45 @@ class DeltaByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+class DeltaByteArrayDecoder : public DeltaByteArrayDecoderImpl<ByteArrayType> {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<ByteArrayType>;
+  using Base::DeltaByteArrayDecoderImpl;
+
+  int Decode(ByteArray* buffer, int max_values) override {
+    return GetInternal(buffer, max_values);
+  }
+};
+
+class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
+                                  virtual public FLBADecoder {
+ public:
+  using Base = DeltaByteArrayDecoderImpl<FLBAType>;
+  using Base::DeltaByteArrayDecoderImpl;
+  using Base::pool_;
+
+  int Decode(ByteArray* buffer, int max_values) {

Review Comment:
   It seems unexpected to decode to `ByteArray*` if the Parquet type is `FLBAType`. Is this necessary?



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType, FLBAType> TestDeltaByteArrayEncodingTypes;

Review Comment:
   Style nit :-)
   ```suggestion
   using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
   ```



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType, FLBAType> TestDeltaByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0));
+
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());

Review Comment:
   Hmm, I'm not sure what this is and `upcast_result` isn't used below anyway.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto decoder = MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+typedef ::testing::Types<ByteArrayType, FLBAType> TestDeltaByteArrayEncodingTypes;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 1, /*valid_bits_offset*/ 64, /*null_prob*/ 0));
+
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(
+      /*nvalues*/ 1234, /*repeats*/ 10, /*valid_bits_offset*/ 64,
+      /*null_probability*/ 0.1));
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *result);
+  };
+
+  ::arrow::random::RandomArrayGenerator rag(42);
+  auto values = rag.String(0, min_length, max_length, null_probability);
+  CheckSeed(values);
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    rag = ::arrow::random::RandomArrayGenerator(seed);
+
+    values = rag.String(size, min_length, max_length, null_probability);
+    CheckSeed(values);
+
+    values =
+        rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability);
+    CheckSeed(values);
+  }
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPutFixedLength) {
+  const int64_t size = 50;
+  const double null_probability = 0.25;
+  ::arrow::random::RandomArrayGenerator rag(0);
+  auto encoder = MakeTypedEncoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY);
+
+  auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<FLBAType>::Accumulator acc(values->type());
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+    ::arrow::AssertArraysEqual(*values, *result);
+  };
+
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    for (auto length : {0, 10, 100, 1000}) {
+      rag = ::arrow::random::RandomArrayGenerator(seed);
+      auto values = rag.FixedSizeBinary(size, length, null_probability);
+      CheckSeed(values);
+    }
+  }
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
+  auto CheckEncode = [](std::shared_ptr<::arrow::Array> values,
+                        std::shared_ptr<::arrow::Array> prefix_lengths,
+                        std::shared_ptr<::arrow::Array> suffix_lengths,
+                        std::string_view value) {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    auto prefix_lengths_encoder =
+        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(prefix_lengths_encoder->Put(*prefix_lengths));
+    auto prefix_lengths_buf = prefix_lengths_encoder->FlushValues();
+
+    auto encoded_prefix_lengths_buf = SliceBuffer(buf, 0, prefix_lengths_buf->size());
+
+    auto suffix_lengths_encoder =
+        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(suffix_lengths_encoder->Put(*suffix_lengths));
+    auto suffix_lengths_buf = suffix_lengths_encoder->FlushValues();
+    auto encoded_values_buf =
+        SliceBuffer(buf, prefix_lengths_buf->size() + suffix_lengths_buf->size());
+
+    auto encoded_prefix_length_buf = SliceBuffer(buf, 0, prefix_lengths_buf->size());
+    EXPECT_TRUE(prefix_lengths_buf->Equals(*encoded_prefix_length_buf));
+    auto encoded_suffix_length_buf =
+        SliceBuffer(buf, prefix_lengths_buf->size(), suffix_lengths_buf->size());
+    EXPECT_TRUE(suffix_lengths_buf->Equals(*encoded_suffix_length_buf));
+    EXPECT_EQ(value, encoded_values_buf->ToString());
+  };
+
+  auto arrayToI32 = [](const std::shared_ptr<::arrow::Array>& lengths) {
+    std::vector<int32_t> arrays;
+    auto data_ptr = checked_cast<::arrow::Int32Array*>(lengths.get());
+    for (int i = 0; i < lengths->length(); ++i) {
+      arrays.push_back(data_ptr->GetView(i));
+    }
+    return arrays;
+  };
+
+  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
+                        std::shared_ptr<::arrow::Array> values) {
+    int num_values = static_cast<int>(values->length());
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(num_values, result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  auto CheckEncodeDecode = [&](std::string_view values,
+                               std::shared_ptr<::arrow::Array> prefix_lengths,
+                               std::shared_ptr<::arrow::Array> suffix_lengths,
+                               std::string_view suffix_data) {
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), prefix_lengths,
+                suffix_lengths, suffix_data);

Review Comment:
   I think `CheckEncode` can be simplified if you make it take the `encoded` buffer computed below and just compare it with the actual encoded results.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+// Round-trip fixture for Encoding::DELTA_BYTE_ARRAY: the drawn values are pushed
+// through an encoder created for that encoding and read back with the matching
+// decoder, then compared against the inputs.
+// NOTE(review): descr_, draws_, num_values_, encode_buffer_ and decode_buf_ are
+// presumably pulled in from TestEncodingBase<Type> by USING_BASE_MEMBERS() — confirm.
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  // Dense case: Put() all values, Decode() them and verify the results.
+  void CheckRoundtrip() override {
+    auto delta_encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto delta_decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    delta_encoder->Put(draws_, num_values_);
+    encode_buffer_ = delta_encoder->FlushValues();
+
+    const auto encoded_size = static_cast<int>(encode_buffer_->size());
+    delta_decoder->SetData(num_values_, encode_buffer_->data(), encoded_size);
+
+    const int decoded_count = delta_decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, decoded_count);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  // Spaced case: PutSpaced() with a validity bitmap, DecodeSpaced() with the same
+  // bitmap and verify the results.
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto delta_encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto delta_decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    // Count the slots whose validity bit is cleared; the decoder is told to expect
+    // only the remaining (non-null) values.
+    int null_count = 0;
+    for (int pos = 0; pos < num_values_; ++pos) {
+      const bool is_valid = bit_util::GetBit(valid_bits, valid_bits_offset + pos);
+      null_count += is_valid ? 0 : 1;
+    }
+
+    delta_encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = delta_encoder->FlushValues();
+
+    const auto encoded_size = static_cast<int>(encode_buffer_->size());
+    delta_decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                           encoded_size);
+
+    const auto decoded_count = delta_decoder->DecodeSpaced(
+        decode_buf_, num_values_, null_count, valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, decoded_count);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+// Type list the DELTA_BYTE_ARRAY typed test suite is instantiated for.
+using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+// Runs the fixture's dense (Execute) and spaced (ExecuteSpaced) round trips with a
+// few sizes; any fatal failure in a helper aborts the test immediately.
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  // Dense round trips.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+
+  // Spaced round trip with a null probability of zero.
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(/*nvalues*/ 1234, /*repeats*/ 1,
+                                              /*valid_bits_offset*/ 64,
+                                              /*null_prob*/ 0));
+
+  // Larger dense round trip, then a spaced one with a 0.1 null probability.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(/*nvalues*/ 1234, /*repeats*/ 10,
+                                              /*valid_bits_offset*/ 64,
+                                              /*null_probability*/ 0.1));
+}
+
+// Round-trips randomly generated Arrow string/binary arrays (with nulls) through the
+// DELTA_BYTE_ARRAY encoder's Put(const ::arrow::Array&) and the decoder's
+// DecodeArrow(), and checks that the decoded array equals the input.
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  // The encoder and decoder are deliberately shared by all CheckSeed invocations.
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+
+  // Encodes `values`, decodes them into a builder of the matching kind and compares
+  // the result with the input. Taken by const reference to avoid a refcount bump.
+  auto CheckSeed = [&](const std::shared_ptr<::arrow::Array>& values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    // Only non-null values are stored in the encoded buffer.
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    // Compare against the result cast to the input type (previously the cast result
+    // was computed but left unused).
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  // A fatal assertion inside the lambda only returns from the lambda, so each call
+  // is wrapped to stop the test instead of continuing with a shared encoder/decoder
+  // left in an unknown state.
+  ::arrow::random::RandomArrayGenerator rag(42);
+  auto values = rag.String(0, min_length, max_length, null_probability);
+  ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    rag = ::arrow::random::RandomArrayGenerator(seed);
+
+    values = rag.String(size, min_length, max_length, null_probability);
+    ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+
+    values =
+        rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability);
+    ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+  }
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPutFixedLength) {
+  const int64_t size = 50;

Review Comment:
   You probably want to reduce code duplication between this and the previous test.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1955,5 +1955,293 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) {
   CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
 }
 
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY encode/decode tests.
+
+// Round-trip fixture for Encoding::DELTA_BYTE_ARRAY: the drawn values are pushed
+// through an encoder created for that encoding and read back with the matching
+// decoder, then compared against the inputs.
+// NOTE(review): descr_, draws_, num_values_, encode_buffer_ and decode_buf_ are
+// presumably pulled in from TestEncodingBase<Type> by USING_BASE_MEMBERS() — confirm.
+template <typename Type>
+class TestDeltaByteArrayEncoding : public TestEncodingBase<Type> {
+ public:
+  using c_type = typename Type::c_type;
+  static constexpr int TYPE = Type::type_num;
+
+  // Dense case: Put() all values, Decode() them and verify the results.
+  void CheckRoundtrip() override {
+    auto delta_encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto delta_decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    delta_encoder->Put(draws_, num_values_);
+    encode_buffer_ = delta_encoder->FlushValues();
+
+    const auto encoded_size = static_cast<int>(encode_buffer_->size());
+    delta_decoder->SetData(num_values_, encode_buffer_->data(), encoded_size);
+
+    const int decoded_count = delta_decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, decoded_count);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  // Spaced case: PutSpaced() with a validity bitmap, DecodeSpaced() with the same
+  // bitmap and verify the results.
+  void CheckRoundtripSpaced(const uint8_t* valid_bits,
+                            int64_t valid_bits_offset) override {
+    auto delta_encoder =
+        MakeTypedEncoder<Type>(Encoding::DELTA_BYTE_ARRAY, false, descr_.get());
+    auto delta_decoder =
+        MakeTypedDecoder<Type>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
+
+    // Count the slots whose validity bit is cleared; the decoder is told to expect
+    // only the remaining (non-null) values.
+    int null_count = 0;
+    for (int pos = 0; pos < num_values_; ++pos) {
+      const bool is_valid = bit_util::GetBit(valid_bits, valid_bits_offset + pos);
+      null_count += is_valid ? 0 : 1;
+    }
+
+    delta_encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = delta_encoder->FlushValues();
+
+    const auto encoded_size = static_cast<int>(encode_buffer_->size());
+    delta_decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                           encoded_size);
+
+    const auto decoded_count = delta_decoder->DecodeSpaced(
+        decode_buf_, num_values_, null_count, valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, decoded_count);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+// Type list the DELTA_BYTE_ARRAY typed test suite is instantiated for.
+using TestDeltaByteArrayEncodingTypes = ::testing::Types<ByteArrayType, FLBAType>;
+TYPED_TEST_SUITE(TestDeltaByteArrayEncoding, TestDeltaByteArrayEncodingTypes);
+
+// Runs the fixture's dense (Execute) and spaced (ExecuteSpaced) round trips with a
+// few sizes; any fatal failure in a helper aborts the test immediately.
+TYPED_TEST(TestDeltaByteArrayEncoding, BasicRoundTrip) {
+  // Dense round trips.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(250, 2));
+
+  // Spaced round trip with a null probability of zero.
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(/*nvalues*/ 1234, /*repeats*/ 1,
+                                              /*valid_bits_offset*/ 64,
+                                              /*null_prob*/ 0));
+
+  // Larger dense round trip, then a spaced one with a 0.1 null probability.
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(/*nvalues*/ 1234, /*repeats*/ 10,
+                                              /*valid_bits_offset*/ 64,
+                                              /*null_probability*/ 0.1));
+}
+
+// Round-trips randomly generated Arrow string/binary arrays (with nulls) through the
+// DELTA_BYTE_ARRAY encoder's Put(const ::arrow::Array&) and the decoder's
+// DecodeArrow(), and checks that the decoded array equals the input.
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPut) {
+  const int64_t size = 50;
+  const int32_t min_length = 0;
+  const int32_t max_length = 10;
+  const int32_t num_unique = 10;
+  const double null_probability = 0.25;
+  // The encoder and decoder are deliberately shared by all CheckSeed invocations.
+  auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+
+  // Encodes `values`, decodes them into a builder of the matching kind and compares
+  // the result with the input. Taken by const reference to avoid a refcount bump.
+  auto CheckSeed = [&](const std::shared_ptr<::arrow::Array>& values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    // Only non-null values are stored in the encoded buffer.
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    // Compare against the result cast to the input type (previously the cast result
+    // was computed but left unused).
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  // A fatal assertion inside the lambda only returns from the lambda, so each call
+  // is wrapped to stop the test instead of continuing with a shared encoder/decoder
+  // left in an unknown state.
+  ::arrow::random::RandomArrayGenerator rag(42);
+  auto values = rag.String(0, min_length, max_length, null_probability);
+  ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    rag = ::arrow::random::RandomArrayGenerator(seed);
+
+    values = rag.String(size, min_length, max_length, null_probability);
+    ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+
+    values =
+        rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability);
+    ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+  }
+}
+
+// Round-trips randomly generated fixed-size binary arrays (with nulls) through the
+// DELTA_BYTE_ARRAY encoder's Put(const ::arrow::Array&) and the decoder's
+// DecodeArrow(), and checks that the decoded array equals the input.
+TEST(DeltaByteArrayEncodingAdHoc, ArrowBinaryDirectPutFixedLength) {
+  const int64_t size = 50;
+  const double null_probability = 0.25;
+  // The encoder and decoder are deliberately shared by all CheckSeed invocations.
+  auto encoder = MakeTypedEncoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY);
+  auto decoder = MakeTypedDecoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY);
+
+  // Encodes `values`, decodes them into an accumulator built for the same type and
+  // compares the result with the input.
+  auto CheckSeed = [&](const std::shared_ptr<::arrow::Array>& values) {
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    // Only non-null values are stored in the encoded buffer.
+    int num_values = static_cast<int>(values->length() - values->null_count());
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<FLBAType>::Accumulator acc(values->type());
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.Finish(&result));
+    ASSERT_EQ(values->length(), result->length());
+    ASSERT_OK(result->ValidateFull());
+    ::arrow::AssertArraysEqual(*values, *result);
+  };
+
+  for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
+    for (auto length : {0, 10, 100, 1000}) {
+      // A fresh generator per (seed, length) pair, exactly as before; constructing
+      // it here avoids a placeholder generator that was immediately overwritten.
+      ::arrow::random::RandomArrayGenerator rag(seed);
+      auto values = rag.FixedSizeBinary(size, length, null_probability);
+      // A fatal assertion inside the lambda only returns from the lambda, so stop
+      // the test here instead of reusing the shared encoder/decoder afterwards.
+      ASSERT_NO_FATAL_FAILURE(CheckSeed(values));
+    }
+  }
+}
+
+TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
+  auto CheckEncode = [](std::shared_ptr<::arrow::Array> values,
+                        std::shared_ptr<::arrow::Array> prefix_lengths,
+                        std::shared_ptr<::arrow::Array> suffix_lengths,
+                        std::string_view value) {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    ASSERT_NO_THROW(encoder->Put(*values));
+    auto buf = encoder->FlushValues();
+
+    auto prefix_lengths_encoder =
+        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(prefix_lengths_encoder->Put(*prefix_lengths));
+    auto prefix_lengths_buf = prefix_lengths_encoder->FlushValues();
+
+    auto encoded_prefix_lengths_buf = SliceBuffer(buf, 0, prefix_lengths_buf->size());
+
+    auto suffix_lengths_encoder =
+        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
+    ASSERT_NO_THROW(suffix_lengths_encoder->Put(*suffix_lengths));
+    auto suffix_lengths_buf = suffix_lengths_encoder->FlushValues();
+    auto encoded_values_buf =
+        SliceBuffer(buf, prefix_lengths_buf->size() + suffix_lengths_buf->size());
+
+    auto encoded_prefix_length_buf = SliceBuffer(buf, 0, prefix_lengths_buf->size());
+    EXPECT_TRUE(prefix_lengths_buf->Equals(*encoded_prefix_length_buf));
+    auto encoded_suffix_length_buf =
+        SliceBuffer(buf, prefix_lengths_buf->size(), suffix_lengths_buf->size());
+    EXPECT_TRUE(suffix_lengths_buf->Equals(*encoded_suffix_length_buf));
+    EXPECT_EQ(value, encoded_values_buf->ToString());
+  };
+
+  auto arrayToI32 = [](const std::shared_ptr<::arrow::Array>& lengths) {
+    std::vector<int32_t> arrays;
+    auto data_ptr = checked_cast<::arrow::Int32Array*>(lengths.get());
+    for (int i = 0; i < lengths->length(); ++i) {
+      arrays.push_back(data_ptr->GetView(i));
+    }
+    return arrays;
+  };
+
+  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
+                        std::shared_ptr<::arrow::Array> values) {
+    int num_values = static_cast<int>(values->length());
+    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
+    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
+
+    typename EncodingTraits<ByteArrayType>::Accumulator acc;
+    if (::arrow::is_string(values->type()->id())) {
+      acc.builder = std::make_unique<::arrow::StringBuilder>();
+    } else {
+      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
+    }
+
+    ASSERT_EQ(num_values,
+              decoder->DecodeArrow(static_cast<int>(values->length()),
+                                   static_cast<int>(values->null_count()),
+                                   values->null_bitmap_data(), values->offset(), &acc));
+
+    std::shared_ptr<::arrow::Array> result;
+    ASSERT_OK(acc.builder->Finish(&result));
+    ASSERT_EQ(num_values, result->length());
+    ASSERT_OK(result->ValidateFull());
+
+    auto upcast_result = CastBinaryTypesHelper(result, values->type());
+    ::arrow::AssertArraysEqual(*values, *upcast_result);
+  };
+
+  auto CheckEncodeDecode = [&](std::string_view values,
+                               std::shared_ptr<::arrow::Array> prefix_lengths,
+                               std::shared_ptr<::arrow::Array> suffix_lengths,
+                               std::string_view suffix_data) {
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::utf8(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+    CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), prefix_lengths,
+                suffix_lengths, suffix_data);
+
+    auto encoded = ::arrow::ConcatenateBuffers({DeltaEncode(arrayToI32(prefix_lengths)),
+                                                DeltaEncode(arrayToI32(suffix_lengths)),
+                                                std::make_shared<Buffer>(suffix_data)})
+                       .ValueOrDie();
+
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values));
+    CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values));
+  };
+
+  {
+    auto values = R"(["axis", "axle", "babble", "babyhood"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 2, 0, 3])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 2, 6, 5])");
+
+    constexpr std::string_view suffix_data = "axislebabbleyhood";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["axis", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 0, 0, 0])");
+
+    constexpr std::string_view suffix_data = "axis";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["axisba", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 0, 0, 0])");
+
+    constexpr std::string_view suffix_data = "axisba";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["baaxis", "axis", "axis", "axis"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 0, 4, 4])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 4, 0, 0])");
+
+    constexpr std::string_view suffix_data = "baaxisaxis";
+    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
+  }
+
+  {
+    auto values = R"(["καλημέρα", "καμηλιέρη", "καμηλιέρη", "καλημέρα"])";
+    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 5, 18, 5])");
+    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([16, 13, 0, 11])");
+    const std::string suffix_data = "καλημέρα\xbcηλιέρη\xbbημέρα";

Review Comment:
   Interesting. The `\x` escapes are there because a UTF-8 character is split between the prefix and the suffix, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org