You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/24 12:30:14 UTC

arrow git commit: ARROW-659: [C++] Add multithreaded memcpy implementation

Repository: arrow
Updated Branches:
  refs/heads/master 95f489c4c -> de54eff19


ARROW-659: [C++] Add multithreaded memcpy implementation

Parallelize memcpy operations for large objects with a multithreaded implementation.

Author: Philipp Moritz <pc...@gmail.com>

Closes #580 from atumanov/parallel-memcpy and squashes the following commits:

6ea9873 [Philipp Moritz] fix windows build (?)
66dfa74 [Philipp Moritz] linting
9dd6f3f [Philipp Moritz] cleanup
e81bad9 [Philipp Moritz] add license header
0beb870 [Philipp Moritz] add pthread library
1d73612 [Philipp Moritz] add test of parallel memcopy
1a27431 [Philipp Moritz] restructure code
70d767c [Philipp Moritz] add benchmarks
b320b47 [Philipp Moritz] make memcopy generic
f99606a [Philipp Moritz] add parallel memcpy, contributed by Alexey Tumanov <at...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/de54eff1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/de54eff1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/de54eff1

Branch: refs/heads/master
Commit: de54eff19af024c1ca0e82f4b45c6021443a635b
Parents: 95f489c
Author: Philipp Moritz <pc...@gmail.com>
Authored: Mon Apr 24 08:30:08 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Apr 24 08:30:08 2017 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                      |  3 +-
 cpp/src/arrow/io/CMakeLists.txt         |  2 +
 cpp/src/arrow/io/io-memory-benchmark.cc | 75 ++++++++++++++++++++++++++++
 cpp/src/arrow/io/io-memory-test.cc      | 22 ++++++++
 cpp/src/arrow/io/memory.cc              | 31 ++++++++++--
 cpp/src/arrow/io/memory.h               |  8 +++
 cpp/src/arrow/ipc/CMakeLists.txt        |  1 +
 cpp/src/arrow/util/memory.h             | 69 +++++++++++++++++++++++++
 8 files changed, 207 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 978f70a..2d8c00f 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -902,7 +902,8 @@ set(ARROW_STATIC_PRIVATE_LINK_LIBS
 if (NOT MSVC)
   set(ARROW_LINK_LIBS
     ${ARROW_LINK_LIBS}
-    ${CMAKE_DL_LIBS})
+    ${CMAKE_DL_LIBS}
+    pthread)  # for std::thread in arrow/util/memory.h; NOTE(review): consider find_package(Threads)
 endif()
 
 if(RAPIDJSON_VENDORED)

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index c0199d7..cd48974 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -22,6 +22,8 @@ ADD_ARROW_TEST(io-file-test)
 ADD_ARROW_TEST(io-hdfs-test)
 ADD_ARROW_TEST(io-memory-test)
 
