You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/24 12:30:14 UTC
arrow git commit: ARROW-659: [C++] Add multithreaded memcpy
implementation
Repository: arrow
Updated Branches:
refs/heads/master 95f489c4c -> de54eff19
ARROW-659: [C++] Add multithreaded memcpy implementation
Parallelize memcpy operations for large objects with a multithreaded implementation.
Author: Philipp Moritz <pc...@gmail.com>
Closes #580 from atumanov/parallel-memcpy and squashes the following commits:
6ea9873 [Philipp Moritz] fix windows build (?)
66dfa74 [Philipp Moritz] linting
9dd6f3f [Philipp Moritz] cleanup
e81bad9 [Philipp Moritz] add license header
0beb870 [Philipp Moritz] add pthread library
1d73612 [Philipp Moritz] add test of parallel memcopy
1a27431 [Philipp Moritz] restructure code
70d767c [Philipp Moritz] add benchmarks
b320b47 [Philipp Moritz] make memcopy generic
f99606a [Philipp Moritz] add parallel memcpy, contributed by Alexey Tumanov <at...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/de54eff1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/de54eff1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/de54eff1
Branch: refs/heads/master
Commit: de54eff19af024c1ca0e82f4b45c6021443a635b
Parents: 95f489c
Author: Philipp Moritz <pc...@gmail.com>
Authored: Mon Apr 24 08:30:08 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Apr 24 08:30:08 2017 -0400
----------------------------------------------------------------------
cpp/CMakeLists.txt | 3 +-
cpp/src/arrow/io/CMakeLists.txt | 2 +
cpp/src/arrow/io/io-memory-benchmark.cc | 75 ++++++++++++++++++++++++++++
cpp/src/arrow/io/io-memory-test.cc | 22 ++++++++
cpp/src/arrow/io/memory.cc | 31 ++++++++++--
cpp/src/arrow/io/memory.h | 8 +++
cpp/src/arrow/ipc/CMakeLists.txt | 1 +
cpp/src/arrow/util/memory.h | 69 +++++++++++++++++++++++++
8 files changed, 207 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 978f70a..2d8c00f 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -902,7 +902,8 @@ set(ARROW_STATIC_PRIVATE_LINK_LIBS
if (NOT MSVC)
set(ARROW_LINK_LIBS
${ARROW_LINK_LIBS}
- ${CMAKE_DL_LIBS})
+ ${CMAKE_DL_LIBS}
+ # std::thread in arrow/util/memory.h (parallel_memcopy) needs the threads library
+ pthread)
endif()
if(RAPIDJSON_VENDORED)
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index c0199d7..cd48974 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -22,6 +22,8 @@ ADD_ARROW_TEST(io-file-test)
ADD_ARROW_TEST(io-hdfs-test)
ADD_ARROW_TEST(io-memory-test)
+# Serial vs. multi-threaded FixedSizeBufferWriter::Write throughput
+ADD_ARROW_BENCHMARK(io-memory-benchmark)
+
# Headers: top level
install(FILES
file.h
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/io-memory-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-benchmark.cc b/cpp/src/arrow/io/io-memory-benchmark.cc
new file mode 100644
index 0000000..59b511a
--- /dev/null
+++ b/cpp/src/arrow/io/io-memory-benchmark.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/api.h"
+#include "arrow/io/memory.h"
+#include "arrow/test-util.h"
+
+#include "benchmark/benchmark.h"
+
+#include <iostream>
+
+namespace arrow {
+
+// Number of bytes copied per benchmark iteration.
+static constexpr int64_t kTotalSize = 100 * 1024 * 1024;  // 100MB
+
+// Shared driver for both benchmarks: copies a kTotalSize buffer of random
+// bytes through a fresh FixedSizeBufferWriter each iteration (so every Write
+// starts at position 0). `num_threads` is forwarded to set_memcopy_threads;
+// a value of 1 keeps Write() on its serial memcpy path.
+static void RunMemcopyBenchmark(benchmark::State& state,  // NOLINT non-const reference
+                                int num_threads) {
+  auto dst = std::make_shared<PoolBuffer>(default_memory_pool());
+  dst->Resize(kTotalSize);
+
+  auto src = std::make_shared<PoolBuffer>(default_memory_pool());
+  src->Resize(kTotalSize);
+  test::random_bytes(kTotalSize, 0, src->mutable_data());
+
+  while (state.KeepRunning()) {
+    io::FixedSizeBufferWriter writer(dst);
+    writer.set_memcopy_threads(num_threads);
+    writer.Write(src->data(), src->size());
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * kTotalSize);
+}
+
+static void BM_SerialMemcopy(benchmark::State& state) {  // NOLINT non-const reference
+  RunMemcopyBenchmark(state, 1);
+}
+
+// state.range(0) is the number of memcopy threads handed to the writer.
+static void BM_ParallelMemcopy(benchmark::State& state) {  // NOLINT non-const reference
+  RunMemcopyBenchmark(state, static_cast<int>(state.range(0)));
+}
+
+// The serial benchmark takes no argument; registering a Range here would only
+// repeat the identical measurement once per (unused) argument value.
+BENCHMARK(BM_SerialMemcopy)->MinTime(1.0)->UseRealTime();
+
+// Sweep 2, 4 and 8 copy threads.
+BENCHMARK(BM_ParallelMemcopy)
+    ->RangeMultiplier(2)
+    ->Range(2, 8)
+    ->MinTime(1.0)
+    ->UseRealTime();
+
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index 4704fe8..33249cb 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -17,6 +17,7 @@
#include <cstdint>
#include <cstdio>
+#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
@@ -114,5 +115,26 @@ TEST(TestBufferReader, RetainParentReference) {
ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
}
+// Exercises FixedSizeBufferWriter's multi-threaded Write path. Source offsets
+// and lengths are varied deterministically so the unaligned prefix, the
+// per-thread chunks and the leftover suffix of parallel_memcopy all take
+// different sizes.
+TEST(TestMemcopy, ParallelMemcopy) {
+  const int64_t kBaseSize = 3 * 1024 * 1024;
+  const int64_t kMaxSize = kBaseSize + 100;
+
+  auto buffer1 = std::make_shared<PoolBuffer>(default_memory_pool());
+  ASSERT_OK(buffer1->Resize(kMaxSize));
+
+  auto buffer2 = std::make_shared<PoolBuffer>(default_memory_pool());
+  ASSERT_OK(buffer2->Resize(kMaxSize));
+  test::random_bytes(kMaxSize, 0, buffer2->mutable_data());
+
+  for (int64_t offset : {0, 1, 17, 63}) {
+    for (int64_t extra : {0, 1, 63, 99}) {
+      const int64_t nbytes = kBaseSize + extra - offset;
+      // Clear the destination so bytes left over from a previous iteration
+      // cannot hide a region the copy skipped.
+      std::memset(buffer1->mutable_data(), 0, kMaxSize);
+
+      io::FixedSizeBufferWriter writer(buffer1);
+      writer.set_memcopy_threads(4);
+      writer.set_memcopy_threshold(1024 * 1024);
+      ASSERT_OK(writer.Write(buffer2->data() + offset, nbytes));
+
+      ASSERT_EQ(0, std::memcmp(buffer1->data(), buffer2->data() + offset, nbytes));
+    }
+  }
+}
+
} // namespace io
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 2e701e1..95c6206 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -29,6 +29,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
+#include "arrow/util/memory.h"
namespace arrow {
namespace io {
@@ -80,7 +81,7 @@ Status BufferOutputStream::Tell(int64_t* position) {
Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
DCHECK(buffer_);
RETURN_NOT_OK(Reserve(nbytes));
std::memcpy(mutable_data_ + position_, data, nbytes);
position_ += nbytes;
return Status::OK();
}
@@ -101,8 +102,15 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
// ----------------------------------------------------------------------
// In-memory buffer writer
+// Defaults for the FixedSizeBufferWriter memcopy settings. With the default
+// single thread, Write() always uses a plain memcpy, because its parallel path
+// requires memcopy_num_threads_ > 1.
+static constexpr int kMemcopyDefaultNumThreads = 1;
+// Block size (bytes) passed to parallel_memcopy, which aligns the per-thread
+// chunks of the source to multiples of it.
+static constexpr int64_t kMemcopyDefaultBlocksize = 64;
+// Only writes larger than this many bytes (1 MiB) are eligible for the
+// parallel path.
+static constexpr int64_t kMemcopyDefaultThreshold = 1024 * 1024;
+
/// Input buffer must be mutable, will abort if not
-FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) {
+FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
+ : memcopy_num_threads_(kMemcopyDefaultNumThreads),
+ memcopy_blocksize_(kMemcopyDefaultBlocksize),
+ memcopy_threshold_(kMemcopyDefaultThreshold) {
buffer_ = buffer;
DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
mutable_data_ = buffer->mutable_data();
@@ -131,7 +139,12 @@ Status FixedSizeBufferWriter::Tell(int64_t* position) {
}
Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
- std::memcpy(mutable_data_ + position_, data, nbytes);
+  // Reject writes that would run past size_: neither copy path below checks
+  // bounds on its own, and a negative nbytes would become a huge size_t.
+  if (nbytes < 0 || position_ + nbytes > size_) {
+    return Status::IOError("Write out of bounds");
+  }
+  // Only writes above the threshold with more than one configured thread are
+  // split across threads; everything else (including the default
+  // single-thread setting) is a plain memcpy.
+  if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) {
+    parallel_memcopy(mutable_data_ + position_, data, nbytes,
+                     memcopy_blocksize_, memcopy_num_threads_);
+  } else {
+    std::memcpy(mutable_data_ + position_, data, nbytes);
+  }
position_ += nbytes;
return Status::OK();
}
@@ -143,6 +156,18 @@ Status FixedSizeBufferWriter::WriteAt(
return Write(data, nbytes);
}
+// Number of threads Write() may use; values <= 1 keep Write() serial.
+void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) {
+  memcopy_num_threads_ = num_threads;
+}
+
+// Block size handed to parallel_memcopy.
+void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) {
+  // parallel_memcopy aligns addresses with the mask ~(blocksize - 1) and
+  // divides by blocksize, which is only meaningful for a positive power of two.
+  DCHECK(blocksize > 0 && (blocksize & (blocksize - 1)) == 0)
+      << "memcopy blocksize must be a positive power of two";
+  memcopy_blocksize_ = blocksize;
+}
+
+// Writes of more than `threshold` bytes are eligible for the parallel path.
+void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) {
+  memcopy_threshold_ = threshold;
+}
+
// ----------------------------------------------------------------------
// In-memory buffer reader
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index fbb186b..f1b5990 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -81,12 +81,20 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
Status Write(const uint8_t* data, int64_t nbytes) override;
Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
+ /// Number of threads Write() uses for large copies. Write() copies in
+ /// parallel only when this is greater than 1 (the default is 1).
+ void set_memcopy_threads(int num_threads);
+ /// Block size in bytes passed to parallel_memcopy (default 64). Expected to
+ /// be a power of two: parallel_memcopy aligns with the mask ~(blocksize - 1).
+ void set_memcopy_blocksize(int64_t blocksize);
+ /// Write() copies in parallel only when nbytes exceeds this many bytes
+ /// (default 1 MiB).
+ void set_memcopy_threshold(int64_t threshold);
+
private:
std::mutex lock_;
std::shared_ptr<Buffer> buffer_;
uint8_t* mutable_data_;
int64_t size_;
int64_t position_;
+
+ // Parallel-copy settings read by Write(); initialized from the
+ // kMemcopyDefault* constants in memory.cc and changed via set_memcopy_*.
+ int memcopy_num_threads_;
+ int64_t memcopy_blocksize_;
+ int64_t memcopy_threshold_;
};
class ARROW_EXPORT BufferReader : public RandomAccessFile {
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index fc1d53e..41ab5d7 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -95,6 +95,7 @@ if(MSVC)
else()
set(UTIL_LINK_LIBS
arrow_static
+ # for std::thread used by parallel_memcopy (arrow/util/memory.h)
+ pthread
${BOOST_FILESYSTEM_LIBRARY}
${BOOST_SYSTEM_LIBRARY}
dl)
http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/util/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
new file mode 100644
index 0000000..7feeb29
--- /dev/null
+++ b/cpp/src/arrow/util/memory.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_MEMORY_H
+#define ARROW_UTIL_MEMORY_H
+
+#include <cstdint>
+#include <cstring>
+#include <thread>
+#include <vector>
+
+namespace arrow {
+
+// Returns `address` with every bit that is not set in `bits` cleared; e.g.
+// pointer_logical_and(p, ~(alignment - 1)) rounds p down to a multiple of a
+// power-of-two alignment. Note that the result is a non-const pointer even
+// though the input is const.
+// Declared inline because this header can be included from several
+// translation units; a plain definition would violate the one-definition rule.
+inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
+  uintptr_t value = reinterpret_cast<uintptr_t>(address);
+  return reinterpret_cast<uint8_t*>(value & bits);
+}
+
+// A helper function for doing memcpy with multiple threads. This is required
+// to saturate the memory bandwidth of modern cpus.
+//
+// The source range is split as | prefix | num_threads * chunk_size | suffix |:
+// the middle part starts on a block_size-aligned source address, and each of
+// the num_threads chunks (a whole number of blocks) is copied by its own
+// std::thread while the calling thread copies the prefix and suffix.
+//
+// block_size must be a non-zero power of two. If it is not, if num_threads is
+// below 2, or if the range is too small to give every thread at least one
+// whole block, the data is copied with a single std::memcpy instead.
+// Declared inline because this header can be included from several
+// translation units.
+inline void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
+                             uintptr_t block_size, int num_threads) {
+  if (nbytes <= 0) {
+    return;
+  }
+  const bool power_of_two = block_size != 0 && (block_size & (block_size - 1)) == 0;
+  if (num_threads < 2 || !power_of_two) {
+    std::memcpy(dst, src, static_cast<size_t>(nbytes));
+    return;
+  }
+
+  const uintptr_t mask = ~(block_size - 1);
+  const uintptr_t begin = reinterpret_cast<uintptr_t>(src);
+  const uintptr_t aligned_begin = (begin + block_size - 1) & mask;
+  const uintptr_t aligned_end = (begin + static_cast<uintptr_t>(nbytes)) & mask;
+  // When the range does not span a full aligned block, aligned_end can lie
+  // before aligned_begin; treat that as zero blocks instead of underflowing.
+  const uintptr_t num_blocks =
+      aligned_end > aligned_begin ? (aligned_end - aligned_begin) / block_size : 0;
+  // Every thread copies the same number of whole blocks; leftover blocks are
+  // folded into the suffix handled on the calling thread.
+  const uintptr_t blocks_per_thread = num_blocks / static_cast<uintptr_t>(num_threads);
+  if (blocks_per_thread == 0) {
+    std::memcpy(dst, src, static_cast<size_t>(nbytes));
+    return;
+  }
+
+  const int64_t prefix = static_cast<int64_t>(aligned_begin - begin);
+  const int64_t chunk_size = static_cast<int64_t>(blocks_per_thread * block_size);
+  const int64_t body = chunk_size * num_threads;
+  // prefix + body <= aligned_end - begin <= nbytes, so suffix is non-negative.
+  const int64_t suffix = nbytes - prefix - body;
+
+  // Start all threads first and handle the prefix and suffix while they run.
+  std::vector<std::thread> threadpool;
+  threadpool.reserve(num_threads);
+  for (int i = 0; i < num_threads; i++) {
+    uint8_t* chunk_dst = dst + prefix + i * chunk_size;
+    const uint8_t* chunk_src = src + prefix + i * chunk_size;
+    threadpool.emplace_back([chunk_dst, chunk_src, chunk_size] {
+      std::memcpy(chunk_dst, chunk_src, static_cast<size_t>(chunk_size));
+    });
+  }
+
+  std::memcpy(dst, src, static_cast<size_t>(prefix));
+  std::memcpy(dst + prefix + body, src + prefix + body, static_cast<size_t>(suffix));
+
+  for (auto& t : threadpool) {
+    t.join();
+  }
+}
+
+} // namespace arrow
+
+#endif // ARROW_UTIL_MEMORY_H