You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/31 19:06:11 UTC

[arrow] branch master updated: ARROW-4398: [C++][Python][Parquet] Improve BYTE_ARRAY PLAIN encoding write performance. Add BYTE_ARRAY write benchmarks

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2164e3b  ARROW-4398: [C++][Python][Parquet] Improve BYTE_ARRAY PLAIN encoding write performance. Add BYTE_ARRAY write benchmarks
2164e3b is described below

commit 2164e3b435fa1764281dd3060537297ca5cad7b9
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sat Aug 31 14:05:48 2019 -0500

    ARROW-4398: [C++][Python][Parquet] Improve BYTE_ARRAY PLAIN encoding write performance. Add BYTE_ARRAY write benchmarks
    
    Use BufferBuilder and UnsafeAppend to accelerate writes.
    
    Before (prior to ARROW-6381, which came up while investigating this):
    
    ```
    ----------------------------------------------------------------------------------
    Benchmark                                           Time           CPU Iterations
    ----------------------------------------------------------------------------------
    BM_ArrowBinaryPlain/EncodeArrow/262144        6111690 ns    6109829 ns        465    246.06MB/s
    BM_ArrowBinaryPlain/EncodeArrow/1048576      30470849 ns   30451048 ns         85   197.296MB/s
    BM_ArrowBinaryPlain/EncodeLowLevel/262144     5352838 ns    5352679 ns        514   280.866MB/s
    BM_ArrowBinaryPlain/EncodeLowLevel/1048576   29736017 ns   29735036 ns         94   202.047MB/s
    ```
    
    After:
    
    ```
    ----------------------------------------------------------------------------------
    Benchmark                                           Time           CPU Iterations
    ----------------------------------------------------------------------------------
    BM_ArrowBinaryPlain/EncodeArrow/262144        2020914 ns    2020905 ns       1000   743.918MB/s
    BM_ArrowBinaryPlain/EncodeArrow/1048576      11596223 ns   11596094 ns        242   518.096MB/s
    BM_ArrowBinaryPlain/EncodeLowLevel/262144     2740316 ns    2740256 ns       1021    548.63MB/s
    BM_ArrowBinaryPlain/EncodeLowLevel/1048576   17562138 ns   17560763 ns        157    342.12MB/s
    ```
    
    Dictionary encoding performance is not really affected by this work, so the change mostly benefits the case where we fall back to PLAIN encoding when the dictionary grows large.
    
    Closes #5233 from wesm/ARROW-4398 and squashes the following commits:
    
    3a8c37ac9 <Wes McKinney> Code review comments. Don't box string_view as ByteArray
    252b45c4c <Wes McKinney> Fix -Wsign-compare error
    f77838cac <Wes McKinney> Improve benchmark
    f7e659d29 <Wes McKinney> Add ASV benchmarks for binary writes
    844427ca3 <Wes McKinney> More perf improvements, add benchmark for low level vs high level
    ab2ad26ef <Wes McKinney> Use unsafe appends for BinaryArray for better performance
    e6458d2aa <Wes McKinney> Use BufferBuilder in PlainEncoder, use UnsafeAppend to avoid calling Reserve so much
    47f10cde7 <Wes McKinney> Add benchmark of direct Arrow write
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/parquet/encoding.cc           | 159 +++++++++++++++++++---------------
 cpp/src/parquet/encoding_benchmark.cc | 111 +++++++++++++++++-------
 python/benchmarks/parquet.py          |  32 +++++++
 python/pyarrow/_flight.pyx            |   1 +
 4 files changed, 201 insertions(+), 102 deletions(-)

diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 98bd280..ef1dd34 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -75,20 +75,13 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
   using T = typename DType::c_type;
 
   explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
-      : EncoderImpl(descr, Encoding::PLAIN, pool) {
-    values_sink_ = CreateOutputStream(pool);
-  }
+      : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}
 
