You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/03/12 14:57:16 UTC

[GitHub] [arrow] wgtmac commented on a diff in pull request #34526: GH-15107: [C++][Parquet] Parquet Encoder: Support RLE for Boolean

wgtmac commented on code in PR #34526:
URL: https://github.com/apache/arrow/pull/34526#discussion_r1133111129


##########
cpp/src/parquet/encoding.cc:
##########
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstdlib>
+#include <iostream>

Review Comment:
   Should this `#include <iostream>` be removed?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,120 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override;
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException("RleBooleanEncoder Expected BoolTArray, got ",

Review Comment:
   ```suggestion
         throw ParquetException("RleBooleanEncoder expects BooleanArray, got ",
   ```



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,120 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override;
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;

Review Comment:
   Add one blank line below?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,120 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override;
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException("RleBooleanEncoder Expected BoolTArray, got ",
+                             values.type()->ToString());
+    }
+    const auto& boolean_array = checked_cast<const ::arrow::BooleanArray&>(values);
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
+        *boolean_array.data(),
+        [&](bool value) {
+          buffered_append_values_.push_back(value);
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+  }
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  void Put(const std::vector<bool>& src, int num_values) override;
+
+ protected:
+  template <typename ArrowType>
+  void PutImpl(const ::arrow::Array& values) {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
+                             " from " + values.type()->ToString() + " not supported");
+    }
+    const auto& data = *values.data();
+    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
+              static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+
+  template <typename SequenceType>
+  void PutImpl(const SequenceType& src, int num_values);
+
+  int MaxRleBufferSize() const noexcept {
+    // TODO(mwish): Encapsulate these rules.
+    return 1 +
+           ::arrow::util::RleEncoder::MaxBufferSize(
+               kBitWidth,
+               static_cast<int>(static_cast<int>(buffered_append_values_.size()))) +
+           ::arrow::util::RleEncoder::MinBufferSize(kBitWidth);
+  }
+
+  constexpr static int32_t kBitWidth = 1;
+  /// 4 bytes in little-endian, which indicates the length.
+  constexpr static int32_t kRleLengthInBytes = 4;
+
+  // std::vector<bool> in C++ is tricky, because it's a bitmap.
+  // Here RleBooleanEncoder will only append values into it, and
+  // dump values into Buffer, so using it here is ok.
+  std::vector<bool> buffered_append_values_;
+};
+
+void RleBooleanEncoder::Put(const bool* src, int num_values) { PutImpl(src, num_values); }

Review Comment:
   Should we support a `const uint8_t*` variant? The decoder has that support.



##########
cpp/src/parquet/encoding_test.cc:
##########
@@ -1576,6 +1576,60 @@ TYPED_TEST(TestDeltaBitPackEncoding, NonZeroPaddedMiniblockBitWidth) {
   }
 }
 
