You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2020/03/30 11:20:39 UTC
[arrow] branch master updated: PARQUET-1828: [C++] Use SSE2 for the
ByteStreamSplit encoder
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new da94098 PARQUET-1828: [C++] Use SSE2 for the ByteStreamSplit encoder
da94098 is described below
commit da94098d96a78b25910da3ba7fed57bada048852
Author: Martin Radev <ma...@gmail.com>
AuthorDate: Mon Mar 30 13:20:17 2020 +0200
PARQUET-1828: [C++] Use SSE2 for the ByteStreamSplit encoder
The ByteStreamSplit encoder can benefit from using SSE2 intrinsics
to speed up data processing, since most of the data is likely in the
cache.
Benchmark results on Sandy Bridge 3930k:
BM_ByteStreamSplitEncode_Float_Scalar/1024 264 ns 264 ns 2679735 bytes_per_second=14.4706G/s
BM_ByteStreamSplitEncode_Float_Scalar/4096 1003 ns 1003 ns 688811 bytes_per_second=15.2114G/s
BM_ByteStreamSplitEncode_Float_Scalar/32768 11933 ns 11926 ns 59187 bytes_per_second=10.2353G/s
BM_ByteStreamSplitEncode_Float_Scalar/65536 28137 ns 28137 ns 24634 bytes_per_second=8.67699G/s
BM_ByteStreamSplitEncode_Double_Scalar/1024 2601 ns 2599 ns 266977 bytes_per_second=2.93583G/s
BM_ByteStreamSplitEncode_Double_Scalar/4096 32408 ns 32408 ns 21594 bytes_per_second=964.268M/s
BM_ByteStreamSplitEncode_Double_Scalar/32768 228019 ns 227850 ns 3079 bytes_per_second=1097.21M/s
BM_ByteStreamSplitEncode_Double_Scalar/65536 475051 ns 475051 ns 1477 bytes_per_second=1052.52M/s
BM_ByteStreamSplitEncode_Float_SSE2/1024 219 ns 219 ns 3156405 bytes_per_second=17.4093G/s
BM_ByteStreamSplitEncode_Float_SSE2/4096 860 ns 860 ns 796082 bytes_per_second=17.7457G/s
BM_ByteStreamSplitEncode_Float_SSE2/32768 11556 ns 11556 ns 61763 bytes_per_second=10.5632G/s
BM_ByteStreamSplitEncode_Float_SSE2/65536 27759 ns 27758 ns 25234 bytes_per_second=8.7953G/s
BM_ByteStreamSplitEncode_Double_SSE2/1024 433 ns 433 ns 1596315 bytes_per_second=17.6216G/s
BM_ByteStreamSplitEncode_Double_SSE2/4096 3358 ns 3358 ns 205874 bytes_per_second=9.08763G/s
BM_ByteStreamSplitEncode_Double_SSE2/32768 33812 ns 33812 ns 20505 bytes_per_second=7.22053G/s
BM_ByteStreamSplitEncode_Double_SSE2/65536 68419 ns 68417 ns 10078 bytes_per_second=7.13682G/s
Closes #6723 from martinradev/byte_stream_split_submit
Authored-by: Martin Radev <ma...@gmail.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/util/byte_stream_split.h | 105 +++++++++++++++++++++++++++++++--
cpp/src/parquet/encoding.cc | 26 ++++----
cpp/src/parquet/encoding_benchmark.cc | 47 +++++++++++++--
cpp/src/parquet/encoding_test.cc | 3 -
4 files changed, 156 insertions(+), 25 deletions(-)
diff --git a/cpp/src/arrow/util/byte_stream_split.h b/cpp/src/arrow/util/byte_stream_split.h
index 27f084a..08cc11b 100644
--- a/cpp/src/arrow/util/byte_stream_split.h
+++ b/cpp/src/arrow/util/byte_stream_split.h
@@ -31,8 +31,8 @@ namespace internal {
#if defined(ARROW_HAVE_SSE2)
template <typename T>
-void ByteStreamSlitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t stride,
- T* out) {
+void ByteStreamSplitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t stride,
+ T* out) {
constexpr size_t kNumStreams = sizeof(T);
static_assert(kNumStreams == 4U || kNumStreams == 8U, "Invalid number of streams.");
constexpr size_t kNumStreamsLog2 = (kNumStreams == 8U ? 3U : 2U);
@@ -84,11 +84,108 @@ void ByteStreamSlitDecodeSSE2(const uint8_t* data, int64_t num_values, int64_t s
}
}
+// Encodes num_values values of type T (sizeof(T) must be 4 or 8), stored
+// contiguously at raw_values, into the byte-stream-split layout: byte j of value i
+// is written to output_buffer_raw[j * num_values + i]. output_buffer_raw must
+// therefore hold at least num_values * sizeof(T) bytes.
+// Whole blocks of sizeof(__m128i) * sizeof(T) input bytes are transposed with SSE2
+// unpack intrinsics; values after the last whole block go through a scalar loop.
+// Only unaligned loads/stores are used, so neither buffer needs 16-byte alignment.
+template <typename T>
+void ByteStreamSplitEncodeSSE2(const uint8_t* raw_values, const size_t num_values,
+ uint8_t* output_buffer_raw) {
+ constexpr size_t num_streams = sizeof(T);
+ static_assert(num_streams == 4U || num_streams == 8U, "Invalid number of streams.");
+ __m128i stage[3][num_streams];
+ __m128i final_result[num_streams];
+
+ const size_t size = num_values * sizeof(T);
+ const size_t block_size = sizeof(__m128i) * num_streams;
+ const size_t num_blocks = size / block_size;
+ const __m128i* raw_values_sse = reinterpret_cast<const __m128i*>(raw_values);
+ // output_buffer_streams[i] points at the first byte of output stream i.
+ __m128i* output_buffer_streams[num_streams];
+ for (size_t i = 0; i < num_streams; ++i) {
+ output_buffer_streams[i] =
+ reinterpret_cast<__m128i*>(&output_buffer_raw[num_values * i]);
+ }
+
+ // Scalar tail: values that do not fill a whole SSE2 block are scattered into
+ // their streams one byte at a time.
+ const size_t num_processed_elements = (num_blocks * block_size) / sizeof(T);
+ for (size_t i = num_processed_elements; i < num_values; ++i) {
+ for (size_t j = 0U; j < num_streams; ++j) {
+ const uint8_t byte_in_value = raw_values[i * num_streams + j];
+ output_buffer_raw[j * num_values + i] = byte_in_value;
+ }
+ }
+ // The current shuffling algorithm diverges for float and double types but the compiler
+ // should be able to remove the branch since only one path is taken for each template
+ // instantiation.
+ // Example run for floats:
+ // Step 0, copy:
+ // 0: ABCD ABCD ABCD ABCD 1: ABCD ABCD ABCD ABCD ...
+ // Step 1: _mm_unpacklo_epi8 and _mm_unpackhi_epi8:
+ // 0: AABB CCDD AABB CCDD 1: AABB CCDD AABB CCDD ...
+ // Step 2: _mm_unpacklo_epi8 and _mm_unpackhi_epi8:
+ // 0: AAAA BBBB CCCC DDDD 1: AAAA BBBB CCCC DDDD ...
+ // Step 3: _mm_unpacklo_epi8 and _mm_unpackhi_epi8:
+ // 0: AAAA AAAA BBBB BBBB 1: CCCC CCCC DDDD DDDD ...
+ // Step 4: _mm_unpacklo_epi64 and _mm_unpackhi_epi64:
+ // 0: AAAA AAAA AAAA AAAA 1: BBBB BBBB BBBB BBBB ...
+ // For doubles, steps 3 and 4 are instead two rounds of _mm_unpacklo_epi32 and
+ // _mm_unpackhi_epi32.
+ for (size_t block_index = 0; block_index < num_blocks; ++block_index) {
+ // First copy the data to stage 0.
+ for (size_t i = 0; i < num_streams; ++i) {
+ stage[0][i] = _mm_loadu_si128(&raw_values_sse[block_index * num_streams + i]);
+ }
+
+ // The shuffling of bytes is performed through the unpack intrinsics.
+ // In my measurements this gives better performance than an implementation
+ // which uses the shuffle intrinsics.
+ // Steps 1 and 2: two rounds of byte interleaving, stage[n] -> stage[n + 1].
+ for (size_t stage_lvl = 0; stage_lvl < 2U; ++stage_lvl) {
+ for (size_t i = 0; i < num_streams / 2U; ++i) {
+ stage[stage_lvl + 1][i * 2] =
+ _mm_unpacklo_epi8(stage[stage_lvl][i * 2], stage[stage_lvl][i * 2 + 1]);
+ stage[stage_lvl + 1][i * 2 + 1] =
+ _mm_unpackhi_epi8(stage[stage_lvl][i * 2], stage[stage_lvl][i * 2 + 1]);
+ }
+ }
+ if (num_streams == 8U) {
+ // This is the path for double.
+ __m128i tmp[8];
+ for (size_t i = 0; i < 4; ++i) {
+ tmp[i * 2] = _mm_unpacklo_epi32(stage[2][i], stage[2][i + 4]);
+ tmp[i * 2 + 1] = _mm_unpackhi_epi32(stage[2][i], stage[2][i + 4]);
+ }
+
+ for (size_t i = 0; i < 4; ++i) {
+ final_result[i * 2] = _mm_unpacklo_epi32(tmp[i], tmp[i + 4]);
+ final_result[i * 2 + 1] = _mm_unpackhi_epi32(tmp[i], tmp[i + 4]);
+ }
+ } else {
+ // This is the path for float.
+ __m128i tmp[4];
+ for (size_t i = 0; i < 2; ++i) {
+ tmp[i * 2] = _mm_unpacklo_epi8(stage[2][i * 2], stage[2][i * 2 + 1]);
+ tmp[i * 2 + 1] = _mm_unpackhi_epi8(stage[2][i * 2], stage[2][i * 2 + 1]);
+ }
+ for (size_t i = 0; i < 2; ++i) {
+ final_result[i * 2] = _mm_unpacklo_epi64(tmp[i], tmp[i + 2]);
+ final_result[i * 2 + 1] = _mm_unpackhi_epi64(tmp[i], tmp[i + 2]);
+ }
+ }
+ // final_result[i] holds the 16 bytes this block contributes to stream i; they
+ // land at offset block_index * 16 within that stream.
+ for (size_t i = 0; i < num_streams; ++i) {
+ _mm_storeu_si128(&output_buffer_streams[i][block_index], final_result[i]);
+ }
+ }
+}
+
#endif
template <typename T>
-void ByteStreamSlitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
- T* out) {
+void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const size_t num_values,
+ uint8_t* output_buffer_raw) {
+ // Scalar byte-stream-split encoder: byte j of value i (sizeof(T) bytes per value)
+ // is copied to output_buffer_raw[j * num_values + i], one byte at a time.
+ // output_buffer_raw must hold at least num_values * sizeof(T) bytes.
+ constexpr size_t num_streams = sizeof(T);
+ for (size_t i = 0U; i < num_values; ++i) {
+ for (size_t j = 0U; j < num_streams; ++j) {
+ const uint8_t byte_in_value = raw_values[i * num_streams + j];
+ output_buffer_raw[j * num_values + i] = byte_in_value;
+ }
+ }
+}
+
+template <typename T>
+void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
+ T* out) {
constexpr size_t kNumStreams = sizeof(T);
for (int64_t i = 0; i < num_values; ++i) {
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 3ee72cf..fe2545b 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -858,18 +858,18 @@ int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
- constexpr size_t num_streams = sizeof(T);
std::shared_ptr<ResizableBuffer> output_buffer =
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
uint8_t* output_buffer_raw = output_buffer->mutable_data();
const size_t num_values = values_.length();
const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
- for (size_t i = 0; i < num_values; ++i) {
- for (size_t j = 0U; j < num_streams; ++j) {
- const uint8_t byte_in_value = raw_values[i * num_streams + j];
- output_buffer_raw[j * num_values + i] = byte_in_value;
- }
- }
+ // Split the buffered values into sizeof(T) byte streams. The SSE2 kernel is
+ // selected at compile time when ARROW_HAVE_SSE2 is defined; otherwise the
+ // scalar kernel is used. Both produce the same output layout.
+#if defined(ARROW_HAVE_SSE2)
+ arrow::util::internal::ByteStreamSplitEncodeSSE2<T>(raw_values, num_values,
+ output_buffer_raw);
+#else
+ arrow::util::internal::ByteStreamSplitEncodeScalar<T>(raw_values, num_values,
+ output_buffer_raw);
+#endif
values_.Reset();
return std::move(output_buffer);
}
@@ -2341,11 +2341,11 @@ int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
const uint8_t* data = data_ + num_decoded_previously;
#if defined(ARROW_HAVE_SSE2)
- arrow::util::internal::ByteStreamSlitDecodeSSE2<T>(data, values_to_decode,
- num_values_in_buffer_, buffer);
+ arrow::util::internal::ByteStreamSplitDecodeSSE2<T>(data, values_to_decode,
+ num_values_in_buffer_, buffer);
#else
- arrow::util::internal::ByteStreamSlitDecodeScalar<T>(data, values_to_decode,
- num_values_in_buffer_, buffer);
+ arrow::util::internal::ByteStreamSplitDecodeScalar<T>(data, values_to_decode,
+ num_values_in_buffer_, buffer);
#endif
num_values_ -= values_to_decode;
len_ -= sizeof(T) * values_to_decode;
@@ -2372,8 +2372,8 @@ int ByteStreamSplitDecoder<DType>::DecodeArrow(
// Use fast decoding into intermediate buffer. This will also decode
// some null values, but it's fast enough that we don't care.
T* decode_out = EnsureDecodeBuffer(values_decoded);
- arrow::util::internal::ByteStreamSlitDecodeSSE2<T>(data, values_decoded,
- num_values_in_buffer_, decode_out);
+ arrow::util::internal::ByteStreamSplitDecodeSSE2<T>(data, values_decoded,
+ num_values_in_buffer_, decode_out);
// XXX If null_count is 0, we could even append in bulk or decode directly into
// builder
diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc
index da216dd..a4e1176 100644
--- a/cpp/src/parquet/encoding_benchmark.cc
+++ b/cpp/src/parquet/encoding_benchmark.cc
@@ -213,32 +213,69 @@ static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&& decod
state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
}
+// Benchmarks encode_func on state.range(0) values of type T (all set to 64.0),
+// writing into a preallocated output buffer of the same byte size. Throughput is
+// reported as input bytes processed per second.
+template <typename T, typename EncodeFunc>
+static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&& encode_func) {
+ std::vector<T> values(state.range(0), 64.0);
+ const uint8_t* values_raw = reinterpret_cast<const uint8_t*>(values.data());
+ std::vector<uint8_t> output(state.range(0) * sizeof(T), 0);
+
+ for (auto _ : state) {
+ encode_func(values_raw, values.size(), output.data());
+ // Keeps the compiler from eliding the writes into output.
+ benchmark::ClobberMemory();
+ }
+ state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T));
+}
+
static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) {
BM_ByteStreamSplitDecode<float>(
- state, arrow::util::internal::ByteStreamSlitDecodeScalar<float>);
+ state, arrow::util::internal::ByteStreamSplitDecodeScalar<float>);
}
static void BM_ByteStreamSplitDecode_Double_Scalar(benchmark::State& state) {
BM_ByteStreamSplitDecode<double>(
- state, arrow::util::internal::ByteStreamSlitDecodeScalar<double>);
+ state, arrow::util::internal::ByteStreamSplitDecodeScalar<double>);
+}
+
+// Scalar encoder benchmarks, one per supported value type.
+static void BM_ByteStreamSplitEncode_Float_Scalar(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<float>(
+ state, arrow::util::internal::ByteStreamSplitEncodeScalar<float>);
+}
+
+static void BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<double>(
+ state, arrow::util::internal::ByteStreamSplitEncodeScalar<double>);
}
BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE);
#if defined(ARROW_HAVE_SSE2)
static void BM_ByteStreamSplitDecode_Float_SSE2(benchmark::State& state) {
- BM_ByteStreamSplitDecode<float>(state,
- arrow::util::internal::ByteStreamSlitDecodeSSE2<float>);
+ BM_ByteStreamSplitDecode<float>(
+ state, arrow::util::internal::ByteStreamSplitDecodeSSE2<float>);
}
static void BM_ByteStreamSplitDecode_Double_SSE2(benchmark::State& state) {
BM_ByteStreamSplitDecode<double>(
- state, arrow::util::internal::ByteStreamSlitDecodeSSE2<double>);
+ state, arrow::util::internal::ByteStreamSplitDecodeSSE2<double>);
+}
+
+static void BM_ByteStreamSplitEncode_Float_SSE2(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<float>(
+ state, arrow::util::internal::ByteStreamSplitEncodeSSE2<float>);
+}
+
+static void BM_ByteStreamSplitEncode_Double_SSE2(benchmark::State& state) {
+ BM_ByteStreamSplitEncode<double>(
+ state, arrow::util::internal::ByteStreamSplitEncodeSSE2<double>);
}
BENCHMARK(BM_ByteStreamSplitDecode_Float_SSE2)->Range(MIN_RANGE, MAX_RANGE);
BENCHMARK(BM_ByteStreamSplitDecode_Double_SSE2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Float_SSE2)->Range(MIN_RANGE, MAX_RANGE);
+BENCHMARK(BM_ByteStreamSplitEncode_Double_SSE2)->Range(MIN_RANGE, MAX_RANGE);
#endif
template <typename Type>
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 47c1685..3d5fdb3 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1094,9 +1094,6 @@ TYPED_TEST(TestByteStreamSplitEncoding, BasicRoundTrip) {
// can handle both inputs with size divisible by 4/8 and sizes which would
// require a scalar loop for the suffix.
- // Exercise only the scalar loop.
- ASSERT_NO_FATAL_FAILURE(this->Execute(3, 1));
-
// Exercise only the SIMD loop.
ASSERT_NO_FATAL_FAILURE(this->Execute(256, 1));