You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/02/09 15:44:00 UTC

arrow git commit: ARROW-521: [C++] Track peak allocations in default memory pool

Repository: arrow
Updated Branches:
  refs/heads/master 31f145dc5 -> dc6cefde4


ARROW-521: [C++] Track peak allocations in default memory pool

This should enable us to remove the `parquet::MemoryAllocator` implementation in parquet-cpp.

Author: Wes McKinney <we...@twosigma.com>

Closes #330 from wesm/ARROW-521 and squashes the following commits:

10531c4 [Wes McKinney] Move max_memory_ member to DefaultMemoryPool, add default virtual max_memory() to MemoryPool
a0d134d [Wes McKinney] Add max_memory() method to MemoryPool, leave implementation to subclasses


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/dc6cefde
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/dc6cefde
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/dc6cefde

Branch: refs/heads/master
Commit: dc6cefde46c65ce4753bec3fbafc44a20944f9c9
Parents: 31f145d
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Feb 9 10:43:53 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Feb 9 10:43:53 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/array-primitive-test.cc      |  3 +-
 cpp/src/arrow/buffer-test.cc               |  2 +-
 cpp/src/arrow/ipc/json-integration-test.cc |  6 ++-
 cpp/src/arrow/memory_pool-test.cc          | 17 ++++++++
 cpp/src/arrow/memory_pool.cc               | 54 ++++++++++++-------------
 cpp/src/arrow/memory_pool.h                | 31 ++++++++++++++
 cpp/src/arrow/table.cc                     |  6 ++-
 cpp/src/arrow/util/logging.h               |  4 +-
 cpp/src/arrow/util/macros.h                |  2 +-
 9 files changed, 89 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/array-primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-primitive-test.cc b/cpp/src/arrow/array-primitive-test.cc
index a20fdbf..f8bbd77 100644
--- a/cpp/src/arrow/array-primitive-test.cc
+++ b/cpp/src/arrow/array-primitive-test.cc
@@ -242,7 +242,8 @@ void TestPrimitiveBuilder<PBoolean>::Check(
 }
 
 typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
-    PInt32, PInt64, PFloat, PDouble> Primitives;
+    PInt32, PInt64, PFloat, PDouble>
+    Primitives;
 
 TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/buffer-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer-test.cc b/cpp/src/arrow/buffer-test.cc
index d76e991..934fcfe 100644
--- a/cpp/src/arrow/buffer-test.cc
+++ b/cpp/src/arrow/buffer-test.cc
@@ -67,7 +67,7 @@ TEST_F(TestBuffer, Resize) {
 }
 
 TEST_F(TestBuffer, ResizeOOM) {
-  // This test doesn't play nice with AddressSanitizer
+// This test doesn't play nice with AddressSanitizer
 #ifndef ADDRESS_SANITIZER
   // realloc fails, even though there may be no explicit limit
   PoolBuffer buf;

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 17ccc4a..95bc742 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -144,8 +144,10 @@ static Status ValidateArrowVsJson(
 
   if (!json_schema->Equals(arrow_schema)) {
     std::stringstream ss;
-    ss << "JSON schema: \n" << json_schema->ToString() << "\n"
-       << "Arrow schema: \n" << arrow_schema->ToString();
+    ss << "JSON schema: \n"
+       << json_schema->ToString() << "\n"
+       << "Arrow schema: \n"
+       << arrow_schema->ToString();
 
     if (FLAGS_verbose) { std::cout << ss.str() << std::endl; }
     return Status::Invalid("Schemas did not match");

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index 56bb32f..6ab73fb 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -59,6 +59,23 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
   pool->Free(data, 100);
 }
 
+TEST(DefaultMemoryPoolDeathTest, MaxMemory) {
+  DefaultMemoryPool pool;
+
+  ASSERT_EQ(0, pool.max_memory());
+
+  uint8_t* data;
+  ASSERT_OK(pool.Allocate(100, &data));
+
+  uint8_t* data2;
+  ASSERT_OK(pool.Allocate(100, &data2));
+
+  pool.Free(data, 100);
+  pool.Free(data2, 100);
+
+  ASSERT_EQ(200, pool.max_memory());
+}
+
 #endif  // ARROW_VALGRIND
 
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index aea5e21..8d85a08 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -60,36 +60,30 @@ Status AllocateAligned(int64_t size, uint8_t** out) {
 }
 }  // namespace
 
-MemoryPool::~MemoryPool() {}
-
-class InternalMemoryPool : public MemoryPool {
- public:
-  InternalMemoryPool() : bytes_allocated_(0) {}
-  virtual ~InternalMemoryPool();
-
-  Status Allocate(int64_t size, uint8_t** out) override;
-  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
+MemoryPool::MemoryPool() {}
 
-  void Free(uint8_t* buffer, int64_t size) override;
+MemoryPool::~MemoryPool() {}
 
-  int64_t bytes_allocated() const override;
+int64_t MemoryPool::max_memory() const {
+  return -1;
+}
 
- private:
-  mutable std::mutex pool_lock_;
-  int64_t bytes_allocated_;
-};
+DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) {
+  max_memory_ = 0;
+}
 
-Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
-  std::lock_guard<std::mutex> guard(pool_lock_);
+Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) {
   RETURN_NOT_OK(AllocateAligned(size, out));
   bytes_allocated_ += size;
 
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); }
+  }
   return Status::OK();
 }
 
-Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
-  std::lock_guard<std::mutex> guard(pool_lock_);
-
+Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
   // Note: We cannot use realloc() here as it doesn't guarantee alignment.
 
   // Allocate new chunk
@@ -105,17 +99,19 @@ Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_
   *ptr = out;
 
   bytes_allocated_ += new_size - old_size;
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); }
+  }
 
   return Status::OK();
 }
 
-int64_t InternalMemoryPool::bytes_allocated() const {
-  std::lock_guard<std::mutex> guard(pool_lock_);
-  return bytes_allocated_;
+int64_t DefaultMemoryPool::bytes_allocated() const {
+  return bytes_allocated_.load();
 }
 
-void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) {
-  std::lock_guard<std::mutex> guard(pool_lock_);
+void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) {
   DCHECK_GE(bytes_allocated_, size);
 #ifdef _MSC_VER
   _aligned_free(buffer);
@@ -125,10 +121,14 @@ void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) {
   bytes_allocated_ -= size;
 }
 
-InternalMemoryPool::~InternalMemoryPool() {}
+int64_t DefaultMemoryPool::max_memory() const {
+  return max_memory_.load();
+}
+
+DefaultMemoryPool::~DefaultMemoryPool() {}
 
 MemoryPool* default_memory_pool() {
-  static InternalMemoryPool default_memory_pool_;
+  static DefaultMemoryPool default_memory_pool_;
   return &default_memory_pool_;
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/memory_pool.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 89477b6..33d4c3e 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -18,7 +18,9 @@
 #ifndef ARROW_UTIL_MEMORY_POOL_H
 #define ARROW_UTIL_MEMORY_POOL_H
 
+#include <atomic>
 #include <cstdint>
+#include <mutex>
 
 #include "arrow/util/visibility.h"
 
@@ -56,6 +58,35 @@ class ARROW_EXPORT MemoryPool {
   /// The number of bytes that were allocated and not yet free'd through
   /// this allocator.
   virtual int64_t bytes_allocated() const = 0;
+
+  /// Return peak memory allocation in this memory pool
+  ///
+  /// \return Maximum bytes allocated. If not known (or not implemented),
+  /// returns -1
+  virtual int64_t max_memory() const;
+
+ protected:
+  MemoryPool();
+};
+
+class ARROW_EXPORT DefaultMemoryPool : public MemoryPool {
+ public:
+  DefaultMemoryPool();
+  virtual ~DefaultMemoryPool();
+
+  Status Allocate(int64_t size, uint8_t** out) override;
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
+
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  int64_t bytes_allocated() const override;
+
+  int64_t max_memory() const override;
+
+ private:
+  mutable std::mutex lock_;
+  std::atomic<int64_t> bytes_allocated_;
+  std::atomic<int64_t> max_memory_;
 };
 
 ARROW_EXPORT MemoryPool* default_memory_pool();

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 9e31ba5..a9e0909 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -106,7 +106,8 @@ Status Table::FromRecordBatches(const std::string& name,
     if (!batches[i]->schema()->Equals(schema)) {
       std::stringstream ss;
       ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
-         << schema->ToString() << "\nvs\n" << batches[i]->schema()->ToString();
+         << schema->ToString() << "\nvs\n"
+         << batches[i]->schema()->ToString();
       return Status::Invalid(ss.str());
     }
   }
@@ -138,7 +139,8 @@ Status ConcatenateTables(const std::string& output_name,
     if (!tables[i]->schema()->Equals(schema)) {
       std::stringstream ss;
       ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
-         << schema->ToString() << "\nvs\n" << tables[i]->schema()->ToString();
+         << schema->ToString() << "\nvs\n"
+         << tables[i]->schema()->ToString();
       return Status::Invalid(ss.str());
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 06ee841..b22f07d 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
 class FatalLog : public CerrLog {
  public:
   explicit FatalLog(int /* severity */)  // NOLINT
-      : CerrLog(ARROW_FATAL) {}          // NOLINT
+      : CerrLog(ARROW_FATAL){}           // NOLINT
 
-  [[noreturn]] ~FatalLog() {
+            [[noreturn]] ~FatalLog() {
     if (has_logged_) { std::cerr << std::endl; }
     std::exit(1);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/dc6cefde/cpp/src/arrow/util/macros.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index 81a9b0c..c4a62a4 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -25,6 +25,6 @@
   TypeName& operator=(const TypeName&) = delete
 #endif
 
-#define UNUSED(x) (void) x
+#define UNUSED(x) (void)x
 
 #endif  // ARROW_UTIL_MACROS_H