You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/09/03 02:40:55 UTC

[arrow] branch master updated: ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ab908cc  ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices
ab908cc is described below

commit ab908cc0486d7daf643d1a1418328566f24c403b
Author: Wes McKinney <we...@apache.org>
AuthorDate: Mon Sep 2 21:40:35 2019 -0500

    ARROW-6411: [Python][Parquet] Improve performance of DictEncoder::PutIndices
    
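    This replaces `reserve` + `push_back` with `resize` followed by indexed assignment in `DictEncoder::PutIndices`, and changes `buffered_indices_` from `std::vector<int>` to `std::vector<int32_t>`. I don't really understand why this is faster, though.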
    
    before
    
    ```
    ----------------------------------------------------------------------------------------
    Benchmark                                                 Time           CPU Iterations
    ----------------------------------------------------------------------------------------
    BM_ArrowBinaryDict/EncodeDictDirectInt8/1048576     7334087 ns    7333876 ns         98   136.354M items/s
    BM_ArrowBinaryDict/EncodeDictDirectInt16/1048576    7022430 ns    7022412 ns        100   142.401M items/s
    BM_ArrowBinaryDict/EncodeDictDirectInt32/1048576    7061033 ns    7060870 ns         99   141.626M items/s
    BM_ArrowBinaryDict/EncodeDictDirectInt64/1048576    7084581 ns    7084398 ns         97   141.155M items/s
    ```
    
    after
    
    ```
    ----------------------------------------------------------------------------------------
    Benchmark                                                 Time           CPU Iterations
    ----------------------------------------------------------------------------------------
    BM_ArrowBinaryDict/EncodeDictDirectInt8/1048576     4387151 ns    4387175 ns        156   227.937M items/s
    BM_ArrowBinaryDict/EncodeDictDirectInt16/1048576    4446167 ns    4446074 ns        159   224.918M items/s
    BM_ArrowBinaryDict/EncodeDictDirectInt32/1048576    4501028 ns    4500934 ns        156   222.176M items/s
    BM_ArrowBinaryDict/EncodeDictDirectInt64/1048576    4635792 ns    4635728 ns        150   215.716M items/s
    ```
    
    On an i9-9960X CPU, before these changes, `perf` reported that `__memmove_avx_unaligned_erms` was taking up a lot of time. In principle, `std::vector::reserve` should be correct since memory is not initialized, but something weird seems to be going wrong. If anyone has any ideas, I'm interested to learn more. In any case, I'll stick with the empirical benchmark evidence on this.
    
    I started to refactor to use `TypedBufferBuilder<int32_t>`, but I'm not sure about the performance of that for scalar appends vs. `std::vector`, so I'll leave that for future experimentation.
    
    Closes #5248 from wesm/ARROW-6411 and squashes the following commits:
    
    b1159ec8a <Wes McKinney> Add C++ benchmarks for DictEncoder<T>::PutIndices
    da8cc9d79 <Wes McKinney> Add C++ benchmarks
    5a73bf509 <Wes McKinney> Add Python benchmark
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/parquet/encoding.cc           | 16 ++++++------
 cpp/src/parquet/encoding_benchmark.cc | 37 ++++++++++++++++++++++++++++
 python/benchmarks/parquet.py          | 46 ++++++++++++++++++++++++++++++++++-
 3 files changed, 91 insertions(+), 8 deletions(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index ef1dd34..e63d69f 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -361,7 +361,8 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
     --buffer_len;
 
     arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
-    for (int index : buffered_indices_) {
+
+    for (int32_t index : buffered_indices_) {
       if (!encoder.Put(index)) return -1;
     }
     encoder.Flush();
@@ -425,21 +426,22 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
     using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
     const auto& indices = checked_cast<const ArrayType&>(data);
     auto values = indices.raw_values();
-    buffered_indices_.reserve(
-        buffered_indices_.size() +
-        static_cast<size_t>(indices.length() - indices.null_count()));
+
+    size_t buffer_position = buffered_indices_.size();
+    buffered_indices_.resize(
+        buffer_position + static_cast<size_t>(indices.length() - indices.null_count()));
     if (indices.null_count() > 0) {
       arrow::internal::BitmapReader valid_bits_reader(indices.null_bitmap_data(),
                                                       indices.offset(), indices.length());
       for (int64_t i = 0; i < indices.length(); ++i) {
         if (valid_bits_reader.IsSet()) {
-          buffered_indices_.push_back(static_cast<int32_t>(values[i]));
+          buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
         }
         valid_bits_reader.Next();
       }
     } else {
       for (int64_t i = 0; i < indices.length(); ++i) {
-        buffered_indices_.push_back(static_cast<int32_t>(values[i]));
+        buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
       }
     }
   }
@@ -480,7 +482,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
   void ClearIndices() { buffered_indices_.clear(); }
 
   /// Indices that have not yet been written out by WriteIndices().
-  std::vector<int> buffered_indices_;
+  std::vector<int32_t> buffered_indices_;
 
   /// The number of bytes needed to encode the dictionary.
   int dict_encoded_size_;
diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc
index 2cbe8b3..4424c3f 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -441,6 +441,27 @@ class BM_ArrowBinaryDict : public BenchmarkDecodeArrow {
     num_dict_entries_ = dict_encoder->num_entries();
   }
 
+  template <typename IndexType>
+  void EncodeDictBenchmark(benchmark::State& state) {
+    constexpr int64_t nunique = 100;
+    constexpr int64_t min_length = 32;
+    constexpr int64_t max_length = 32;
+    ::arrow::random::RandomArrayGenerator rag(0);
+    auto dict = rag.String(nunique, min_length, max_length,
+                           /*null_probability=*/0);
+    auto indices = rag.Numeric<IndexType, int32_t>(num_values_, 0, nunique - 1);
+
+    auto PutValues = [&](ByteArrayEncoder* encoder) {
+      auto dict_encoder = dynamic_cast<DictEncoder<ByteArrayType>*>(encoder);
+      dict_encoder->PutDictionary(*dict);
+      dict_encoder->PutIndices(*indices);
+    };
+    for (auto _ : state) {
+      DoEncode(std::move(PutValues));
+    }
+    state.SetItemsProcessed(state.iterations() * num_values_);
+  }
+
   void DoEncodeArrow() override {
     auto PutValues = [&](ByteArrayEncoder* encoder) {
       ASSERT_NO_THROW(encoder->Put(*input_array_));
@@ -483,6 +504,22 @@ BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeArrow)
 (benchmark::State& state) { EncodeArrowBenchmark(state); }
 BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeArrow)->Range(1 << 18, 1 << 20);
 
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt8)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int8Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt8)->Range(1 << 20, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt16)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int16Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt16)->Range(1 << 20, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt32)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int32Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt32)->Range(1 << 20, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeDictDirectInt64)
+(benchmark::State& state) { EncodeDictBenchmark<::arrow::Int64Type>(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeDictDirectInt64)->Range(1 << 20, 1 << 20);
+
 BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeLowLevel)
 (benchmark::State& state) { EncodeLowLevelBenchmark(state); }
 BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeLowLevel)->Range(1 << 18, 1 << 20);
diff --git a/python/benchmarks/parquet.py b/python/benchmarks/parquet.py
index b591c81..d31e235 100644
--- a/python/benchmarks/parquet.py
+++ b/python/benchmarks/parquet.py
@@ -18,6 +18,7 @@
 import shutil
 import tempfile
 
+from pandas.util.testing import rands
 import numpy as np
 import pandas as pd
 
@@ -65,7 +66,7 @@ class ParquetWriteBinary(object):
         length = 1000000
         num_cols = 10
 
-        unique_values = np.array([pd.util.testing.rands(value_size) for
+        unique_values = np.array([rands(value_size) for
                                   i in range(nuniques)], dtype='O')
         values = unique_values[np.random.randint(0, nuniques, size=length)]
         self.table = pa.table([pa.array(values) for i in range(num_cols)],
@@ -87,3 +88,46 @@ class ParquetWriteBinary(object):
     def time_convert_pandas_and_write_binary_table(self):
         out = pa.BufferOutputStream()
         pq.write_table(pa.table(self.table_df), out)
+
+
+def generate_dict_strings(string_size, nunique, length, random_order=True):
+    uniques = np.array([rands(string_size) for i in range(nunique)], dtype='O')
+    if random_order:
+        indices = np.random.randint(0, nunique, size=length).astype('i4')
+    else:
+        indices = np.arange(nunique).astype('i4').repeat(length // nunique)
+    return pa.DictionaryArray.from_arrays(indices, uniques)
+
+
+def generate_dict_table(num_cols, string_size, nunique, length,
+                        random_order=True):
+    data = generate_dict_strings(string_size, nunique, length,
+                                 random_order=random_order)
+    return pa.table([
+        data for i in range(num_cols)
+    ], names=['f{}'.format(i) for i in range(num_cols)])
+
+
+class ParquetWriteDictionaries(object):
+
+    param_names = ('nunique',)
+    params = [(1000), (100000)]
+
+    def setup(self, nunique):
+        self.num_cols = 10
+        self.value_size = 32
+        self.nunique = nunique
+        self.length = 10000000
+
+        self.table = generate_dict_table(self.num_cols, self.value_size,
+                                         self.nunique, self.length)
+        self.table_sequential = generate_dict_table(self.num_cols,
+                                                    self.value_size,
+                                                    self.nunique, self.length,
+                                                    random_order=False)
+
+    def time_write_random_order(self, nunique):
+        pq.write_table(self.table, pa.BufferOutputStream())
+
+    def time_write_sequential(self, nunique):
+        pq.write_table(self.table_sequential, pa.BufferOutputStream())