You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/02/09 15:44:00 UTC
arrow git commit: ARROW-521: [C++] Track peak allocations in default memory pool
Repository: arrow
Updated Branches:
refs/heads/master 31f145dc5 -> dc6cefde4
ARROW-521: [C++] Track peak allocations in default memory pool
This should enable us to remove the `parquet::MemoryAllocator` implementation in parquet-cpp.
Author: Wes McKinney <we...@twosigma.com>
Closes #330 from wesm/ARROW-521 and squashes the following commits:
10531c4 [Wes McKinney] Move max_memory_ member to DefaultMemoryPool, add default virtual max_memory() to MemoryPool
a0d134d [Wes McKinney] Add max_memory() method to MemoryPool, leave implementation to subclasses
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/dc6cefde
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/dc6cefde
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/dc6cefde
Branch: refs/heads/master
Commit: dc6cefde46c65ce4753bec3fbafc44a20944f9c9
Parents: 31f145d
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Feb 9 10:43:53 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Feb 9 10:43:53 2017 -0500
----------------------------------------------------------------------
cpp/src/arrow/array-primitive-test.cc | 3 +-
cpp/src/arrow/buffer-test.cc | 2 +-
cpp/src/arrow/ipc/json-integration-test.cc | 6 ++-
cpp/src/arrow/memory_pool-test.cc | 17 ++++++++
cpp/src/arrow/memory_pool.cc | 54 ++++++++++++-------------
cpp/src/arrow/memory_pool.h | 31 ++++++++++++++
cpp/src/arrow/table.cc | 6 ++-
cpp/src/arrow/util/logging.h | 4 +-
cpp/src/arrow/util/macros.h | 2 +-
9 files changed, 89 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/array-primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-primitive-test.cc b/cpp/src/arrow/array-primitive-test.cc
index a20fdbf..f8bbd77 100644
--- a/cpp/src/arrow/array-primitive-test.cc
+++ b/cpp/src/arrow/array-primitive-test.cc
@@ -242,7 +242,8 @@ void TestPrimitiveBuilder<PBoolean>::Check(
}
typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
- PInt32, PInt64, PFloat, PDouble> Primitives;
+ PInt32, PInt64, PFloat, PDouble>
+ Primitives;
TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/buffer-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer-test.cc b/cpp/src/arrow/buffer-test.cc
index d76e991..934fcfe 100644
--- a/cpp/src/arrow/buffer-test.cc
+++ b/cpp/src/arrow/buffer-test.cc
@@ -67,7 +67,7 @@ TEST_F(TestBuffer, Resize) {
}
TEST_F(TestBuffer, ResizeOOM) {
- // This test doesn't play nice with AddressSanitizer
+// This test doesn't play nice with AddressSanitizer
#ifndef ADDRESS_SANITIZER
// realloc fails, even though there may be no explicit limit
PoolBuffer buf;
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 17ccc4a..95bc742 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -144,8 +144,10 @@ static Status ValidateArrowVsJson(
if (!json_schema->Equals(arrow_schema)) {
std::stringstream ss;
- ss << "JSON schema: \n" << json_schema->ToString() << "\n"
- << "Arrow schema: \n" << arrow_schema->ToString();
+ ss << "JSON schema: \n"
+ << json_schema->ToString() << "\n"
+ << "Arrow schema: \n"
+ << arrow_schema->ToString();
if (FLAGS_verbose) { std::cout << ss.str() << std::endl; }
return Status::Invalid("Schemas did not match");
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index 56bb32f..6ab73fb 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -59,6 +59,23 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
pool->Free(data, 100);
}
+TEST(DefaultMemoryPoolDeathTest, MaxMemory) {
+ DefaultMemoryPool pool;
+
+ ASSERT_EQ(0, pool.max_memory());
+
+ uint8_t* data;
+ ASSERT_OK(pool.Allocate(100, &data));
+
+ uint8_t* data2;
+ ASSERT_OK(pool.Allocate(100, &data2));
+
+ pool.Free(data, 100);
+ pool.Free(data2, 100);
+
+ ASSERT_EQ(200, pool.max_memory());
+}
+
#endif // ARROW_VALGRIND
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index aea5e21..8d85a08 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -60,36 +60,30 @@ Status AllocateAligned(int64_t size, uint8_t** out) {
}
} // namespace
-MemoryPool::~MemoryPool() {}
-
-class InternalMemoryPool : public MemoryPool {
- public:
- InternalMemoryPool() : bytes_allocated_(0) {}
- virtual ~InternalMemoryPool();
-
- Status Allocate(int64_t size, uint8_t** out) override;
- Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
+MemoryPool::MemoryPool() {}
- void Free(uint8_t* buffer, int64_t size) override;
+MemoryPool::~MemoryPool() {}
- int64_t bytes_allocated() const override;
+int64_t MemoryPool::max_memory() const {
+ return -1;
+}
- private:
- mutable std::mutex pool_lock_;
- int64_t bytes_allocated_;
-};
+DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) {
+ max_memory_ = 0;
+}
-Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
- std::lock_guard<std::mutex> guard(pool_lock_);
+Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) {
RETURN_NOT_OK(AllocateAligned(size, out));
bytes_allocated_ += size;
+ {
+ std::lock_guard<std::mutex> guard(lock_);
+ if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); }
+ }
return Status::OK();
}
-Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
- std::lock_guard<std::mutex> guard(pool_lock_);
-
+Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
// Note: We cannot use realloc() here as it doesn't guarantee alignment.
// Allocate new chunk
@@ -105,17 +99,19 @@ Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_
*ptr = out;
bytes_allocated_ += new_size - old_size;
+ {
+ std::lock_guard<std::mutex> guard(lock_);
+ if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); }
+ }
return Status::OK();
}
-int64_t InternalMemoryPool::bytes_allocated() const {
- std::lock_guard<std::mutex> guard(pool_lock_);
- return bytes_allocated_;
+int64_t DefaultMemoryPool::bytes_allocated() const {
+ return bytes_allocated_.load();
}
-void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) {
- std::lock_guard<std::mutex> guard(pool_lock_);
+void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) {
DCHECK_GE(bytes_allocated_, size);
#ifdef _MSC_VER
_aligned_free(buffer);
@@ -125,10 +121,14 @@ void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) {
bytes_allocated_ -= size;
}
-InternalMemoryPool::~InternalMemoryPool() {}
+int64_t DefaultMemoryPool::max_memory() const {
+ return max_memory_.load();
+}
+
+DefaultMemoryPool::~DefaultMemoryPool() {}
MemoryPool* default_memory_pool() {
- static InternalMemoryPool default_memory_pool_;
+ static DefaultMemoryPool default_memory_pool_;
return &default_memory_pool_;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 89477b6..33d4c3e 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -18,7 +18,9 @@
#ifndef ARROW_UTIL_MEMORY_POOL_H
#define ARROW_UTIL_MEMORY_POOL_H
+#include <atomic>
#include <cstdint>
+#include <mutex>
#include "arrow/util/visibility.h"
@@ -56,6 +58,35 @@ class ARROW_EXPORT MemoryPool {
/// The number of bytes that were allocated and not yet free'd through
/// this allocator.
virtual int64_t bytes_allocated() const = 0;
+
+ /// Return peak memory allocation in this memory pool
+ ///
+ /// \return Maximum bytes allocated. If not known (or not implemented),
+ /// returns -1
+ virtual int64_t max_memory() const;
+
+ protected:
+ MemoryPool();
+};
+
+class ARROW_EXPORT DefaultMemoryPool : public MemoryPool {
+ public:
+ DefaultMemoryPool();
+ virtual ~DefaultMemoryPool();
+
+ Status Allocate(int64_t size, uint8_t** out) override;
+ Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
+
+ void Free(uint8_t* buffer, int64_t size) override;
+
+ int64_t bytes_allocated() const override;
+
+ int64_t max_memory() const override;
+
+ private:
+ mutable std::mutex lock_;
+ std::atomic<int64_t> bytes_allocated_;
+ std::atomic<int64_t> max_memory_;
};
ARROW_EXPORT MemoryPool* default_memory_pool();
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 9e31ba5..a9e0909 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -106,7 +106,8 @@ Status Table::FromRecordBatches(const std::string& name,
if (!batches[i]->schema()->Equals(schema)) {
std::stringstream ss;
ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
- << schema->ToString() << "\nvs\n" << batches[i]->schema()->ToString();
+ << schema->ToString() << "\nvs\n"
+ << batches[i]->schema()->ToString();
return Status::Invalid(ss.str());
}
}
@@ -138,7 +139,8 @@ Status ConcatenateTables(const std::string& output_name,
if (!tables[i]->schema()->Equals(schema)) {
std::stringstream ss;
ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
- << schema->ToString() << "\nvs\n" << tables[i]->schema()->ToString();
+ << schema->ToString() << "\nvs\n"
+ << tables[i]->schema()->ToString();
return Status::Invalid(ss.str());
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 06ee841..b22f07d 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
class FatalLog : public CerrLog {
public:
explicit FatalLog(int /* severity */) // NOLINT
- : CerrLog(ARROW_FATAL) {} // NOLINT
+ : CerrLog(ARROW_FATAL){} // NOLINT
- [[noreturn]] ~FatalLog() {
+ [[noreturn]] ~FatalLog() {
if (has_logged_) { std::cerr << std::endl; }
std::exit(1);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/util/macros.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index 81a9b0c..c4a62a4 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -25,6 +25,6 @@
TypeName& operator=(const TypeName&) = delete
#endif
-#define UNUSED(x) (void) x
+#define UNUSED(x) (void)x
#endif // ARROW_UTIL_MACROS_H