You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/12/09 16:22:58 UTC
(arrow) branch main updated: GH-39122: [C++][Parquet] Optimize FLBA record reader (#39124)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 20c975d03f GH-39122: [C++][Parquet] Optimize FLBA record reader (#39124)
20c975d03f is described below
commit 20c975d03f8db85a0a3adea2e384b2291fb56da3
Author: Antoine Pitrou <pi...@free.fr>
AuthorDate: Sat Dec 9 17:22:51 2023 +0100
GH-39122: [C++][Parquet] Optimize FLBA record reader (#39124)
### Rationale for this change
The FLBA implementation of RecordReader is suboptimal:
* it doesn't preallocate the output array
* it reads the decoded validity bitmap one bit at a time and rebuilds the output validity bitmap one bit at a time as well
### What changes are included in this PR?
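Optimize the FLBA implementation of RecordReader to avoid the inefficiencies listed above: space for the output data and validity bitmap is now reserved up front for each batch, and the decoded validity bitmap is appended in bulk instead of bit by bit.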
I did a quick-and-dirty benchmark on a Parquet file with two columns:
* column 1: uncompressed, PLAIN-encoded, FLBA<3> with no nulls
* column 2: uncompressed, PLAIN-encoded, FLBA<3> with 25% nulls
On git main, the file is read at 465 MB/s; with this PR, it is read at 700 MB/s (roughly 1.5x faster).
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
* Closes: #39122
Lead-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: Antoine Pitrou <pi...@free.fr>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/parquet/column_reader.cc | 70 ++++++++++++++++++++++++++++------------
1 file changed, 50 insertions(+), 20 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index ecc48811e4..a49e58afbd 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -30,6 +30,7 @@
#include <vector>
#include "arrow/array.h"
+#include "arrow/array/array_binary.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_primitive.h"
@@ -2040,23 +2041,29 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
LevelInfo leaf_info_;
};
-class FLBARecordReader : public TypedRecordReader<FLBAType>,
- virtual public BinaryRecordReader {
+class FLBARecordReader final : public TypedRecordReader<FLBAType>,
+ virtual public BinaryRecordReader {
public:
FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
: TypedRecordReader<FLBAType>(descr, leaf_info, pool, read_dense_for_nullable),
- builder_(nullptr) {
+ byte_width_(descr_->type_length()),
+ empty_(byte_width_, 0),
+ type_(::arrow::fixed_size_binary(byte_width_)),
+ null_bitmap_builder_(pool),
+ data_builder_(pool) {
ARROW_DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
- int byte_width = descr_->type_length();
- std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
- builder_ = std::make_unique<::arrow::FixedSizeBinaryBuilder>(type, this->pool_);
}
::arrow::ArrayVector GetBuilderChunks() override {
- std::shared_ptr<::arrow::Array> chunk;
- PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
- return ::arrow::ArrayVector({chunk});
+ const int64_t null_count = null_bitmap_builder_.false_count();
+ const int64_t length = null_bitmap_builder_.length();
+ ARROW_DCHECK_EQ(length * byte_width_, data_builder_.length());
+ PARQUET_ASSIGN_OR_THROW(auto data_buffer, data_builder_.Finish());
+ PARQUET_ASSIGN_OR_THROW(auto null_bitmap, null_bitmap_builder_.Finish());
+ auto chunk = std::make_shared<::arrow::FixedSizeBinaryArray>(
+ type_, length, data_buffer, null_bitmap, null_count);
+ return ::arrow::ArrayVector({std::move(chunk)});
}
void ReadValuesDense(int64_t values_to_read) override {
@@ -2065,9 +2072,9 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
this->current_decoder_->Decode(values, static_cast<int>(values_to_read));
CheckNumberDecoded(num_decoded, values_to_read);
- for (int64_t i = 0; i < num_decoded; i++) {
- PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
- }
+ PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
+ PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
+ UnsafeAppendDense(values, num_decoded);
ResetValues();
}
@@ -2081,22 +2088,45 @@ class FLBARecordReader : public TypedRecordReader<FLBAType>,
valid_bits, valid_bits_offset);
ARROW_DCHECK_EQ(num_decoded, values_to_read);
+ PARQUET_THROW_NOT_OK(null_bitmap_builder_.Reserve(num_decoded));
+ PARQUET_THROW_NOT_OK(data_builder_.Reserve(num_decoded * byte_width_));
+ if (null_count == 0) {
+ UnsafeAppendDense(values, num_decoded);
+ } else {
+ UnsafeAppendSpaced(values, num_decoded, valid_bits, valid_bits_offset);
+ }
+ ResetValues();
+ }
+
+ void UnsafeAppendDense(const FLBA* values, int64_t num_decoded) {
+ null_bitmap_builder_.UnsafeAppend(num_decoded, /*value=*/true);
+ for (int64_t i = 0; i < num_decoded; i++) {
+ data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
+ }
+ }
+
+ void UnsafeAppendSpaced(const FLBA* values, int64_t num_decoded,
+ const uint8_t* valid_bits, int64_t valid_bits_offset) {
+ null_bitmap_builder_.UnsafeAppend(valid_bits, valid_bits_offset, num_decoded);
for (int64_t i = 0; i < num_decoded; i++) {
if (::arrow::bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
- PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+ data_builder_.UnsafeAppend(values[i].ptr, byte_width_);
} else {
- PARQUET_THROW_NOT_OK(builder_->AppendNull());
+ data_builder_.UnsafeAppend(empty_.data(), byte_width_);
}
}
- ResetValues();
}
private:
- std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
+ const int byte_width_;
+ const std::vector<uint8_t> empty_;
+ std::shared_ptr<::arrow::DataType> type_;
+ ::arrow::TypedBufferBuilder<bool> null_bitmap_builder_;
+ ::arrow::BufferBuilder data_builder_;
};
-class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
- virtual public BinaryRecordReader {
+class ByteArrayChunkedRecordReader final : public TypedRecordReader<ByteArrayType>,
+ virtual public BinaryRecordReader {
public:
ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)
@@ -2137,8 +2167,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};
-class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
- virtual public DictionaryRecordReader {
+class ByteArrayDictionaryRecordReader final : public TypedRecordReader<ByteArrayType>,
+ virtual public DictionaryRecordReader {
public:
ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool, bool read_dense_for_nullable)