-  int64_t EstimatedDataEncodedSize() override {
-    int64_t position = -1;
-    PARQUET_THROW_NOT_OK(values_sink_->Tell(&position));
-    return position;
-  }
+  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
 
   std::shared_ptr<Buffer> FlushValues() override {
     std::shared_ptr<Buffer> buffer;
-    PARQUET_THROW_NOT_OK(values_sink_->Finish(&buffer));
-    values_sink_ = CreateOutputStream(this->pool_);
+    PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
     return buffer;
   }
 
@@ -114,25 +107,30 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
     Put(data, num_valid_values);
   }
 
+  void UnsafePutByteArray(const void* data, uint32_t length) {
+    DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
+    sink_.UnsafeAppend(&length, sizeof(uint32_t));
+    sink_.UnsafeAppend(data, static_cast<int64_t>(length));
+  }
+
   void Put(const ByteArray& val) {
     // Write the result to the output stream
-    PARQUET_THROW_NOT_OK(values_sink_->Write(reinterpret_cast<const uint8_t*>(&val.len),
-                                             sizeof(uint32_t)));
-    if (val.len > 0) {
-      DCHECK(nullptr != val.ptr) << "Value ptr cannot be NULL";
+    const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
+    if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
+      PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
     }
-    PARQUET_THROW_NOT_OK(
-        values_sink_->Write(reinterpret_cast<const uint8_t*>(val.ptr), val.len));
+    UnsafePutByteArray(val.ptr, val.len);
   }
 
  protected:
-  std::shared_ptr<arrow::io::BufferOutputStream> values_sink_;
+  arrow::BufferBuilder sink_;
 };
 
 template <typename DType>
 void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
-  PARQUET_THROW_NOT_OK(values_sink_->Write(reinterpret_cast<const uint8_t*>(buffer),
-                                           num_values * sizeof(T)));
+  if (num_values > 0) {
+    PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
+  }
 }
 
 template <>
@@ -154,38 +152,38 @@ void AssertBinary(const arrow::Array& values) {
   }
 }
 
-template <typename EncoderType>
-void PutBinaryArray(const arrow::Array& values, EncoderType* encoder) {
+template <>
+void PlainEncoder<ByteArrayType>::Put(const arrow::Array& values) {
   AssertBinary(values);
   const auto& data = checked_cast<const arrow::BinaryArray&>(values);
+  const int64_t total_bytes = data.value_offset(data.length()) - data.value_offset(0);
+  PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + data.length() * sizeof(uint32_t)));
+
   if (data.null_count() == 0) {
     // no nulls, just dump the data
     for (int64_t i = 0; i < data.length(); i++) {
-      encoder->Put(ByteArray(data.GetView(i)));
+      auto view = data.GetView(i);
+      UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
     }
   } else {
     for (int64_t i = 0; i < data.length(); i++) {
       if (data.IsValid(i)) {
-        encoder->Put(ByteArray(data.GetView(i)));
+        auto view = data.GetView(i);
+        UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
       }
     }
   }
 }
 
 template <>
-void PlainEncoder<ByteArrayType>::Put(const arrow::Array& values) {
-  PutBinaryArray(values, this);
-}
-
-template <>
 inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
+  if (descr_->type_length() == 0) {
+    return;
+  }
   for (int i = 0; i < num_values; ++i) {
     // Write the result to the output stream
-    if (descr_->type_length() > 0) {
-      DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL";
-    }
-    PARQUET_THROW_NOT_OK(values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr),
-                                             descr_->type_length()));
+    DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
+    PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
   }
 }
 
