You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2020/03/30 11:20:39 UTC

[arrow] branch master updated: PARQUET-1828: [C++] Use SSE2 for the ByteStreamSplit encoder

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new da94098  PARQUET-1828: [C++] Use SSE2 for the ByteStreamSplit encoder
da94098 is described below

commit da94098d96a78b25910da3ba7fed57bada048852
Author: Martin Radev <ma...@gmail.com>
AuthorDate: Mon Mar 30 13:20:17 2020 +0200

    PARQUET-1828: [C++] Use SSE2 for the ByteStreamSplit encoder
    
    The ByteStreamSplit encoder can benefit from using SSE2 intrinsics
    to speed up data processing since most of the data is likely in the
    cache.
    
    Benchmark results on Sandy Bridge 3930k:
    BM_ByteStreamSplitEncode_Float_Scalar/1024          264 ns          264 ns      2679735 bytes_per_second=14.4706G/s
    BM_ByteStreamSplitEncode_Float_Scalar/4096         1003 ns         1003 ns       688811 bytes_per_second=15.2114G/s
    BM_ByteStreamSplitEncode_Float_Scalar/32768       11933 ns        11926 ns        59187 bytes_per_second=10.2353G/s
    BM_ByteStreamSplitEncode_Float_Scalar/65536       28137 ns        28137 ns        24634 bytes_per_second=8.67699G/s
    BM_ByteStreamSplitEncode_Double_Scalar/1024        2601 ns         2599 ns       266977 bytes_per_second=2.93583G/s
    BM_ByteStreamSplitEncode_Double_Scalar/4096       32408 ns        32408 ns        21594 bytes_per_second=964.268M/s
    BM_ByteStreamSplitEncode_Double_Scalar/32768     228019 ns       227850 ns         3079 bytes_per_second=1097.21M/s
    BM_ByteStreamSplitEncode_Double_Scalar/65536     475051 ns       475051 ns         1477 bytes_per_second=1052.52M/s
    BM_ByteStreamSplitEncode_Float_SSE2/1024            219 ns          219 ns      3156405 bytes_per_second=17.4093G/s
    BM_ByteStreamSplitEncode_Float_SSE2/4096            860 ns          860 ns       796082 bytes_per_second=17.7457G/s
    BM_ByteStreamSplitEncode_Float_SSE2/32768         11556 ns        11556 ns        61763 bytes_per_second=10.5632G/s
    BM_ByteStreamSplitEncode_Float_SSE2/65536         27759 ns        27758 ns        25234 bytes_per_second=8.7953G/s
    BM_ByteStreamSplitEncode_Double_SSE2/1024           433 ns          433 ns      1596315 bytes_per_second=17.6216G/s
    BM_ByteStreamSplitEncode_Double_SSE2/4096          3358 ns         3358 ns       205874 bytes_per_second=9.08763G/s
    BM_ByteStreamSplitEncode_Double_SSE2/32768        33812 ns        33812 ns        20505 bytes_per_second=7.22053G/s
    BM_ByteStreamSplitEncode_Double_SSE2/65536        68419 ns        68417 ns        10078 bytes_per_second=7.13682G/s
    
    Closes #6723 from martinradev/byte_stream_split_submit
    
    Authored-by: Martin Radev <ma...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/util/byte_stream_split.h | 105 +++++++++++++++++++++++++++++++--
 cpp/src/parquet/encoding.cc            |  26 ++++----
 cpp/src/parquet/encoding_benchmark.cc  |  47 +++++++++++++--
 cpp/src/parquet/encoding_test.cc       |   3 -
 4 files changed, 156 insertions(+), 25 deletions(-)

diff --git a/cpp/src/arrow/util/byte_stream_split.h b/cpp/src/arrow/util/byte_stream_split.h
index 27f084a..08cc11b 100644
--- a/cpp/src/arrow/util/byte_stream_split.h
+++ b/cpp/src/arrow/util/byte_stream_split.h
@@ -31,8 +31,8 @@ namespace internal {
 #if defined(ARROW_HAVE_SSE2)
 
 template <typename T>
-void ByteStreamSlitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t stride,
-                              T* out) {
+void ByteStreamSplitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t stride,
+                               T* out) {
   constexpr size_t kNumStreams = sizeof(T);
   static_assert(kNumStreams == 4U || kNumStreams == 8U, "Invalid number of streams.");
   constexpr size_t kNumStreamsLog2 = (kNumStreams == 8U ? 3U : 2U);
@@ -84,11 +84,108 @@ void ByteStreamSlitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t s
   }
 }
 