+ADD_ARROW_BENCHMARK(io-memory-benchmark)  # serial vs. parallel FixedSizeBufferWriter copies
+
 # Headers: top level
 install(FILES
   file.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/io-memory-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-benchmark.cc b/cpp/src/arrow/io/io-memory-benchmark.cc
new file mode 100644
index 0000000..59b511a
--- /dev/null
+++ b/cpp/src/arrow/io/io-memory-benchmark.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/api.h"
+#include "arrow/io/memory.h"
+#include "arrow/test-util.h"
+
+#include "benchmark/benchmark.h"
+
+#include <iostream>
+
+namespace arrow {
+
+// Number of bytes copied per benchmark iteration.
+static constexpr int64_t kTotalSize = 100 * 1024 * 1024;  // 100MB
+
+// Copies kTotalSize random bytes into a fresh FixedSizeBufferWriter on each
+// iteration, using `num_threads` memcopy threads (1 means a plain memcpy).
+static void RunMemcopyBenchmark(benchmark::State& state,  // NOLINT non-const reference
+                                int num_threads) {
+  auto source = std::make_shared<PoolBuffer>(default_memory_pool());
+  source->Resize(kTotalSize);
+  test::random_bytes(kTotalSize, 0, source->mutable_data());
+
+  auto target = std::make_shared<PoolBuffer>(default_memory_pool());
+  target->Resize(kTotalSize);
+
+  while (state.KeepRunning()) {
+    io::FixedSizeBufferWriter writer(target);
+    writer.set_memcopy_threads(num_threads);
+    writer.Write(source->data(), source->size());
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * kTotalSize);
+}
+
+// Baseline: one plain memcpy per iteration.
+static void BM_SerialMemcopy(benchmark::State& state) {  // NOLINT non-const reference
+  RunMemcopyBenchmark(state, 1);
+}
+
+// Same copy split across 4 threads (kTotalSize is above the default threshold).
+static void BM_ParallelMemcopy(benchmark::State& state) {  // NOLINT non-const reference
+  RunMemcopyBenchmark(state, 4);
+}
+
+// Neither benchmark reads state.range(), so no argument range is registered;
+// a range would only repeat the identical measurement.
+BENCHMARK(BM_SerialMemcopy)->MinTime(1.0)->UseRealTime();
+
+BENCHMARK(BM_ParallelMemcopy)->MinTime(1.0)->UseRealTime();
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index 4704fe8..33249cb 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -17,6 +17,7 @@
 
 #include <cstdint>
 #include <cstdio>
+#include <cstdlib>
 #include <cstring>
 #include <memory>
 #include <string>
@@ -114,5 +115,46 @@ TEST(TestBufferReader, RetainParentReference) {
  ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
 }

+TEST(TestMemcopy, ParallelMemcopy) {
+  for (int i = 0; i < 5; ++i) {
+    // Randomize the size so that the leftover-suffix handling is exercised.
+    int64_t total_size = 3 * 1024 * 1024 + std::rand() % 100;
+
+    auto buffer1 = std::make_shared<PoolBuffer>(default_memory_pool());
+    ASSERT_TRUE(buffer1->Resize(total_size).ok());
+
+    auto buffer2 = std::make_shared<PoolBuffer>(default_memory_pool());
+    ASSERT_TRUE(buffer2->Resize(total_size).ok());
+    test::random_bytes(total_size, 0, buffer2->mutable_data());
+
+    io::FixedSizeBufferWriter writer(buffer1);
+    writer.set_memcopy_threads(4);
+    writer.set_memcopy_threshold(1024 * 1024);
+    ASSERT_TRUE(writer.Write(buffer2->data(), buffer2->size()).ok());
+
+    ASSERT_EQ(0, std::memcmp(buffer1->data(), buffer2->data(), buffer1->size()));
+  }
+}
+
+TEST(TestMemcopy, ParallelMemcopyUnalignedSource) {
+  // Copy starting one byte into the source so that, assuming the memory pool
+  // hands out block-aligned memory, the unaligned prefix path is exercised.
+  const int64_t total_size = 3 * 1024 * 1024 + 17;
+
+  auto source = std::make_shared<PoolBuffer>(default_memory_pool());
+  ASSERT_TRUE(source->Resize(total_size + 1).ok());
+  test::random_bytes(total_size + 1, 0, source->mutable_data());
+
+  auto target = std::make_shared<PoolBuffer>(default_memory_pool());
+  ASSERT_TRUE(target->Resize(total_size).ok());
+
+  io::FixedSizeBufferWriter writer(target);
+  writer.set_memcopy_threads(4);
+  writer.set_memcopy_threshold(1024 * 1024);
+  ASSERT_TRUE(writer.Write(source->data() + 1, total_size).ok());
+
+  ASSERT_EQ(0, std::memcmp(target->data(), source->data() + 1, total_size));
+}
+
 }  // namespace io
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 2e701e1..95c6206 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -29,6 +29,7 @@
 #include "arrow/io/interfaces.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/memory.h"
 
 namespace arrow {
 namespace io {
@@ -80,7 +81,9 @@ Status BufferOutputStream::Tell(int64_t* position) {
+// Copies `nbytes` bytes from `data` to the current position and advances it;
+// returns the error from Reserve(nbytes) if that call fails.
 Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
   DCHECK(buffer_);
   RETURN_NOT_OK(Reserve(nbytes));
   std::memcpy(mutable_data_ + position_, data, nbytes);
   position_ += nbytes;
   return Status::OK();
 }
@@ -101,8 +102,15 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 // ----------------------------------------------------------------------
 // In-memory buffer writer
 
+static constexpr int kMemcopyDefaultNumThreads = 1;           // 1 => plain memcpy
+static constexpr int64_t kMemcopyDefaultBlocksize = 1 << 6;    // bytes
+static constexpr int64_t kMemcopyDefaultThreshold = 1 << 20;  // bytes (1 MiB)
+
 /// Input buffer must be mutable, will abort if not
-FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) {
+FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
+    : memcopy_num_threads_(kMemcopyDefaultNumThreads),
+      memcopy_blocksize_(kMemcopyDefaultBlocksize),
+      memcopy_threshold_(kMemcopyDefaultThreshold) {
   buffer_ = buffer;
   DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
   mutable_data_ = buffer->mutable_data();
@@ -131,7 +139,12 @@ Status FixedSizeBufferWriter::Tell(int64_t* position) {
 }
 
 Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
-  std::memcpy(mutable_data_ + position_, data, nbytes);
+  uint8_t* dst = mutable_data_ + position_;
+  if (memcopy_num_threads_ <= 1 || nbytes <= memcopy_threshold_) {  // serial copy
+    std::memcpy(dst, data, nbytes);
+  } else {  // large write, split across memcopy_num_threads_ threads
+    parallel_memcopy(dst, data, nbytes, memcopy_blocksize_, memcopy_num_threads_);
+  }
   position_ += nbytes;
   return Status::OK();
 }
@@ -143,6 +156,24 @@ Status FixedSizeBufferWriter::WriteAt(
     return Write(data, nbytes);
   }

+// With num_threads <= 1, Write() always uses a single memcpy.
+void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) {
+  memcopy_num_threads_ = num_threads;
+}
+
+// parallel_memcopy turns the block size into the alignment mask
+// ~(block_size - 1), so it must be a positive power of two.
+void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) {
+  DCHECK(blocksize > 0 && (blocksize & (blocksize - 1)) == 0)
+      << "memcopy block size must be a positive power of two";
+  memcopy_blocksize_ = blocksize;
+}
+
+// Writes of more than `threshold` bytes are parallelized (if threads > 1).
+void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) {
+  memcopy_threshold_ = threshold;
+}
+
 // ----------------------------------------------------------------------
 // In-memory buffer reader
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index fbb186b..f1b5990 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -81,12 +81,25 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
   Status Write(const uint8_t* data, int64_t nbytes) override;
   Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;

