You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/10 18:48:09 UTC

[GitHub] [arrow] zeroshade commented on a diff in pull request #14191: ARROW-17798: [C++][Parquet] Add DELTA_BINARY_PACKED encoder to Parquet writer

zeroshade commented on code in PR #14191:
URL: https://github.com/apache/arrow/pull/14191#discussion_r991543665


##########
cpp/src/parquet/encoding.cc:
##########
@@ -2062,6 +2064,200 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
   }
 };
 
+// ----------------------------------------------------------------------
+// DeltaBitPackEncoder
+
+template <typename DType>
+class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool),
+        values_per_block_(128),
+        mini_blocks_per_block_(4),
+        values_per_mini_block_(values_per_block_ / mini_blocks_per_block_),
+        values_current_block_(0),
+        total_value_count_(0),
+        first_value_(0),
+        current_value_(0),
+        sink_(pool),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity * 2)),
+        bit_writer_(bits_buffer_->mutable_data(), static_cast<int>(bits_buffer_->size())),
+        deltas_(std::vector<T>(values_per_block_)) {}
+
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  int64_t EstimatedDataEncodedSize() override {
+    return 4 * sizeof(uint64_t) + sink_.length();
+  }
+
+  using TypedEncoder<DType>::Put;
+
+  void Put(const ::arrow::Array& values) override;
+
+  void Put(const T* buffer, int num_values) override;
+
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override;
+
+ protected:
+  const uint32_t values_per_block_;
+  const uint32_t mini_blocks_per_block_;
+  const uint32_t values_per_mini_block_;
+  uint32_t values_current_block_;
+  uint32_t total_value_count_;
+  int64_t first_value_;
+  T current_value_;
+  ::arrow::BufferBuilder sink_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
+  ::arrow::bit_util::BitWriter bit_writer_;
+  std::vector<T> deltas_;
+
+ private:
+  void FlushBlock();
+};
+
+template <typename DType>
+void DeltaBitPackEncoder<DType>::Put(const T* src, int num_values) {
+  if (num_values == 0) {
+    return;
+  }
+
+  int32_t idx = 0;
+  if (total_value_count_ == 0) {
+    first_value_ = src[0];
+    current_value_ = src[0];
+    idx = 1;
+  }
+  total_value_count_ += num_values;
+  PARQUET_THROW_NOT_OK(sink_.Resize(total_value_count_ * sizeof(T), false));
+
+  while (idx < num_values) {
+    T value = src[idx];
+    deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_);
+    current_value_ = value;
+    idx++;
+    values_current_block_++;
+    if (values_current_block_ == values_per_block_) {
+      FlushBlock();
+    }
+  }
+
+  if (values_current_block_ != 0) {
+    FlushBlock();
+  }

Review Comment:
   Wouldn't this potentially be an early flush if `Put` is called multiple times? You'd end up flushing smaller blocks resulting in inefficient encoding. The better solution would be to check `values_current_block_` and flush at the top of `FlushValues` so that you avoid short blocks where possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org