You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/09/03 02:40:55 UTC
[arrow] branch master updated: ARROW-6411: [Python][Parquet]
Improve performance of DictEncoder::PutIndices
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ab908cc ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices
ab908cc is described below
commit ab908cc0486d7daf643d1a1418328566f24c403b
Author: Wes McKinney <we...@apache.org>
AuthorDate: Mon Sep 2 21:40:35 2019 -0500
ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices
I don't really understand why this is faster, though.
before
```
----------------------------------------------------------------------------------------
Benchmark Time CPU Iterations
----------------------------------------------------------------------------------------
BM_ArrowBinaryDict/EncodeDictDirectInt8/1048576 7334087 ns 7333876 ns 98 136.354M items/s
BM_ArrowBinaryDict/EncodeDictDirectInt16/1048576 7022430 ns 7022412 ns 100 142.401M items/s
BM_ArrowBinaryDict/EncodeDictDirectInt32/1048576 7061033 ns 7060870 ns 99 141.626M items/s
BM_ArrowBinaryDict/EncodeDictDirectInt64/1048576 7084581 ns 7084398 ns 97 141.155M items/s
```
after
```
----------------------------------------------------------------------------------------
Benchmark Time CPU Iterations
----------------------------------------------------------------------------------------
BM_ArrowBinaryDict/EncodeDictDirectInt8/1048576 4387151 ns 4387175 ns 156 227.937M items/s
BM_ArrowBinaryDict/EncodeDictDirectInt16/1048576 4446167 ns 4446074 ns 159 224.918M items/s
BM_ArrowBinaryDict/EncodeDictDirectInt32/1048576 4501028 ns 4500934 ns 156 222.176M items/s
BM_ArrowBinaryDict/EncodeDictDirectInt64/1048576 4635792 ns 4635728 ns 150 215.716M items/s
```
On an i9-9960X CPU, before these changes `perf` reported that `__memmove_avx_unaligned_erms` was taking up a lot of time. In principle, `std::vector::reserve` should be the faster approach, since it leaves the reserved memory uninitialized, but something unexpected seems to be going on. If anyone has any ideas, I'm interested to learn more. In any case, I'll stick with the empirical benchmark evidence on this.
I started to refactor to use `TypedBufferBuilder<int32_t>`, but I'm not sure about its performance for scalar appends vs. `std::vector`, so I'll leave that for future experimentation.
Closes #5248 from wesm/ARROW-6411 and squashes the following commits:
b1159ec8a <Wes McKinney> Add C++ benchmarks for DictEncoder<T>::PutIndices
da8cc9d79 <Wes McKinney> Add C++ benchmarks
5a73bf509 <Wes McKinney> Add Python benchmark
Authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/parquet/encoding.cc | 16 ++++++------
cpp/src/parquet/encoding_benchmark.cc | 37 ++++++++++++++++++++++++++++
python/benchmarks/parquet.py | 46 ++++++++++++++++++++++++++++++++++-
3 files changed, 91 insertions(+), 8 deletions(-)
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index ef1dd34..e63d69f 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -361,7 +361,8 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
--buffer_len;
arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
- for (int index : buffered_indices_) {
+
+ for (int32_t index : buffered_indices_) {
if (!encoder.Put(index)) return -1;
}
encoder.Flush();
@@ -425,21 +426,22 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
const auto& indices = checked_cast<const ArrayType&>(data);
auto values = indices.raw_values();
- buffered_indices_.reserve(
- buffered_indices_.size() +
- static_cast<size_t>(indices.length() - indices.null_count()));
+
+ size_t buffer_position = buffered_indices_.size();
+ buffered_indices_.resize(
+ buffer_position + static_cast<size_t>(indices.length() - indices.null_count()));
if (indices.null_count() > 0) {
arrow::internal::BitmapReader valid_bits_reader(indices.null_bitmap_data(),
indices.offset(), indices.length());
for (int64_t i = 0; i < indices.length(); ++i) {
if (valid_bits_reader.IsSet()) {
- buffered_indices_.push_back(static_cast<int32_t>(values[i]));
+ buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
}
valid_bits_reader.Next();
}
} else {
for (int64_t i = 0; i < indices.length(); ++i) {
- buffered_indices_.push_back(static_cast<int32_t>(values[i]));
+ buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
}
}
}
@@ -480,7 +482,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
void ClearIndices() { buffered_indices_.clear(); }
/// Indices that have not yet be written out by WriteIndices().
- std::vector<int> buffered_indices_;
+ std::vector<int32_t> buffered_indices_;
/// The number of bytes needed to encode the dictionary.
int dict_encoded_size_;
diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc
index 2cbe8b3..4424c3f 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -441,6 +441,27 @@ class BM_ArrowBinaryDict : public BenchmarkDecodeArrow {
num_dict_entries_ = dict_encoder->num_entries();
}
+ template <typename IndexType>
+ void EncodeDictBenchmark(benchmark::State& state) {
+ constexpr int64_t nunique = 100;
+ constexpr int64_t min_length = 32;
+ constexpr int64_t max_length = 32;
+ ::arrow::random::RandomArrayGenerator rag(0);
+ auto dict = rag.String(nunique, min_length, max_length,
+ /*null_probability=*/0);
+ auto indices = rag.Numeric<IndexType, int32_t>(num_values_, 0, nunique - 1);
+
+ auto PutValues = [&](ByteArrayEncoder* encoder) {
+ auto dict_encoder = dynamic_cast<DictEncoder<ByteArrayType>*>(encoder);
+ dict_encoder->PutDictionary(*dict);
+ dict_encoder->PutIndices(*indices);
+ };
+ for (auto _ : state) {
+ DoEncode(std::move(PutValues));
+ }
+ state.SetItemsProcessed(state.iterations() * num_values_);
+ }
+
void DoEncodeArrow() override {
auto PutValues = [&](ByteArrayEncoder* encoder) {
ASSERT_NO_THROW(encoder->Put(*input_array_));
@@ -483,6 +504,22 @@ BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeArrow)
(benchmark::State& state) { EncodeArrowBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeArrow)->Range(1 << 18, 1 << 20);
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt8)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int8Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt8)->Range(1 << 20, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt16)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int16Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt16)->Range(1 << 20, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt32)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int32Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt32)->Range(1 << 20, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt64)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int64Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt64)->Range(1 << 20, 1 << 20);
+
BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeLowLevel)
(benchmark::State& state) { EncodeLowLevelBenchmark(state); }
BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeLowLevel)->Range(1 << 18, 1 << 20);
diff --git a/python/benchmarks/parquet.py b/python/benchmarks/parquet.py
index b591c81..d31e235 100644
--- a/python/benchmarks/parquet.py
+++ b/python/benchmarks/parquet.py
@@ -18,6 +18,7 @@
import shutil
import tempfile
+from pandas.util.testing import rands
import numpy as np
import pandas as pd
@@ -65,7 +66,7 @@ class ParquetWriteBinary(object):
length = 1000000
num_cols = 10
- unique_values = np.array([pd.util.testing.rands(value_size) for
+ unique_values = np.array([rands(value_size) for
i in range(nuniques)], dtype='O')
values = unique_values[np.random.randint(0, nuniques, size=length)]
self.table = pa.table([pa.array(values) for i in range(num_cols)],
@@ -87,3 +88,46 @@ class ParquetWriteBinary(object):
def time_convert_pandas_and_write_binary_table(self):
out = pa.BufferOutputStream()
pq.write_table(pa.table(self.table_df), out)
+
+
+def generate_dict_strings(string_size, nunique, length, random_order=True):
+ uniques = np.array([rands(string_size) for i in range(nunique)], dtype='O')
+ if random_order:
+ indices = np.random.randint(0, nunique, size=length).astype('i4')
+ else:
+ indices = np.arange(nunique).astype('i4').repeat(length // nunique)
+ return pa.DictionaryArray.from_arrays(indices, uniques)
+
+
+def generate_dict_table(num_cols, string_size, nunique, length,
+ random_order=True):
+ data = generate_dict_strings(string_size, nunique, length,
+ random_order=random_order)
+ return pa.table([
+ data for i in range(num_cols)
+ ], names=['f{}'.format(i) for i in range(num_cols)])
+
+
+class ParquetWriteDictionaries(object):
+
+ param_names = ('nunique',)
+ params = [(1000), (100000)]
+
+ def setup(self, nunique):
+ self.num_cols = 10
+ self.value_size = 32
+ self.nunique = nunique
+ self.length = 10000000
+
+ self.table = generate_dict_table(self.num_cols, self.value_size,
+ self.nunique, self.length)
+ self.table_sequential = generate_dict_table(self.num_cols,
+ self.value_size,
+ self.nunique, self.length,
+ random_order=False)
+
+ def time_write_random_order(self, nunique):
+ pq.write_table(self.table, pa.BufferOutputStream())
+
+ def time_write_sequential(self, nunique):
+ pq.write_table(self.table_sequential, pa.BufferOutputStream())