+// ----------------------------------------------------------------------
+// Rle for Boolean encode/decode tests.
+
+class TestRleBooleanEncoding : public TestEncodingBase<BooleanType> {
+ public:
+  using c_type = bool;
+  static constexpr int TYPE = Type::BOOLEAN;
+
+  virtual void CheckRoundtrip() {
+    auto encoder = MakeTypedEncoder<BooleanType>(Encoding::RLE,
+                                                 /*use_dictionary=*/false, descr_.get());
+    auto decoder = MakeTypedDecoder<BooleanType>(Encoding::RLE, descr_.get());
+
+    encoder->Put(draws_, num_values_);
+    encode_buffer_ = encoder->FlushValues();
+
+    decoder->SetData(num_values_, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    int values_decoded = decoder->Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<c_type>(decode_buf_, draws_, num_values_));
+  }
+
+  void CheckRoundtripSpaced(const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    auto encoder = MakeTypedEncoder<BooleanType>(Encoding::RLE,
+                                                 /*use_dictionary=*/false, descr_.get());
+    auto decoder = MakeTypedDecoder<BooleanType>(Encoding::RLE, descr_.get());
+    int null_count = 0;
+    for (auto i = 0; i < num_values_; i++) {
+      if (!bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
+        null_count++;
+      }
+    }
+
+    encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset);
+    encode_buffer_ = encoder->FlushValues();
+    decoder->SetData(num_values_ - null_count, encode_buffer_->data(),
+                     static_cast<int>(encode_buffer_->size()));
+    auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count,
+                                                valid_bits, valid_bits_offset);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResultsSpaced<c_type>(decode_buf_, draws_, num_values_,
+                                                        valid_bits, valid_bits_offset));
+  }
+};
+
+TEST_F(TestRleBooleanEncoding, BasicRoundTrip) {
+  ASSERT_NO_FATAL_FAILURE(this->Execute(0, 0));
+  ASSERT_NO_FATAL_FAILURE(this->Execute(2000, 200));
+  ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(

Review Comment:
   Add a test case that covers an Arrow array as input?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,120 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override;
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException("RleBooleanEncoder Expected BoolTArray, got ",
+                             values.type()->ToString());
+    }
+    const auto& boolean_array = checked_cast<const ::arrow::BooleanArray&>(values);
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
+        *boolean_array.data(),
+        [&](bool value) {
+          buffered_append_values_.push_back(value);
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+  }
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  void Put(const std::vector<bool>& src, int num_values) override;
+
+ protected:
+  template <typename ArrowType>
+  void PutImpl(const ::arrow::Array& values) {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
+                             " from " + values.type()->ToString() + " not supported");
+    }
+    const auto& data = *values.data();
+    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
+              static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset);
+  }
+
+  template <typename SequenceType>
+  void PutImpl(const SequenceType& src, int num_values);
+
+  int MaxRleBufferSize() const noexcept {
+    // TODO(mwish): Encapsulate these rules.
+    return 1 +
+           ::arrow::util::RleEncoder::MaxBufferSize(
+               kBitWidth,
+               static_cast<int>(static_cast<int>(buffered_append_values_.size()))) +
+           ::arrow::util::RleEncoder::MinBufferSize(kBitWidth);
+  }
+
+  constexpr static int32_t kBitWidth = 1;
+  /// 4 bytes in little-endian, which indicates the length.
+  constexpr static int32_t kRleLengthInBytes = 4;
+
+  // std::vector<bool> in C++ is tricky, because it's a bitmap.
+  // Here RleBooleanEncoder will only append values into it, and
+  // dump values into Buffer, so using it here is ok.
+  std::vector<bool> buffered_append_values_;
+};
+
+void RleBooleanEncoder::Put(const bool* src, int num_values) { PutImpl(src, num_values); }
+
+void RleBooleanEncoder::Put(const std::vector<bool>& src, int num_values) {
+  PutImpl(src, num_values);
+}
+
+template <typename SequenceType>
+void RleBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
+  for (int i = 0; i < num_values; ++i) {
+    // TODO(mwish): using iterator to batch insert?
+    buffered_append_values_.push_back(src[i]);
+  }
+}
+
+int64_t RleBooleanEncoder::EstimatedDataEncodedSize() {
+  return kRleLengthInBytes + MaxRleBufferSize();

Review Comment:
   This risks overestimating the encoded size. Could we encode the values in a streaming way instead?



##########
cpp/src/parquet/encoding.cc:
##########
@@ -3265,6 +3380,13 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
       default:
         throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
     }
+  } else if (encoding == Encoding::RLE) {
+    switch (type_num) {
+      case Type::BOOLEAN:
+        return std::make_unique<RleBooleanEncoder>(descr, pool);
+      default:
+        throw ParquetException("RLE only supports BOOL within data page");

Review Comment:
   This error message is not accurate: RLE also supports dictionary indices in the data page. I think `RLE only supports BOOL` is enough here.



##########
cpp/src/parquet/encoding.cc:
##########
@@ -2838,6 +2839,120 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
 
+// ----------------------------------------------------------------------
+// RLE_BOOLEAN_ENCODER
+
+class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder {
+ public:
+  explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::RLE, pool) {}
+
+  int64_t EstimatedDataEncodedSize() override;
+  std::shared_ptr<Buffer> FlushValues() override;
+
+  void Put(const T* buffer, int num_values) override;
+  void Put(const ::arrow::Array& values) override {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException("RleBooleanEncoder Expected BoolTArray, got ",
+                             values.type()->ToString());
+    }
+    const auto& boolean_array = checked_cast<const ::arrow::BooleanArray&>(values);
+    PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<::arrow::BooleanType>(
+        *boolean_array.data(),
+        [&](bool value) {
+          buffered_append_values_.push_back(value);
+          return Status::OK();
+        },
+        []() { return Status::OK(); }));
+  }
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+                 int64_t valid_bits_offset) override {
+    if (valid_bits != NULLPTR) {
+      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
+                                                                   this->memory_pool()));
+      T* data = reinterpret_cast<T*>(buffer->mutable_data());
+      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
+          src, num_values, valid_bits, valid_bits_offset, data);
+      Put(data, num_valid_values);
+    } else {
+      Put(src, num_values);
+    }
+  }
+
+  void Put(const std::vector<bool>& src, int num_values) override;
+
+ protected:
+  template <typename ArrowType>
+  void PutImpl(const ::arrow::Array& values) {
+    if (values.type_id() != ::arrow::Type::BOOL) {
+      throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
+                             " from " + values.type()->ToString() + " not supported");
+    }
+    const auto& data = *values.data();
+    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),

Review Comment:
   Should we check `values.null_count()` and take a fast path when there are no nulls?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org