@@ -199,8 +197,13 @@ class PlainBooleanEncoder : public EncoderImpl,
                             virtual public TypedEncoder<BooleanType>,
                             virtual public BooleanEncoder {
  public:
-  explicit PlainBooleanEncoder(const ColumnDescriptor* descr,
-                               MemoryPool* pool = arrow::default_memory_pool());
+  explicit PlainBooleanEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
+      : EncoderImpl(descr, Encoding::PLAIN, pool),
+        bits_available_(kInMemoryDefaultCapacity * 8),
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        sink_(pool),
+        bit_writer_(bits_buffer_->mutable_data(),
+                    static_cast<int>(bits_buffer_->size())) {}
 
   int64_t EstimatedDataEncodedSize() override;
   std::shared_ptr<Buffer> FlushValues() override;
@@ -232,9 +235,9 @@ class PlainBooleanEncoder : public EncoderImpl,
 
  private:
   int bits_available_;
-  std::unique_ptr<arrow::BitUtil::BitWriter> bit_writer_;
   std::shared_ptr<ResizableBuffer> bits_buffer_;
-  std::shared_ptr<arrow::io::BufferOutputStream> values_sink_;
+  arrow::BufferBuilder sink_;
+  arrow::BitUtil::BitWriter bit_writer_;
 
   template <typename SequenceType>
   void PutImpl(const SequenceType& src, int num_values);
@@ -246,16 +249,16 @@ void PlainBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
   if (bits_available_ > 0) {
     int bits_to_write = std::min(bits_available_, num_values);
     for (int i = 0; i < bits_to_write; i++) {
-      bit_writer_->PutValue(src[i], 1);
+      bit_writer_.PutValue(src[i], 1);
     }
     bits_available_ -= bits_to_write;
     bit_offset = bits_to_write;
 
     if (bits_available_ == 0) {
-      bit_writer_->Flush();
+      bit_writer_.Flush();
       PARQUET_THROW_NOT_OK(
-          values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()));
-      bit_writer_->Clear();
+          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+      bit_writer_.Clear();
     }
   }
 
@@ -265,48 +268,36 @@ void PlainBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
 
     int bits_to_write = std::min(bits_available_, bits_remaining);
     for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
-      bit_writer_->PutValue(src[i], 1);
+      bit_writer_.PutValue(src[i], 1);
     }
     bit_offset += bits_to_write;
     bits_available_ -= bits_to_write;
     bits_remaining -= bits_to_write;
 
     if (bits_available_ == 0) {
-      bit_writer_->Flush();
+      bit_writer_.Flush();
       PARQUET_THROW_NOT_OK(
-          values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()));
-      bit_writer_->Clear();
+          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+      bit_writer_.Clear();
     }
   }
 }
 
-PlainBooleanEncoder::PlainBooleanEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
-    : EncoderImpl(descr, Encoding::PLAIN, pool),
-      bits_available_(kInMemoryDefaultCapacity * 8),
-      bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)) {
-  values_sink_ = CreateOutputStream(pool);
-  bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(),
-                                           static_cast<int>(bits_buffer_->size())));
-}
-
 int64_t PlainBooleanEncoder::EstimatedDataEncodedSize() {
-  int64_t position = -1;
-  PARQUET_THROW_NOT_OK(values_sink_->Tell(&position));
-  return position + bit_writer_->bytes_written();
+  int64_t position = sink_.length();
+  return position + bit_writer_.bytes_written();
 }
 
 std::shared_ptr<Buffer> PlainBooleanEncoder::FlushValues() {
   if (bits_available_ > 0) {
-    bit_writer_->Flush();
-    PARQUET_THROW_NOT_OK(
-        values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()));
-    bit_writer_->Clear();
+    bit_writer_.Flush();
+    PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
+    bit_writer_.Clear();
     bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
   }
 
   std::shared_ptr<Buffer> buffer;
-  PARQUET_THROW_NOT_OK(values_sink_->Finish(&buffer));
-  values_sink_ = CreateOutputStream(this->pool_);
+  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
   return buffer;
 }
 
@@ -405,6 +396,9 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
   /// buffers the value's index to be written later.
   inline void Put(const T& value);
 