+  /// Set the number of threads used to copy large writes. With one thread
+  /// (the default), Write() uses a single memcpy.
+  void set_memcopy_threads(int num_threads);
+  /// Set the block size, in bytes, to which parallel copies are aligned.
+  /// Must be a positive power of two.
+  void set_memcopy_blocksize(int64_t blocksize);
+  /// Set the size in bytes above which writes are copied with multiple threads.
+  void set_memcopy_threshold(int64_t threshold);
+
  private:
   std::mutex lock_;
   std::shared_ptr<Buffer> buffer_;
   uint8_t* mutable_data_;
   int64_t size_;
   int64_t position_;
+
+  int memcopy_num_threads_;
+  int64_t memcopy_blocksize_;
+  int64_t memcopy_threshold_;
 };
 
 class ARROW_EXPORT BufferReader : public RandomAccessFile {

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index fc1d53e..41ab5d7 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -95,6 +95,7 @@ if(MSVC)
 else()
   set(UTIL_LINK_LIBS
     arrow_static
+    pthread
     ${BOOST_FILESYSTEM_LIBRARY}
     ${BOOST_SYSTEM_LIBRARY}
     dl)

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/util/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
new file mode 100644
index 0000000..7feeb29
--- /dev/null
+++ b/cpp/src/arrow/util/memory.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_MEMORY_H
+#define ARROW_UTIL_MEMORY_H
+
+#include <cstdint>
+#include <cstring>
+#include <thread>
+#include <vector>
+
+namespace arrow {
+
+// Everything in this header is defined `inline` so that it can be included
+// from several translation units without multiple-definition link errors.
+
+// Returns `address` with every bit that is not set in `bits` cleared. With
+// bits == ~(alignment - 1) for a power-of-two alignment, this rounds the
+// address down to a multiple of that alignment.
+inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
+  uintptr_t value = reinterpret_cast<uintptr_t>(address);
+  return reinterpret_cast<uint8_t*>(value & bits);
+}
+
+// A helper function for doing memcpy with multiple threads. This is required
+// to saturate the memory bandwidth of modern cpus.
+//
+// Copies `nbytes` bytes from `src` to `dst` (nothing if `nbytes` <= 0); the
+// ranges must not overlap. The whole `block_size`-aligned blocks inside the
+// source are divided evenly between `num_threads` newly started threads, while
+// the unaligned prefix and the remaining suffix are copied on the calling
+// thread. Returns after all threads have been joined.
+//
+// `block_size` must be a power of two. If it is not, if `num_threads` < 2, or
+// if the source contains no whole aligned block, a single std::memcpy is used.
+inline void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
+                             uintptr_t block_size, int num_threads) {
+  if (nbytes <= 0) {
+    return;
+  }
+  const bool is_power_of_two =
+      block_size != 0 && (block_size & (block_size - 1)) == 0;
+  if (num_threads < 2 || !is_power_of_two) {
+    std::memcpy(dst, src, static_cast<size_t>(nbytes));
+    return;
+  }
+
+  // First block boundary at or after `src`, last one at or before the end.
+  const uintptr_t mask = ~(block_size - 1);
+  const uint8_t* left = pointer_logical_and(src + block_size - 1, mask);
+  const uint8_t* right = pointer_logical_and(src + nbytes, mask);
+  if (right <= left) {
+    // Too small to hold a whole aligned block, so there is nothing to split.
+    std::memcpy(dst, src, static_cast<size_t>(nbytes));
+    return;
+  }
+
+  // Blocks that do not divide evenly between the threads are moved into the
+  // suffix, which the calling thread copies.
+  const int64_t block = static_cast<int64_t>(block_size);
+  const int64_t num_blocks = (right - left) / block;
+  right -= (num_blocks % num_threads) * block;
+
+  // Now the data layout is | prefix | num_threads * chunk_size | suffix |,
+  // where each thread's chunk consists of whole blocks.
+  const int64_t chunk_size = (right - left) / num_threads;
+  const int64_t prefix = left - src;
+  const int64_t suffix = (src + nbytes) - right;
+
+  // Start all threads first and handle leftovers while threads run.
+  std::vector<std::thread> threadpool;
+  threadpool.reserve(static_cast<size_t>(num_threads));
+  for (int i = 0; i < num_threads; ++i) {
+    const int64_t offset = prefix + i * chunk_size;
+    threadpool.emplace_back([dst, src, offset, chunk_size] {
+      std::memcpy(dst + offset, src + offset, static_cast<size_t>(chunk_size));
+    });
+  }
+
+  std::memcpy(dst, src, static_cast<size_t>(prefix));
+  std::memcpy(dst + prefix + num_threads * chunk_size, right,
+              static_cast<size_t>(suffix));
+
+  for (auto& thread : threadpool) {
+    thread.join();
+  }
+}
+
+}  // namespace arrow
+
+#endif  // ARROW_UTIL_MEMORY_H