You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/08/11 04:34:39 UTC

incubator-impala git commit: IMPALA-3946: fix MemPool integrity issues with empty chunks

Repository: incubator-impala
Updated Branches:
  refs/heads/master ac4f22b1b -> 88b89b872


IMPALA-3946: fix MemPool integrity issues with empty chunks

There were various rare code paths that resulted in the MemPool failing
its own internal integrity checks. This required various small fixes to
ensure that the MemPool was always valid even when there were empty
chunks for various reasons (either from calling Clear() or
ReturnPartialAllocation()).

Testing:
Added unit tests for three scenarios where the integrity check failed.
Also ran fuzz tester to confirm that the original issue was fixed.

Change-Id: I03ad12e5b2b63cbb55e5c52562416d73a4bd9346
Reviewed-on: http://gerrit.cloudera.org:8080/3838
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/88b89b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/88b89b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/88b89b87

Branch: refs/heads/master
Commit: 88b89b872dbab6b13df2adbbb40704da96f987ac
Parents: ac4f22b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Aug 3 16:41:31 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Aug 11 04:33:01 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/mem-pool-test.cc |  67 +++++++++++++++++++
 be/src/runtime/mem-pool.cc      | 125 +++++++++++++++++------------------
 be/src/runtime/mem-pool.h       |  21 ++++--
 3 files changed, 141 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88b89b87/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 9b954f5..78b0963 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -420,6 +420,73 @@ TEST(MemPoolTest, FragmentationOverhead) {
   p.FreeAll();
 }
 