+  // Not implemented for other data types
+  inline void PutByteArray(const void* ptr, int32_t length);
+
   void Put(const T* src, int num_values) override {
     for (int32_t i = 0; i < num_values; i++) {
       Put(src[i]);
@@ -534,23 +528,33 @@ inline void DictEncoderImpl<DType>::Put(const T& v) {
   buffered_indices_.push_back(memo_index);
 }
 
+template <typename DType>
+inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
+  DCHECK(false);
+}
+
 template <>
-inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& v) {
+inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
+                                                         int32_t length) {
   static const uint8_t empty[] = {0};
 
   auto on_found = [](int32_t memo_index) {};
   auto on_not_found = [&](int32_t memo_index) {
-    dict_encoded_size_ += static_cast<int>(v.len + sizeof(uint32_t));
+    dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
   };
 
-  DCHECK(v.ptr != nullptr || v.len == 0);
-  const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
-  auto memo_index =
-      memo_table_.GetOrInsert(ptr, static_cast<int32_t>(v.len), on_found, on_not_found);
+  DCHECK(ptr != nullptr || length == 0);
+  ptr = (ptr != nullptr) ? ptr : empty;
+  auto memo_index = memo_table_.GetOrInsert(ptr, length, on_found, on_not_found);
   buffered_indices_.push_back(memo_index);
 }
 
 template <>
+inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
+  return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
+}
+
+template <>
 inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
   static const uint8_t empty[] = {0};
 
@@ -570,7 +574,22 @@ void DictEncoderImpl<DType>::Put(const arrow::Array& values) {
 
 template <>
 void DictEncoderImpl<ByteArrayType>::Put(const arrow::Array& values) {
-  PutBinaryArray(values, this);
+  AssertBinary(values);
+  const auto& data = checked_cast<const arrow::BinaryArray&>(values);
+  if (data.null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < data.length(); i++) {
+      auto view = data.GetView(i);
+      PutByteArray(view.data(), static_cast<int32_t>(view.size()));
+    }
+  } else {
+    for (int64_t i = 0; i < data.length(); i++) {
+      if (data.IsValid(i)) {
+        auto view = data.GetView(i);
+        PutByteArray(view.data(), static_cast<int32_t>(view.size()));
+      }
+    }
+  }
 }
 
 template <typename DType>
diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc
index d44fbb4..2cbe8b3 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -270,7 +270,7 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture {
   void SetUp(const ::benchmark::State& state) override {
     num_values_ = static_cast<int>(state.range());
     InitDataInputs();
-    DoEncodeData();
+    DoEncodeArrow();
   }
 
   void TearDown(const ::benchmark::State& state) override {
@@ -288,26 +288,39 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture {
     input_array_ = rag.StringWithRepeats(num_values_, num_values_ / repeat_factor,
                                          min_length, max_length, /*null_probability=*/0);
     valid_bits_ = input_array_->null_bitmap()->data();
-    values_ = std::vector<ByteArray>();
+    total_size_ = input_array_->data()->buffers[2]->size();
+
     values_.reserve(num_values_);
-    total_size_ = 0;
     const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(*input_array_);
-
     for (int64_t i = 0; i < binary_array.length(); i++) {
       auto view = binary_array.GetView(i);
       values_.emplace_back(static_cast<uint32_t>(view.length()),
                            reinterpret_cast<const uint8_t*>(view.data()));
-      total_size_ += view.length();
     }
   }
 
-  virtual void DoEncodeData() = 0;
+  virtual void DoEncodeArrow() = 0;
+  virtual void DoEncodeLowLevel() = 0;
 
   virtual std::unique_ptr<ByteArrayDecoder> InitializeDecoder() = 0;
 
   template <typename BuilderType>
   std::unique_ptr<BuilderType> CreateBuilder();
 
+  void EncodeArrowBenchmark(benchmark::State& state) {
+    for (auto _ : state) {
+      DoEncodeArrow();
+    }
+    state.SetBytesProcessed(state.iterations() * total_size_);
+  }
+
+  void EncodeLowLevelBenchmark(benchmark::State& state) {
+    for (auto _ : state) {
+      DoEncodeLowLevel();
+    }
+    state.SetBytesProcessed(state.iterations() * total_size_);
+  }
+
   template <typename BuilderType>
   void DecodeArrowBenchmark(benchmark::State& state) {
     for (auto _ : state) {
@@ -333,8 +346,8 @@ class BenchmarkDecodeArrow : public ::benchmark::Fixture {
  protected:
   int num_values_;
   std::shared_ptr<::arrow::Array> input_array_;
-  uint64_t total_size_;
   std::vector<ByteArray> values_;
+  uint64_t total_size_;
   const uint8_t* valid_bits_;
   std::shared_ptr<Buffer> buffer_;
 };
@@ -357,9 +370,15 @@ std::unique_ptr<BinaryDictionary32Builder> BenchmarkDecodeArrow::CreateBuilder()
 
 // ----------------------------------------------------------------------
 // Benchmark Decoding from Plain Encoding
-class BM_PlainDecodingByteArray : public BenchmarkDecodeArrow {
+class BM_ArrowBinaryPlain : public BenchmarkDecodeArrow {
  public:
-  void DoEncodeData() override {
+  void DoEncodeArrow() override {
+    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN);
+    encoder->Put(*input_array_);
+    buffer_ = encoder->FlushValues();
+  }
+
+  void DoEncodeLowLevel() override {
     auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN);
     encoder->Put(values_.data(), num_values_);
     buffer_ = encoder->FlushValues();
@@ -372,38 +391,46 @@ class BM_PlainDecodingByteArray : public BenchmarkDecodeArrow {
   }
 };
 
-BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dense)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, EncodeArrow)
+(benchmark::State& state) { EncodeArrowBenchmark(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, EncodeArrow)->Range(1 << 18, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, EncodeLowLevel)
+(benchmark::State& state) { EncodeLowLevelBenchmark(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, EncodeLowLevel)->Range(1 << 18, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrow_Dense)
 (benchmark::State& state) { DecodeArrowBenchmark<ChunkedBinaryBuilder>(state); }
-BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dense)
-    ->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrow_Dense)->Range(MIN_RANGE, MAX_RANGE);
 
-BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense)
 (benchmark::State& state) { DecodeArrowNonNullBenchmark<ChunkedBinaryBuilder>(state); }
-BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense)
+BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dense)
     ->Range(MIN_RANGE, MAX_RANGE);
 
-BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dict)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrow_Dict)
 (benchmark::State& state) { DecodeArrowBenchmark<BinaryDictionary32Builder>(state); }
-BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dict)
-    ->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrow_Dict)->Range(MIN_RANGE, MAX_RANGE);
 
-BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dict)
 (benchmark::State& state) {
   DecodeArrowNonNullBenchmark<BinaryDictionary32Builder>(state);
 }
-BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict)
+BENCHMARK_REGISTER_F(BM_ArrowBinaryPlain, DecodeArrowNonNull_Dict)
     ->Range(MIN_RANGE, MAX_RANGE);
 
 // ----------------------------------------------------------------------
 // Benchmark Decoding from Dictionary Encoding
