You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/12/09 16:22:58 UTC

(arrow) branch main updated: GH-39122: [C++][Parquet] Optimize FLBA record reader (#39124)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 20c975d03f GH-39122: [C++][Parquet] Optimize FLBA record reader (#39124)
20c975d03f is described below

commit 20c975d03f8db85a0a3adea2e384b2291fb56da3
Author: Antoine Pitrou <pi...@free.fr>
AuthorDate: Sat Dec 9 17:22:51 2023 +0100

    GH-39122: [C++][Parquet] Optimize FLBA record reader (#39124)
    
    ### Rationale for this change
    
    The FLBA implementation of RecordReader is suboptimal:
    * it doesn't preallocate the output array
    * it reads the decoded validity bitmap one bit at a time and recreates it, one bit at a time
    
    ### What changes are included in this PR?
    
    Optimize the FLBA implementation of RecordReader so as to avoid the aforementioned inefficiencies.
    I did a quick-and-dirty benchmark on a Parquet file with two columns:
    * column 1: uncompressed, PLAIN-encoded, FLBA<3> with no nulls
    * column 2: uncompressed, PLAIN-encoded, FLBA<3> with 25% nulls
    
    With git main, the file can be read at 465 MB/s. With this PR, the file can be read at 700 MB/s.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #39122
    
    Lead-authored-by: Antoine Pitrou <an...@python.org>
    Co-authored-by: Antoine Pitrou <pi...@free.fr>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/parquet/column_reader.cc | 70 ++++++++++++++++++++++++++++------------
 1 file changed, 50 insertions(+), 20 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index ecc48811e4..a49e58afbd 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -30,6 +30,7 @@
 #include <vector>
 
 #include "arrow/array.h"
+#include "arrow/array/array_binary.h"
 #include "arrow/array/builder_binary.h"
 #include "arrow/array/builder_dict.h"
 #include "arrow/array/builder_primitive.h"
@@ -2040,23 +2041,29 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
   LevelInfo leaf_info_;
 };
 
-class FLBARecordReader : public TypedRecordReader<FLBAType>,
-                         virtual public BinaryRecordReader {
+class FLBARecordReader final : public TypedRecordReader<FLBAType>,
+                               virtual public BinaryRecordReader {
  public:
   FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
                    ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
       : TypedRecordReader<FLBAType>(descr, leaf_info, pool, read_dense_for_nullable),
-        builder_(nullptr) {
+        byte_width_(descr_->type_length()),
+        empty_(byte_width_, 0),
+        type_(::arrow::fixed_size_binary(byte_width_)),
+        null_bitmap_builder_(pool),
+        data_builder_(pool) {
     ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
-    int byte_width = descr_->type_length();
-    std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
-    builder_ = std::make_unique<::arrow::FixedSizeBinaryBuilder>(type, this->pool_);
   }
 
   ::arrow::ArrayVector GetBuilderChunks() override {
-    std::shared_ptr<::arrow::Array> chunk;
-    PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
-    return ::arrow::ArrayVector({chunk});
+    const int64_t null_count = null_bitmap_builder_.false_count();
+    const int64_t length = null_bitmap_builder_.length();
+    ARROW_DCHECK_EQ(length * byte_width_, data_builder_.length());
+    PARQUET_ASSIGN_OR_THROW(auto data_buffer, data_builder_.Finish());
+    PARQUET_ASSIGN_OR_THROW(auto null_bitmap, null_bitmap_builder_.Finish());
+    auto chunk = std::make_shared<::arrow::FixedSizeBinaryArray>(
+        type_, length, data_buffer, null_bitmap, null_count);
+    return ::arrow::ArrayVector({std::move(chunk)});
   }
 
   void ReadValuesDense(int64_t values_to_read) override {
@@ -2065,9 +2072,9 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
         this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
     CheckNumberDecoded(num_decoded, values_to_read);
 
-    for (int64_t i = 0; i < num_decoded; i++) {
-      PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
-    }
+    PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
+    PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
+    UnsafeAppendDense(values, num_decoded);
     ResetValues();
   }
 
@@ -2081,22 +2088,45 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
         valid_bits, valid_bits_offset);
     ARROW_DCHECK_EQ(num_decoded, values_to_read);
 
+    PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
+    PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
+    if (null_count == 0) {
+      UnsafeAppendDense(values, num_decoded);
+    } else {
+      UnsafeAppendSpaced(values, num_decoded, valid_bits, valid_bits_offset);
+    }
+    ResetValues();
+  }
+
+  void UnsafeAppendDense(const FLBA* values, int64_t num_decoded) {
+    null_bitmap_builder_.UnsafeAppend(num_decoded, /*value=*/true);
+    for (int64_t i = 0; i < num_decoded; i++) {
+      data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
+    }
+  }
+
+  void UnsafeAppendSpaced(const FLBA* values, int64_t num_decoded,
+                          const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    null_bitmap_builder_.UnsafeAppend(valid_bits, valid_bits_offset, num_decoded);
     for (int64_t i = 0; i < num_decoded; i++) {
       if (::arrow::bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
-        PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+        data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
       } else {
-        PARQUET_THROW_NOT_OK(builder_->AppendNull());
+        data_builder_.UnsafeAppend(empty_.data(), byte_width_);
       }
     }
-    ResetValues();
   }
 
  private:
-  std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
+  const int byte_width_;
+  const std::vector<uint8_t> empty_;
+  std::shared_ptr<::arrow::DataType> type_;
+  ::arrow::TypedBufferBuilder<bool> null_bitmap_builder_;
+  ::arrow::BufferBuilder data_builder_;
 };
 
-class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
-                                     virtual public BinaryRecordReader {
+class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayType>,
+                                           virtual public BinaryRecordReader {
  public:
   ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
                                ::arrow::MemoryPool* pool, bool read_dense_for_nullable)
@@ -2137,8 +2167,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };
 
-class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
-                                        virtual public DictionaryRecordReader {
+class ByteArrayDictionaryRecordReader final : public TypedRecordReader<ByteArrayType>,
+                                              virtual public DictionaryRecordReader {
  public:
   ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
                                   ::arrow::MemoryPool* pool, bool read_dense_for_nullable)