You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/08 20:17:12 UTC

[arrow] branch master updated: ARROW-3024: [C++] Remove mutex in MemoryPool implementations

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b24c3e  ARROW-3024: [C++] Remove mutex in MemoryPool implementations
9b24c3e is described below

commit 9b24c3ecb70122564f3d0947f631e36e79d4689a
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Aug 8 16:17:07 2018 -0400

    ARROW-3024: [C++] Remove mutex in MemoryPool implementations
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2403 from pitrou/ARROW-3024-memory-pool-atomics and squashes the following commits:
    
    4c919b3a <Antoine Pitrou> ARROW-3024:  Remove mutex in MemoryPool implementations
---
 cpp/src/arrow/allocator-test.cc   |  2 +-
 cpp/src/arrow/memory_pool-test.cc | 14 +++---
 cpp/src/arrow/memory_pool-test.h  |  7 +++
 cpp/src/arrow/memory_pool.cc      | 92 ++++++++++++++++++++-------------------
 4 files changed, 62 insertions(+), 53 deletions(-)

diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc
index 55cc203..bdae9b9 100644
--- a/cpp/src/arrow/allocator-test.cc
+++ b/cpp/src/arrow/allocator-test.cc
@@ -52,7 +52,7 @@ TEST(stl_allocator, FreeLargeMemory) {
 
 #ifndef NDEBUG
   EXPECT_DEATH(alloc.deallocate(data, 120),
-               ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
+               ".*Check failed:.* allocation counter became negative");
 #endif
 
   alloc.deallocate(data, 100);
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index 8915708..4cd01d3 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -52,7 +52,7 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
 
 #ifndef NDEBUG
   EXPECT_DEATH(pool->Free(data, 120),
-               ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
+               ".*Check failed:.* allocation counter became negative");
 #endif
 
   pool->Free(data, 100);
@@ -60,14 +60,14 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
 
 TEST(DefaultMemoryPoolDeathTest, MaxMemory) {
   MemoryPool* pool = default_memory_pool();
-
-  uint8_t* data;
-  ASSERT_OK(pool->Allocate(100, &data));
-
+  uint8_t* data1;
   uint8_t* data2;
-  ASSERT_OK(pool->Allocate(100, &data2));
 
-  pool->Free(data, 100);
+  ASSERT_OK(pool->Allocate(100, &data1));
+  ASSERT_OK(pool->Allocate(50, &data2));
+  pool->Free(data2, 50);
+  ASSERT_OK(pool->Allocate(100, &data2));
+  pool->Free(data1, 100);
   pool->Free(data2, 100);
 
   ASSERT_EQ(200, pool->max_memory());
diff --git a/cpp/src/arrow/memory_pool-test.h b/cpp/src/arrow/memory_pool-test.h
index ced59bf..f583da5 100644
--- a/cpp/src/arrow/memory_pool-test.h
+++ b/cpp/src/arrow/memory_pool-test.h
@@ -40,7 +40,14 @@ class TestMemoryPoolBase : public ::testing::Test {
     EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data) % 64);
     ASSERT_EQ(100, pool->bytes_allocated());
 
+    uint8_t* data2;
+    ASSERT_OK(pool->Allocate(27, &data2));
+    EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data2) % 64);
+    ASSERT_EQ(127, pool->bytes_allocated());
+
     pool->Free(data, 100);
+    ASSERT_EQ(27, pool->bytes_allocated());
+    pool->Free(data2, 27);
     ASSERT_EQ(0, pool->bytes_allocated());
   }
 
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index a92bfbf..2f71c5d 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -24,7 +24,6 @@
 #include <cstring>
 #include <iostream>
 #include <memory>
-#include <mutex>
 #include <sstream>  // IWYU pragma: keep
 
 #include "arrow/status.h"
@@ -88,22 +87,43 @@ MemoryPool::~MemoryPool() {}
 
 int64_t MemoryPool::max_memory() const { return -1; }
 
-class DefaultMemoryPool : public MemoryPool {
+///////////////////////////////////////////////////////////////////////
+// Helper tracking memory statistics
+
+class MemoryPoolStats {
  public:
-  DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; }
+  MemoryPoolStats() : bytes_allocated_(0), max_memory_(0) {}
+
+  int64_t max_memory() const { return max_memory_.load(); }
+
+  int64_t bytes_allocated() const { return bytes_allocated_.load(); }
+
+  inline void UpdateAllocatedBytes(int64_t diff) {
+    auto allocated = bytes_allocated_.fetch_add(diff) + diff;
+    DCHECK_GE(allocated, 0) << "allocation counter became negative";
+    // "maximum" allocated memory is ill-defined in multi-threaded code,
+    // so don't try to be too rigorous here
+    if (diff > 0 && allocated > max_memory_) {
+      max_memory_ = allocated;
+    }
+  }
+
+ protected:
+  std::atomic<int64_t> bytes_allocated_;
+  std::atomic<int64_t> max_memory_;
+};
+
+///////////////////////////////////////////////////////////////////////
+// Default MemoryPool implementation
 