+template <typename T>
+void ByteStreamSplitEncodeSSE2(const uint8_t* raw_values, const size_t num_values,
+                               uint8_t* output_buffer_raw) {
+  constexpr size_t num_streams = sizeof(T);
+  static_assert(num_streams == 4U || num_streams == 8U, "Invalid number of streams.");
+  __m128i stage[3][num_streams];
+  __m128i final_result[num_streams];
+
+  const size_t size = num_values * sizeof(T);
+  const size_t block_size = sizeof(__m128i) * num_streams;
+  const size_t num_blocks = size / block_size;
+  const __m128i* raw_values_sse = reinterpret_cast<const __m128i*>(raw_values);
+  __m128i* output_buffer_streams[num_streams];
+  for (size_t i = 0; i < num_streams; ++i) {
+    output_buffer_streams[i] =
+        reinterpret_cast<__m128i*>(&output_buffer_raw[num_values * i]);
+  }
+
+  const size_t num_processed_elements = (num_blocks * block_size) / sizeof(T);
+  for (size_t i = num_processed_elements; i < num_values; ++i) {
+    for (size_t j = 0U; j < num_streams; ++j) {
+      const uint8_t byte_in_value = raw_values[i * num_streams + j];
+      output_buffer_raw[j * num_values + i] = byte_in_value;
+    }
+  }
+  // The current shuffling algorithm diverges for float and double types but the compiler
+  // should be able to remove the branch since only one path is taken for each template
+  // instantiation.
+  // Example run for floats:
+  // Step 0, copy:
+  //   0: ABCD ABCD ABCD ABCD 1: ABCD ABCD ABCD ABCD ...
+  // Steps 1 and 2: _mm_unpacklo_epi8 and _mm_unpackhi_epi8 (applied twice):
+  //   0: AABB CCDD AABB CCDD 1: AABB CCDD AABB CCDD ...
+  //   0: AAAA BBBB CCCC DDDD 1: AAAA BBBB CCCC DDDD ...
+  // Step 3: _mm_unpacklo_epi8 and _mm_unpackhi_epi8:
+  //   0: AAAA AAAA BBBB BBBB 1: CCCC CCCC DDDD DDDD ...
+  // Step 4: _mm_unpacklo_epi64 and _mm_unpackhi_epi64:
+  //   0: AAAA AAAA AAAA AAAA 1: BBBB BBBB BBBB BBBB ...
+  for (size_t block_index = 0; block_index < num_blocks; ++block_index) {
+    // First copy the data to stage 0.
+    for (size_t i = 0; i < num_streams; ++i) {
+      stage[0][i] = _mm_loadu_si128(&raw_values_sse[block_index * num_streams + i]);
+    }
+
+    // The shuffling of bytes is performed through the unpack intrinsics.
+    // In my measurements this gives better performance than an implementation
+    // which uses the shuffle intrinsics.
+    for (size_t stage_lvl = 0; stage_lvl < 2U; ++stage_lvl) {
+      for (size_t i = 0; i < num_streams / 2U; ++i) {
+        stage[stage_lvl + 1][i * 2] =
+            _mm_unpacklo_epi8(stage[stage_lvl][i * 2], stage[stage_lvl][i * 2 + 1]);
+        stage[stage_lvl + 1][i * 2 + 1] =
+            _mm_unpackhi_epi8(stage[stage_lvl][i * 2], stage[stage_lvl][i * 2 + 1]);
+      }
+    }
+    if (num_streams == 8U) {
+      // This is the path for double.
+      __m128i tmp[8];
+      for (size_t i = 0; i < 4; ++i) {
+        tmp[i * 2] = _mm_unpacklo_epi32(stage[2][i], stage[2][i + 4]);
+        tmp[i * 2 + 1] = _mm_unpackhi_epi32(stage[2][i], stage[2][i + 4]);
+      }
+
+      for (size_t i = 0; i < 4; ++i) {
+        final_result[i * 2] = _mm_unpacklo_epi32(tmp[i], tmp[i + 4]);
+        final_result[i * 2 + 1] = _mm_unpackhi_epi32(tmp[i], tmp[i + 4]);
+      }
+    } else {
+      // This is the path for float.
+      __m128i tmp[4];
+      for (size_t i = 0; i < 2; ++i) {
+        tmp[i * 2] = _mm_unpacklo_epi8(stage[2][i * 2], stage[2][i * 2 + 1]);
+        tmp[i * 2 + 1] = _mm_unpackhi_epi8(stage[2][i * 2], stage[2][i * 2 + 1]);
+      }
+      for (size_t i = 0; i < 2; ++i) {
+        final_result[i * 2] = _mm_unpacklo_epi64(tmp[i], tmp[i + 2]);
+        final_result[i * 2 + 1] = _mm_unpackhi_epi64(tmp[i], tmp[i + 2]);
+      }
+    }
+    for (size_t i = 0; i < num_streams; ++i) {
+      _mm_storeu_si128(&output_buffer_streams[i][block_index], final_result[i]);
+    }
+  }
+}
+
 #endif
 
 template <typename T>