-class BM_DictDecodingByteArray : public BenchmarkDecodeArrow {
+class BM_ArrowBinaryDict : public BenchmarkDecodeArrow {
  public:
-  void DoEncodeData() override {
+  template <typename PutValuesFunc>
+  void DoEncode(PutValuesFunc&& put_values) {
     auto node = schema::ByteArray("name");
     descr_ = std::unique_ptr<ColumnDescriptor>(new ColumnDescriptor(node, 0, 0));
+
     auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN,
                                                    /*use_dictionary=*/true, descr_.get());
-    ASSERT_NO_THROW(encoder->Put(values_.data(), num_values_));
+    put_values(encoder.get());
     buffer_ = encoder->FlushValues();
 
     auto dict_encoder = dynamic_cast<DictEncoder<ByteArrayType>*>(encoder.get());
@@ -414,6 +441,20 @@ class BM_DictDecodingByteArray : public BenchmarkDecodeArrow {
     num_dict_entries_ = dict_encoder->num_entries();
   }
 
+  void DoEncodeArrow() override {
+    auto PutValues = [&](ByteArrayEncoder* encoder) {
+      ASSERT_NO_THROW(encoder->Put(*input_array_));
+    };
+    DoEncode(std::move(PutValues));
+  }
+
+  void DoEncodeLowLevel() override {
+    auto PutValues = [&](ByteArrayEncoder* encoder) {
+      encoder->Put(values_.data(), num_values_);
+    };
+    DoEncode(std::move(PutValues));
+  }
+
   std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override {
     auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN, descr_.get());
     decoder->SetData(num_dict_entries_, dict_buffer_->data(),
@@ -438,27 +479,33 @@ class BM_DictDecodingByteArray : public BenchmarkDecodeArrow {
   int num_dict_entries_;
 };
 
-BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dense)(benchmark::State& state) {
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeArrow)
+(benchmark::State& state) { EncodeArrowBenchmark(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeArrow)->Range(1 << 18, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, EncodeLowLevel)
+(benchmark::State& state) { EncodeLowLevelBenchmark(state); }
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, EncodeLowLevel)->Range(1 << 18, 1 << 20);
+
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrow_Dense)(benchmark::State& state) {
   DecodeArrowBenchmark<ChunkedBinaryBuilder>(state);
 }
-BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dense)
-    ->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrow_Dense)->Range(MIN_RANGE, MAX_RANGE);
 
-BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dense)
 (benchmark::State& state) { DecodeArrowNonNullBenchmark<ChunkedBinaryBuilder>(state); }
-BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense)
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dense)
     ->Range(MIN_RANGE, MAX_RANGE);
 
-BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dict)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrow_Dict)
 (benchmark::State& state) { DecodeArrowBenchmark<BinaryDictionary32Builder>(state); }
-BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dict)
-    ->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrow_Dict)->Range(MIN_RANGE, MAX_RANGE);
 
-BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict)
+BENCHMARK_DEFINE_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dict)
 (benchmark::State& state) {
   DecodeArrowNonNullBenchmark<BinaryDictionary32Builder>(state);
 }
-BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict)
+BENCHMARK_REGISTER_F(BM_ArrowBinaryDict, DecodeArrowNonNull_Dict)
     ->Range(MIN_RANGE, MAX_RANGE);
 
 }  // namespace parquet
diff --git a/python/benchmarks/parquet.py b/python/benchmarks/parquet.py
index 4f55587..b591c81 100644
--- a/python/benchmarks/parquet.py
+++ b/python/benchmarks/parquet.py
@@ -55,3 +55,35 @@ class ParquetManifestCreation(object):
 
     def time_manifest_creation(self, num_partitions, num_threads):
         pq.ParquetManifest(self.tmpdir, metadata_nthreads=num_threads)
+
+
+class ParquetWriteBinary(object):
+
+    def setup(self):
+        nuniques = 100000
+        value_size = 50
+        length = 1000000
+        num_cols = 10
+
+        unique_values = np.array([pd.util.testing.rands(value_size) for
+                                  i in range(nuniques)], dtype='O')
+        values = unique_values[np.random.randint(0, nuniques, size=length)]
+        self.table = pa.table([pa.array(values) for i in range(num_cols)],
+                              names=['f{}'.format(i) for i in range(num_cols)])
+        self.table_df = self.table.to_pandas()
+
+    def time_write_binary_table(self):
+        out = pa.BufferOutputStream()
+        pq.write_table(self.table, out)
+
+    def time_write_binary_table_uncompressed(self):
+        out = pa.BufferOutputStream()
+        pq.write_table(self.table, out, compression='none')
+
+    def time_write_binary_table_no_dictionary(self):
+        out = pa.BufferOutputStream()
+        pq.write_table(self.table, out, use_dictionary=False)
+
+    def time_convert_pandas_and_write_binary_table(self):
+        out = pa.BufferOutputStream()
+        pq.write_table(pa.table(self.table_df), out)
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index 147d407..d7a4b82 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -16,6 +16,7 @@
 # under the License.
 
 # cython: language_level = 3
+# cython: embedsignature = True
 
 from __future__ import absolute_import