You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/08 20:17:12 UTC
[arrow] branch master updated: ARROW-3024: [C++] Remove mutex in MemoryPool implementations
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9b24c3e ARROW-3024: [C++] Remove mutex in MemoryPool implementations
9b24c3e is described below
commit 9b24c3ecb70122564f3d0947f631e36e79d4689a
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Aug 8 16:17:07 2018 -0400
ARROW-3024: [C++] Remove mutex in MemoryPool implementations
Author: Antoine Pitrou <an...@python.org>
Closes #2403 from pitrou/ARROW-3024-memory-pool-atomics and squashes the following commits:
4c919b3a <Antoine Pitrou> ARROW-3024: Remove mutex in MemoryPool implementations
---
cpp/src/arrow/allocator-test.cc | 2 +-
cpp/src/arrow/memory_pool-test.cc | 14 +++---
cpp/src/arrow/memory_pool-test.h | 7 +++
cpp/src/arrow/memory_pool.cc | 92 ++++++++++++++++++++-------------------
4 files changed, 62 insertions(+), 53 deletions(-)
diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc
index 55cc203..bdae9b9 100644
--- a/cpp/src/arrow/allocator-test.cc
+++ b/cpp/src/arrow/allocator-test.cc
@@ -52,7 +52,7 @@ TEST(stl_allocator, FreeLargeMemory) {
#ifndef NDEBUG
EXPECT_DEATH(alloc.deallocate(data, 120),
- ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
+ ".*Check failed:.* allocation counter became negative");
#endif
alloc.deallocate(data, 100);
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index 8915708..4cd01d3 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -52,7 +52,7 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
#ifndef NDEBUG
EXPECT_DEATH(pool->Free(data, 120),
- ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
+ ".*Check failed:.* allocation counter became negative");
#endif
pool->Free(data, 100);
@@ -60,14 +60,14 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
TEST(DefaultMemoryPoolDeathTest, MaxMemory) {
MemoryPool* pool = default_memory_pool();
-
- uint8_t* data;
- ASSERT_OK(pool->Allocate(100, &data));
-
+ uint8_t* data1;
uint8_t* data2;
- ASSERT_OK(pool->Allocate(100, &data2));
- pool->Free(data, 100);
+ ASSERT_OK(pool->Allocate(100, &data1));
+ ASSERT_OK(pool->Allocate(50, &data2));
+ pool->Free(data2, 50);
+ ASSERT_OK(pool->Allocate(100, &data2));
+ pool->Free(data1, 100);
pool->Free(data2, 100);
ASSERT_EQ(200, pool->max_memory());
diff --git a/cpp/src/arrow/memory_pool-test.h b/cpp/src/arrow/memory_pool-test.h
index ced59bf..f583da5 100644
--- a/cpp/src/arrow/memory_pool-test.h
+++ b/cpp/src/arrow/memory_pool-test.h
@@ -40,7 +40,14 @@ class TestMemoryPoolBase : public ::testing::Test {
EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data) % 64);
ASSERT_EQ(100, pool->bytes_allocated());
+ uint8_t* data2;
+ ASSERT_OK(pool->Allocate(27, &data2));
+ EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data2) % 64);
+ ASSERT_EQ(127, pool->bytes_allocated());
+
pool->Free(data, 100);
+ ASSERT_EQ(27, pool->bytes_allocated());
+ pool->Free(data2, 27);
ASSERT_EQ(0, pool->bytes_allocated());
}
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index a92bfbf..2f71c5d 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -24,7 +24,6 @@
#include <cstring>
#include <iostream>
#include <memory>
-#include <mutex>
#include <sstream> // IWYU pragma: keep
#include "arrow/status.h"
@@ -88,22 +87,43 @@ MemoryPool::~MemoryPool() {}
int64_t MemoryPool::max_memory() const { return -1; }
-class DefaultMemoryPool : public MemoryPool {
+///////////////////////////////////////////////////////////////////////
+// Helper tracking memory statistics
+
+class MemoryPoolStats {
public:
- DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; }
+ MemoryPoolStats() : bytes_allocated_(0), max_memory_(0) {}
+
+ int64_t max_memory() const { return max_memory_.load(); }
+
+ int64_t bytes_allocated() const { return bytes_allocated_.load(); }
+
+ inline void UpdateAllocatedBytes(int64_t diff) {
+ auto allocated = bytes_allocated_.fetch_add(diff) + diff;
+ DCHECK_GE(allocated, 0) << "allocation counter became negative";
+ // "maximum" allocated memory is ill-defined in multi-threaded code,
+ // so don't try to be too rigorous here
+ if (diff > 0 && allocated > max_memory_) {
+ max_memory_ = allocated;
+ }
+ }
+
+ protected:
+ std::atomic<int64_t> bytes_allocated_;
+ std::atomic<int64_t> max_memory_;
+};
+
+///////////////////////////////////////////////////////////////////////
+// Default MemoryPool implementation
+class DefaultMemoryPool : public MemoryPool {
+ public:
~DefaultMemoryPool() override {}
Status Allocate(int64_t size, uint8_t** out) override {
RETURN_NOT_OK(AllocateAligned(size, out));
- bytes_allocated_ += size;
- {
- std::lock_guard<std::mutex> guard(lock_);
- if (bytes_allocated_ > max_memory_) {
- max_memory_ = bytes_allocated_.load();
- }
- }
+ stats_.UpdateAllocatedBytes(size);
return Status::OK();
}
@@ -134,21 +154,13 @@ class DefaultMemoryPool : public MemoryPool {
*ptr = out;
#endif // defined(ARROW_JEMALLOC)
- bytes_allocated_ += new_size - old_size;
- {
- std::lock_guard<std::mutex> guard(lock_);
- if (bytes_allocated_ > max_memory_) {
- max_memory_ = bytes_allocated_.load();
- }
- }
-
+ stats_.UpdateAllocatedBytes(new_size - old_size);
return Status::OK();
}
- int64_t bytes_allocated() const override { return bytes_allocated_.load(); }
+ int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
void Free(uint8_t* buffer, int64_t size) override {
- DCHECK_GE(bytes_allocated_, size);
#ifdef _MSC_VER
_aligned_free(buffer);
#elif defined(ARROW_JEMALLOC)
@@ -156,15 +168,13 @@ class DefaultMemoryPool : public MemoryPool {
#else
std::free(buffer);
#endif
- bytes_allocated_ -= size;
+ stats_.UpdateAllocatedBytes(-size);
}
- int64_t max_memory() const override { return max_memory_.load(); }
+ int64_t max_memory() const override { return stats_.max_memory(); }
private:
- mutable std::mutex lock_;
- std::atomic<int64_t> bytes_allocated_;
- std::atomic<int64_t> max_memory_;
+ MemoryPoolStats stats_;
};
MemoryPool* default_memory_pool() {
@@ -172,6 +182,9 @@ MemoryPool* default_memory_pool() {
return &default_memory_pool_;
}
+///////////////////////////////////////////////////////////////////////
+// LoggingMemoryPool implementation
+
LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {}
Status LoggingMemoryPool::Allocate(int64_t size, uint8_t** out) {
@@ -204,48 +217,37 @@ int64_t LoggingMemoryPool::max_memory() const {
return mem;
}
+///////////////////////////////////////////////////////////////////////
+// ProxyMemoryPool implementation
+
class ProxyMemoryPool::ProxyMemoryPoolImpl {
public:
explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {}
Status Allocate(int64_t size, uint8_t** out) {
RETURN_NOT_OK(pool_->Allocate(size, out));
- bytes_allocated_ += size;
- {
- std::lock_guard<std::mutex> guard(lock_);
- if (bytes_allocated_ > max_memory_) {
- max_memory_ = bytes_allocated_.load();
- }
- }
+ stats_.UpdateAllocatedBytes(size);
return Status::OK();
}
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
- bytes_allocated_ += new_size - old_size;
- {
- std::lock_guard<std::mutex> guard(lock_);
- if (bytes_allocated_ > max_memory_) {
- max_memory_ = bytes_allocated_.load();
- }
- }
+ stats_.UpdateAllocatedBytes(new_size - old_size);
return Status::OK();
}
void Free(uint8_t* buffer, int64_t size) {
pool_->Free(buffer, size);
- bytes_allocated_ -= size;
+ stats_.UpdateAllocatedBytes(-size);
}
- int64_t bytes_allocated() const { return bytes_allocated_.load(); }
+ int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
- int64_t max_memory() const { return max_memory_.load(); }
+ int64_t max_memory() const { return stats_.max_memory(); }
private:
- mutable std::mutex lock_;
MemoryPool* pool_;
- std::atomic<int64_t> bytes_allocated_{0};
- std::atomic<int64_t> max_memory_{0};
+ MemoryPoolStats stats_;
};
ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) {