+/// Test interaction between returning a full allocation and transferring chunks.
+TEST(MemPoolTest, ReturnAllocationThenTransfer) {
+  MemTracker tracker;
+  MemPool src(&tracker);
+  MemPool dst(&tracker);
+
+  const int64_t alloc_size = 1L * 1024 * 1024;
+  uint8_t* mem = src.Allocate(alloc_size);
+  ASSERT_TRUE(mem != NULL);
+
+  src.ReturnPartialAllocation(alloc_size);
+  ASSERT_TRUE(MemPoolTest::CheckIntegrity(&src, true));
+
+  // IMPALA-3946: this would hit a DCHECK because the current chunk was empty.
+  dst.AcquireData(&src, true);
+  ASSERT_EQ(0, dst.GetTotalChunkSizes()); // Src chunk shouldn't be transferred.
+  ASSERT_TRUE(MemPoolTest::CheckIntegrity(&dst, true));
+  ASSERT_TRUE(MemPoolTest::CheckIntegrity(&src, true));
+  src.FreeAll();
+  dst.FreeAll();
+}
+
+/// Test that making a large allocation that doesn't fit in the current chunk after
+/// returning a full allocation works correctly.
+TEST(MemPoolTest, ReturnAllocationThenLargeAllocation) {
+  MemTracker tracker;
+  MemPool src(&tracker);
+
+  const int64_t alloc_size = 1L * 1024 * 1024;
+  uint8_t* mem = src.Allocate(alloc_size);
+  ASSERT_TRUE(mem != NULL);
+
+  src.ReturnPartialAllocation(alloc_size);
+  ASSERT_TRUE(MemPoolTest::CheckIntegrity(&src, true));
+
+  // IMPALA-3946: chunks ended in invalid order because the first chunk was empty.
+  // This resulted in the the integrity check failing.
+  mem = src.Allocate(alloc_size + 1);
+  ASSERT_TRUE(mem != NULL);
+  ASSERT_TRUE(MemPoolTest::CheckIntegrity(&src, false));
+  src.FreeAll();
+}
+
+/// Test that making a large allocation that fails after returning a full allocation works
+/// correctly.
+TEST(MemPoolTest, ReturnAllocationThenFailedAllocation) {
+  const int64_t alloc_size = 1L * 1024 * 1024;
+  const int64_t mem_limit = alloc_size * 10;
+  MemTracker tracker(mem_limit);
+  MemPool src(&tracker);
+
+  uint8_t* mem;
+  // Allocate two chunks then clear them so they are empty.
+  for (int i = 0; i < 2; ++i) {
+    mem = src.Allocate(alloc_size);
+    ASSERT_TRUE(mem != NULL);
+  }
+  src.Clear();
+
+  // IMPALA-3946: chunk index wasn't reset correctly to before the empty chunks when an
+  // allocation failed. This resulted in the the integrity check failing.
+  mem = src.TryAllocate(mem_limit);
+  ASSERT_TRUE(mem == NULL);
+  ASSERT_TRUE(MemPoolTest::CheckIntegrity(&src, false));
+  src.FreeAll();
+}
+
 }
 
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88b89b87/be/src/runtime/mem-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.cc b/be/src/runtime/mem-pool.cc
index 27e15d3..185d8a3 100644
--- a/be/src/runtime/mem-pool.cc
+++ b/be/src/runtime/mem-pool.cc
@@ -101,74 +101,69 @@ void MemPool::FreeAll() {
 }
 
 bool MemPool::FindChunk(int64_t min_size, bool check_limits) noexcept {
-  // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
-  // after the current chunk.
-  int first_free_idx = current_chunk_idx_ + 1;
-  // (cast size() to signed int in order to avoid everything else being cast to
-  // unsigned long, in particular -1)
-  while (++current_chunk_idx_  < static_cast<int>(chunks_.size())) {
-    // we found a free chunk
-    DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
-
-    if (chunks_[current_chunk_idx_].size >= min_size) {
-      // This chunk is big enough.  Move it before the other free chunks.
-      if (current_chunk_idx_ != first_free_idx) {
-        std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
-        current_chunk_idx_ = first_free_idx;
-      }
-      break;
+  DCHECK(current_chunk_idx_ == -1 || chunks_[current_chunk_idx_].size <
+      chunks_[current_chunk_idx_].allocated_bytes + min_size);
+  // Try to allocate from a free chunk. We may have free chunks after the current chunk
+  // if Clear() was called. The current chunk may be free if ReturnPartialAllocation()
+  // was called. The first free chunk (if there is one) can therefore be either the
+  // current chunk or the chunk immediately after the current chunk.
+  int first_free_idx;
+  if (current_chunk_idx_ == -1) {
+    first_free_idx = 0;
+  } else {
+    DCHECK_GE(current_chunk_idx_, 0);
+    first_free_idx = current_chunk_idx_ +
+        (chunks_[current_chunk_idx_].allocated_bytes > 0);
+  }
+  for (int idx = current_chunk_idx_ + 1; idx < chunks_.size(); ++idx) {
+    // All chunks after 'current_chunk_idx_' should be free.
+    DCHECK_EQ(chunks_[idx].allocated_bytes, 0);
+    if (chunks_[idx].size >= min_size) {
+      // This chunk is big enough. Move it before the other free chunks.
+      if (idx != first_free_idx) std::swap(chunks_[idx], chunks_[first_free_idx]);
+      current_chunk_idx_ = first_free_idx;
+      DCHECK(CheckIntegrity(true));
+      return true;
     }
   }
 
-  if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
-    // need to allocate new chunk.
-    int64_t chunk_size;
-    DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
+  // Didn't find a big enough free chunk - need to allocate new chunk.
+  int64_t chunk_size;
+  DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
 
-    if (FLAGS_disable_mem_pools) {
-      // Disable pooling by sizing the chunk to fit only this allocation.
-      chunk_size = min_size;
-    } else {
-      DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
-      chunk_size = max<int64_t>(min_size, next_chunk_size_);
-    }
+  if (FLAGS_disable_mem_pools) {
+    // Disable pooling by sizing the chunk to fit only this allocation.
+    chunk_size = min_size;
+  } else {
+    DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
+    chunk_size = max<int64_t>(min_size, next_chunk_size_);
+  }
 
-    if (check_limits) {
-      if (!mem_tracker_->TryConsume(chunk_size)) {
-        // We couldn't allocate a new chunk so current_chunk_idx_ is now be past the
-        // end of chunks_.
-        DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
-        current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
-        return false;
-      }
-    } else {
-      mem_tracker_->Consume(chunk_size);
-    }
+  if (check_limits) {
+    if (!mem_tracker_->TryConsume(chunk_size)) return false;
+  } else {
+    mem_tracker_->Consume(chunk_size);
+  }
 
-    // Allocate a new chunk. Return early if malloc fails.
-    uint8_t* buf = reinterpret_cast<uint8_t*>(malloc(chunk_size));
-    if (UNLIKELY(buf == NULL)) {
-      mem_tracker_->Release(chunk_size);
-      DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
-      current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
-      return false;
-    }
+  // Allocate a new chunk. Return early if malloc fails.
+  uint8_t* buf = reinterpret_cast<uint8_t*>(malloc(chunk_size));
+  if (UNLIKELY(buf == NULL)) {
+    mem_tracker_->Release(chunk_size);
+    return false;
+  }
 
-    // If there are no free chunks put it at the end, otherwise before the first free.
-    if (first_free_idx == static_cast<int>(chunks_.size())) {
-      chunks_.push_back(ChunkInfo(chunk_size, buf));
-    } else {
-      current_chunk_idx_ = first_free_idx;
-      vector<ChunkInfo>::iterator insert_chunk = chunks_.begin() + current_chunk_idx_;
-      chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
-    }
-    total_reserved_bytes_ += chunk_size;
-    // Don't increment the chunk size until the allocation succeeds: if an attempted
-    // large allocation fails we don't want to increase the chunk size further.
-    next_chunk_size_ = static_cast<int>(min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
+  // Put it before the first free chunk. If no free chunks, it goes at the end.
+  if (first_free_idx == static_cast<int>(chunks_.size())) {
+    chunks_.push_back(ChunkInfo(chunk_size, buf));
+  } else {
+    chunks_.insert(chunks_.begin() + first_free_idx, ChunkInfo(chunk_size, buf));
   }
+  current_chunk_idx_ = first_free_idx;
+  total_reserved_bytes_ += chunk_size;
+  // Don't increment the chunk size until the allocation succeeds: if an attempted
+  // large allocation fails we don't want to increase the chunk size further.
+  next_chunk_size_ = static_cast<int>(min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
 
-  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
   DCHECK(CheckIntegrity(true));
   return true;
 }
@@ -223,6 +218,7 @@ void MemPool::AcquireData(MemPool* src, bool keep_current) {
   peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
 
   if (!keep_current) src->FreeAll();
+  DCHECK(src->CheckIntegrity(false));
   DCHECK(CheckIntegrity(false));
 }
 
@@ -252,25 +248,22 @@ int64_t MemPool::GetTotalChunkSizes() const {
   return result;
 }
 
-bool MemPool::CheckIntegrity(bool current_chunk_empty) {
+bool MemPool::CheckIntegrity(bool check_current_chunk_empty) {
   DCHECK_EQ(zero_length_region_, MEM_POOL_POISON);
+  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
 
   // Without pooling, there are way too many chunks and this takes too long.
   if (FLAGS_disable_mem_pools) return true;
 
   // check that current_chunk_idx_ points to the last chunk with allocated data
-  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
   int64_t total_allocated = 0;
   for (int i = 0; i < chunks_.size(); ++i) {
     DCHECK_GT(chunks_[i].size, 0);
     if (i < current_chunk_idx_) {
       DCHECK_GT(chunks_[i].allocated_bytes, 0);
     } else if (i == current_chunk_idx_) {
-      if (current_chunk_empty) {
-        DCHECK_EQ(chunks_[i].allocated_bytes, 0);
-      } else {
-        DCHECK_GT(chunks_[i].allocated_bytes, 0);
-      }
+      DCHECK_GE(chunks_[i].allocated_bytes, 0);
+      if (check_current_chunk_empty) DCHECK_EQ(chunks_[i].allocated_bytes, 0);
     } else {
       DCHECK_EQ(chunks_[i].allocated_bytes, 0);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/88b89b87/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 3e703e2..d8c1e61 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -45,10 +45,17 @@ class MemTracker;
 /// recently added; if that chunk doesn't have enough memory to
 /// satisfy the allocation request, the free chunks are searched for one that is
 /// big enough otherwise a new chunk is added to the list.
-/// The current_chunk_idx_ always points to the last chunk with allocated memory.
 /// In order to keep allocation overhead low, chunk sizes double with each new one
 /// added, until they hit a maximum size.
-//
+///
+/// Allocated chunks can be reused for new allocations if Clear() is called to free
+/// all allocations or ReturnPartialAllocation() is called to return part of the last
+/// allocation.
+///
+/// All chunks before 'current_chunk_idx_' have allocated memory, while all chunks
+/// after 'current_chunk_idx_' are free. The chunk at 'current_chunk_idx_' may or may
+/// not have allocated memory.
+///
 ///     Example:
 ///     MemPool* p = new MemPool();
 ///     for (int i = 0; i < 1024; ++i) {
@@ -173,8 +180,9 @@ class MemPool {
 
   /// chunk from which we served the last Allocate() call;
   /// always points to the last chunk that contains allocated data;
-  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
-  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
+  /// chunks 0..current_chunk_idx_ - 1 are guaranteed to contain data
+  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_ - 1);
+  /// chunks after 'current_chunk_idx_' are "free chunks" that contain no data.
   /// -1 if no chunks present
   int current_chunk_idx_;
 
@@ -205,8 +213,9 @@ class MemPool {
 
   /// Check integrity of the supporting data structures; always returns true but DCHECKs
   /// all invariants.
-  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
-  bool CheckIntegrity(bool current_chunk_empty);
+  /// If 'check_current_chunk_empty' is true, checks that the current chunk contains no
+  /// data. Otherwise the current chunk can be either empty or full.
+  bool CheckIntegrity(bool check_current_chunk_empty);
 
   /// Return offset to unoccupied space in current chunk.
   int64_t GetFreeOffset() const {