You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mapleFU (via GitHub)" <gi...@apache.org> on 2023/03/01 16:21:20 UTC

[GitHub] [arrow] mapleFU commented on a diff in pull request #14341: GH-32863: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to Parquet writer

mapleFU commented on code in PR #14341:
URL: https://github.com/apache/arrow/pull/14341#discussion_r1121996557


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2900,6 +2900,124 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
+// This is also known as incremental encoding or front compression: for each element in a
+// sequence of strings, store the prefix length of the previous entry plus the suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override {
+    AssertBaseBinary(values);
+    const auto& data = values.data();

Review Comment:
   It seems this could use the same approach that the `DeltaLength` encoder uses.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2900,6 +2900,124 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
 // ----------------------------------------------------------------------
 // DELTA_BYTE_ARRAY
 
+// This is also known as incremental encoding or front compression: for each element in a
+// sequence of strings, store the prefix length of the previous entry plus the suffix.
+//
+// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED),
+// followed by the suffixes encoded as delta length byte arrays (DELTA_LENGTH_BYTE_ARRAY).
+
+// ----------------------------------------------------------------------
+// DeltaByteArrayEncoder
+
+template <typename DType>
+class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY,
+                    pool = ::arrow::default_memory_pool()),
+        sink_(pool),
+        prefix_length_encoder_(nullptr, pool),
+        suffix_encoder_(nullptr, pool) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return prefix_length_encoder_.EstimatedDataEncodedSize() +
+           suffix_encoder_.EstimatedDataEncodedSize();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override {
+    AssertBaseBinary(values);
+    const auto& data = values.data();
+    auto src = data->GetValues<T>(1);
+
+    if (values.null_count() == 0) {
+      Put(src, static_cast<int>(values.length()));
+    } else {
+      PutSpaced(src, static_cast<int>(data->length), data->GetValues<uint8_t>(0, 0),
+                data->offset);
+    }
+  }
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  uint32_t total_value_count_{0};
+  ::arrow::BufferBuilder sink_;
+  DeltaBitPackEncoder<Int32Type> prefix_length_encoder_;
+  DeltaLengthByteArrayEncoder<ByteArrayType> suffix_encoder_;
+  string_view last_value_;
+};
+
+template <typename DType>
+void DeltaByteArrayEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+  ArrowPoolVector<int32_t> prefix_lengths(num_values);
+
+  if (ARROW_PREDICT_TRUE(last_value_.empty())) {
+    last_value_ = string_view{reinterpret_cast<const char*>(src[0].ptr), src[0].len};
+    suffix_encoder_.Put(&src[0], 1);
+    prefix_lengths[0] = 0;
+  }
+  total_value_count_ += num_values;
+
+  for (int32_t i = 1; i < num_values; i++) {
+    auto value = string_view{reinterpret_cast<const char*>(src[i].ptr), src[i].len};
+
+    uint j = 0;
+    while (j < std::min(src[i - 1].len, src[i].len)) {
+      if (last_value_[j] != value[j]) {
+        break;
+      }
+      j++;
+    }
+
+    suffix_encoder_.Put(&src[i] + j, 1);

Review Comment:
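   `&src[i] + j` leads to a wrong memory address: it advances the element pointer by `j` whole values instead of skipping the first `j` bytes of `src[i]`. It seems that we should do: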
   
   ```
       ByteArray ba;
       ba.len = src[i].len - j;
       ba.ptr = src[i].ptr + j;
       suffix_encoder_.Put(&ba, 1);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org