-void ByteStreamSlitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
-                                T* out) {
+void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values,
+                                 uint8_t* output_buffer_raw) {
+  constexpr size_t num_streams = sizeof(T);
+  for (size_t i = 0U; i < num_values; ++i) {
+    for (size_t j = 0U; j < num_streams; ++j) {
+      const uint8_t byte_in_value = raw_values[i * num_streams + j];
+      output_buffer_raw[j * num_values + i] = byte_in_value;
+    }
+  }
+}
+
+template <typename T>
+void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
+                                 T* out) {
   constexpr size_t kNumStreams = sizeof(T);
 
   for (int64_t i = 0; i < num_values; ++i) {
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 3ee72cf..fe2545b 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -858,18 +858,18 @@ int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
 
 template <typename DType>
 std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
-  constexpr size_t num_streams = sizeof(T);
   std::shared_ptr<ResizableBuffer> output_buffer =
       AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
   uint8_t* output_buffer_raw = output_buffer->mutable_data();
   const size_t num_values = values_.length();
   const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
-  for (size_t i = 0; i < num_values; ++i) {
-    for (size_t j = 0U; j < num_streams; ++j) {
-      const uint8_t byte_in_value = raw_values[i * num_streams + j];
-      output_buffer_raw[j * num_values + i] = byte_in_value;
-    }
-  }
+#if defined(ARROW_HAVE_SSE2)
+  arrow::util::internal::ByteStreamSplitEncodeSSE2<T>(raw_values, num_values,
+                                                      output_buffer_raw);
+#else
+  arrow::util::internal::ByteStreamSplitEncodeScalar<T>(raw_values, num_values,
+                                                        output_buffer_raw);
+#endif
   values_.Reset();
   return std::move(output_buffer);
 }
@@ -2341,11 +2341,11 @@ int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
   const uint8_t* data = data_ + num_decoded_previously;
 
 #if defined(ARROW_HAVE_SSE2)
-  arrow::util::internal::ByteStreamSlitDecodeSSE2<T>(data, values_to_decode,
-                                                     num_values_in_buffer_, buffer);
+  arrow::util::internal::ByteStreamSplitDecodeSSE2<T>(data, values_to_decode,
+                                                      num_values_in_buffer_, buffer);
 #else
-  arrow::util::internal::ByteStreamSlitDecodeScalar<T>(data, values_to_decode,
-                                                       num_values_in_buffer_, buffer);
+  arrow::util::internal::ByteStreamSplitDecodeScalar<T>(data, values_to_decode,
+                                                        num_values_in_buffer_, buffer);
 #endif
   num_values_ -= values_to_decode;
   len_ -= sizeof(T) * values_to_decode;
@@ -2372,8 +2372,8 @@ int ByteStreamSplitDecoder<DType>::DecodeArrow(
   // Use fast decoding into intermediate buffer.  This will also decode
   // some null values, but it's fast enough that we don't care.
   T* decode_out = EnsureDecodeBuffer(values_decoded);
-  arrow::util::internal::ByteStreamSlitDecodeSSE2<T>(data, values_decoded,
-                                                     num_values_in_buffer_, decode_out);
+  arrow::util::internal::ByteStreamSplitDecodeSSE2<T>(data, values_decoded,
+                                                      num_values_in_buffer_, decode_out);
 
   // XXX If null_count is 0, we could even append in bulk or decode directly into
   // builder
diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc
index da216dd..a4e1176 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -213,32 +213,69 @@ static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&& decod
   state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
 }
 
+template <typename T, typename EncodeFunc>
+static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&& encode_func) {
+  std::vector<T> values(state.range(0), 64.0);
+  const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
+  std::vector<uint8_t> output(state.range(0) * sizeof(T), 0);
+
+  for (auto _ : state) {
+    encode_func(values_raw, values.size(), output.data());
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
+}
+
 static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) {
   BM_ByteStreamSplitDecode<float>(
-      state, arrow::util::internal::ByteStreamSlitDecodeScalar<float>);
+      state, arrow::util::internal::ByteStreamSplitDecodeScalar<float>);
 }
 
 static void BM_ByteStreamSplitDecode_Double_Scalar(benchmark::State& state) {
   BM_ByteStreamSplitDecode<double>(
-      state, arrow::util::internal::ByteStreamSlitDecodeScalar<double>);
+      state, arrow::util::internal::ByteStreamSplitDecodeScalar<double>);
+}
+
+static void BM_ByteStreamSplitEncode_Float_Scalar(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<float>(
+      state, arrow::util::internal::ByteStreamSplitEncodeScalar<float>);
+}
+
+static void BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<double>(
+      state, arrow::util::internal::ByteStreamSplitEncodeScalar<double>);
 }
 
 BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
 BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
 
 #if defined(ARROW_HAVE_SSE2)
 static void BM_ByteStreamSplitDecode_Float_SSE2(benchmark::State& state) {
-  BM_ByteStreamSplitDecode<float>(state,
-                                  arrow::util::internal::ByteStreamSlitDecodeSSE2<float>);
+  BM_ByteStreamSplitDecode<float>(
+      state, arrow::util::internal::ByteStreamSplitDecodeSSE2<float>);
 }
 
 static void BM_ByteStreamSplitDecode_Double_SSE2(benchmark::State& state) {
   BM_ByteStreamSplitDecode<double>(
-      state, arrow::util::internal::ByteStreamSlitDecodeSSE2<double>);
+      state, arrow::util::internal::ByteStreamSplitDecodeSSE2<double>);
+}
+
+static void BM_ByteStreamSplitEncode_Float_SSE2(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<float>(
+      state, arrow::util::internal::ByteStreamSplitEncodeSSE2<float>);
+}
+
+static void BM_ByteStreamSplitEncode_Double_SSE2(benchmark::State& state) {
+  BM_ByteStreamSplitEncode<double>(
+      state, arrow::util::internal::ByteStreamSplitEncodeSSE2<double>);
 }
 
 BENCHMARK(BM_ByteStreamSplitDecode_Float_SSE2)->Range(MIN_RANGE, MAX_RANGE);
 BENCHMARK(BM_ByteStreamSplitDecode_Double_SSE2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_SSE2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_SSE2)->Range(MIN_RANGE, MAX_RANGE);
 #endif
 
 template <typename Type>
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 47c1685..3d5fdb3 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1094,9 +1094,6 @@ TYPED_TEST(TestByteStreamSplitEncoding, BasicRoundTrip) {
   // can handle both inputs with size divisible by 4/8 and sizes which would
   // require a scalar loop for the suffix.
 
-  // Exercise only the scalar loop.
-  ASSERT_NO_FATAL_FAILURE(this->Execute(3, 1));
-
   // Exercise only the SIMD loop.
   ASSERT_NO_FATAL_FAILURE(this->Execute(256, 1));