+class DefaultMemoryPool : public MemoryPool {
+ public:
   ~DefaultMemoryPool() override {}
 
   Status Allocate(int64_t size, uint8_t** out) override {
     RETURN_NOT_OK(AllocateAligned(size, out));
-    bytes_allocated_ += size;
 
-    {
-      std::lock_guard<std::mutex> guard(lock_);
-      if (bytes_allocated_ > max_memory_) {
-        max_memory_ = bytes_allocated_.load();
-      }
-    }
+    stats_.UpdateAllocatedBytes(size);
     return Status::OK();
   }
 
@@ -134,21 +154,13 @@ class DefaultMemoryPool : public MemoryPool {
     *ptr = out;
 #endif  // defined(ARROW_JEMALLOC)
 
-    bytes_allocated_ += new_size - old_size;
-    {
-      std::lock_guard<std::mutex> guard(lock_);
-      if (bytes_allocated_ > max_memory_) {
-        max_memory_ = bytes_allocated_.load();
-      }
-    }
-
+    stats_.UpdateAllocatedBytes(new_size - old_size);
     return Status::OK();
   }
 
-  int64_t bytes_allocated() const override { return bytes_allocated_.load(); }
+  int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
 
   void Free(uint8_t* buffer, int64_t size) override {
-    DCHECK_GE(bytes_allocated_, size);
 #ifdef _MSC_VER
     _aligned_free(buffer);
 #elif defined(ARROW_JEMALLOC)
@@ -156,15 +168,13 @@ class DefaultMemoryPool : public MemoryPool {
 #else
     std::free(buffer);
 #endif
-    bytes_allocated_ -= size;
+    stats_.UpdateAllocatedBytes(-size);
   }
 
-  int64_t max_memory() const override { return max_memory_.load(); }
+  int64_t max_memory() const override { return stats_.max_memory(); }
 
  private:
-  mutable std::mutex lock_;
-  std::atomic<int64_t> bytes_allocated_;
-  std::atomic<int64_t> max_memory_;
+  MemoryPoolStats stats_;
 };
 
 MemoryPool* default_memory_pool() {
@@ -172,6 +182,9 @@ MemoryPool* default_memory_pool() {
   return &default_memory_pool_;
 }
 
+///////////////////////////////////////////////////////////////////////
+// LoggingMemoryPool implementation
+
 LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {}
 
 Status LoggingMemoryPool::Allocate(int64_t size, uint8_t** out) {
@@ -204,48 +217,37 @@ int64_t LoggingMemoryPool::max_memory() const {
   return mem;
 }
 
+///////////////////////////////////////////////////////////////////////
+// ProxyMemoryPool implementation
+
 class ProxyMemoryPool::ProxyMemoryPoolImpl {
  public:
   explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {}
 
   Status Allocate(int64_t size, uint8_t** out) {
     RETURN_NOT_OK(pool_->Allocate(size, out));
-    bytes_allocated_ += size;
-    {
-      std::lock_guard<std::mutex> guard(lock_);
-      if (bytes_allocated_ > max_memory_) {
-        max_memory_ = bytes_allocated_.load();
-      }
-    }
+    stats_.UpdateAllocatedBytes(size);
     return Status::OK();
   }
 
   Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
     RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
-    bytes_allocated_ += new_size - old_size;
-    {
-      std::lock_guard<std::mutex> guard(lock_);
-      if (bytes_allocated_ > max_memory_) {
-        max_memory_ = bytes_allocated_.load();
-      }
-    }
+    stats_.UpdateAllocatedBytes(new_size - old_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size) {
     pool_->Free(buffer, size);
-    bytes_allocated_ -= size;
+    stats_.UpdateAllocatedBytes(-size);
   }
 
-  int64_t bytes_allocated() const { return bytes_allocated_.load(); }
+  int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
 
-  int64_t max_memory() const { return max_memory_.load(); }
+  int64_t max_memory() const { return stats_.max_memory(); }
 
  private:
-  mutable std::mutex lock_;
   MemoryPool* pool_;
-  std::atomic<int64_t> bytes_allocated_{0};
-  std::atomic<int64_t> max_memory_{0};
+  MemoryPoolStats stats_;
 };
 
 ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) {