Posted to commits@impala.apache.org by ta...@apache.org on 2017/04/18 06:42:33 UTC

[1/4] incubator-impala git commit: IMPALA-3203: Part 2: per-core free lists in buffer pool

Repository: incubator-impala
Updated Branches:
  refs/heads/master 8bdfe0320 -> 42002b91c


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index da8ef3c..7770fff 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -20,11 +20,13 @@
 
 #include <stdint.h>
 #include <string>
+#include <vector>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
 
 #include "common/atomic.h"
 #include "common/compiler-util.h"
+#include "common/object-pool.h"
 #include "common/status.h"
 #include "gutil/macros.h"
 #include "runtime/tmp-file-mgr.h"
@@ -35,9 +37,9 @@
 
 namespace impala {
 
-class BufferAllocator;
 class ReservationTracker;
 class RuntimeProfile;
+class SystemAllocator;
 
 /// A buffer pool that manages memory buffers for all queries in an Impala daemon.
 /// The buffer pool enforces buffer reservations, limits, and implements policies
@@ -57,9 +59,6 @@ class RuntimeProfile;
 /// with the buffer). Unless otherwise noted, it is not safe to invoke concurrent buffer
 /// pool operations for the same client.
 ///
-/// TODO:
-/// * Decide on, document, and enforce upper limits on page size.
-///
 /// Pages, Buffers and Pinning
 /// ==========================
 /// * A page is a logical block of memory that can reside in memory or on disk.
@@ -146,9 +145,9 @@ class RuntimeProfile;
 /// operations with the same Client, PageHandle or BufferHandle.
 class BufferPool : public CacheLineAligned {
  public:
+  class BufferAllocator;
   class BufferHandle;
   class ClientHandle;
-  class Client;
   class PageHandle;
 
   /// Constructs a new buffer pool.
@@ -240,79 +239,44 @@ class BufferPool : public CacheLineAligned {
   Status TransferBuffer(ClientHandle* src_client, BufferHandle* src,
       ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT;
 
+  /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator.
+  /// TODO: once IMPALA-4834 is done and all large allocations are served from the buffer
+  /// pool, this may not be necessary.
+  void ReleaseMemory(int64_t bytes_to_free);
+
+  /// Called periodically by a maintenance thread to release unneeded memory back to
+  /// the system allocator.
+  void Maintenance();
+
   /// Print a debug string with the state of the buffer pool.
   std::string DebugString();
 
   int64_t min_buffer_len() const { return min_buffer_len_; }
-  int64_t buffer_bytes_limit() const { return buffer_bytes_limit_; }
+
+  /// Generous upper bounds on page and buffer size and the number of different
+  /// power-of-two buffer sizes.
+  static constexpr int LOG_MAX_BUFFER_BYTES = 48;
+  static constexpr int64_t MAX_BUFFER_BYTES = 1L << LOG_MAX_BUFFER_BYTES;
+
+ protected:
+  friend class BufferPoolTest;
+  /// Test helper: get a reference to the allocator.
+  BufferAllocator* allocator() { return allocator_.get(); }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
+  class Client;
+  class FreeBufferArena;
   class PageList;
   struct Page;
-  /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already
-  /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the
-  /// reservation. This function may acquire 'clean_pages_lock_' and Page::lock so
-  /// no locks lower in the lock acquisition order (see buffer-pool-internal.h) should be
-  /// held by the caller.
-  Status AllocateBufferInternal(
-      ClientHandle* client, int64_t len, BufferHandle* buffer) WARN_UNUSED_RESULT;
-
-  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
-  /// internal state but does not release to any reservation.
-  void FreeBufferInternal(BufferHandle* buffer);
-
-  /// Decrease 'buffer_bytes_remaining_' by up to 'len', down to a minimum of 0.
-  /// Returns the amount it was decreased by.
-  int64_t DecreaseBufferBytesRemaining(int64_t max_decrease);
-
-  /// Adds a clean page 'page' to the global clean pages list, unless the page is in the
-  /// process of being cleaned up. Caller must hold the page's client's lock via
-  /// 'client_lock' so that moving the page between a client list and the global free
-  /// page list is atomic. Caller must not hold 'clean_pages_lock_' or any Page::lock.
-  void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page);
-
-  /// Removes a clean page 'page' from the global clean pages list, if present. Returns
-  /// true if it was present. Caller must hold the page's client's lock via
-  /// 'client_lock' so that moving the page between a client list and the global free
-  /// page list is atomic (i.e. there is no window where the page is on neither list).
-  /// Caller must not hold 'clean_pages_lock_' or any Page::lock.
-  bool RemoveCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page);
-
-  /// Evict at least 'bytes_to_evict' bytes of clean pages and free the associated
-  /// buffers with 'allocator_'. Any bytes freed in excess of 'bytes_to_evict' are
-  /// added to 'buffer_bytes_remaining_.'
-  ///
-  /// Returns an error and adds any freed bytes to 'buffer_bytes_remaining_' if not
-  /// enough bytes could be evicted. This will only happen if there is an internal
-  /// bug: if all clients write out enough dirty pages to stay within their reservation,
-  /// then there should always be enough clean pages.
-  Status EvictCleanPages(int64_t bytes_to_evict);
 
-  /// Allocator for allocating and freeing all buffer memory.
+  /// Allocator for allocating and freeing all buffer memory and managing lists of free
+  /// buffers and clean pages.
   boost::scoped_ptr<BufferAllocator> allocator_;
 
   /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two
   /// multiple of this length. This is always a power of two.
   const int64_t min_buffer_len_;
-
-  /// The maximum physical memory in bytes that can be used for buffers.
-  const int64_t buffer_bytes_limit_;
-
-  /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for
-  /// allocating new buffers. Must be updated atomically before a new buffer is
-  /// allocated or after an existing buffer is freed.
-  /// TODO: reconsider this to avoid all threads contending on this one value.
-  AtomicInt64 buffer_bytes_remaining_;
-
-  /// Unpinned pages that have had their contents written to disk. These pages can be
-  /// evicted to allocate a buffer for any client. Pages are evicted in FIFO order,
-  /// so that pages are evicted in approximately the same order that the clients wrote
-  /// them to disk. 'clean_pages_lock_' protects 'clean_pages_'.
-  /// TODO: consider breaking up by page size
-  /// TODO: consider breaking up by core/NUMA node to improve locality
-  alignas(CACHE_LINE_SIZE) SpinLock clean_pages_lock_;
-  InternalList<Page> clean_pages_;
 };
 
 /// External representation of a client of the BufferPool. Clients are used for
@@ -390,17 +354,17 @@ class BufferPool::BufferHandle {
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferHandle);
   friend class BufferPool;
-  friend class BufferAllocator;
+  friend class SystemAllocator;
 
   /// Internal helper to set the handle to an opened state.
-  void Open(uint8_t* data, int64_t len);
+  void Open(uint8_t* data, int64_t len, int home_core);
 
   /// Internal helper to reset the handle to an unopened state. Inlined to make moving
   /// efficient.
   inline void Reset();
 
   /// The client the buffer handle belongs to, used to validate that the correct client
-  /// is provided in BufferPool method calls.
+  /// is provided in BufferPool method calls. Set to NULL if the buffer is in a free list.
   const ClientHandle* client_;
 
   /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
@@ -408,6 +372,10 @@ class BufferPool::BufferHandle {
 
   /// Length of the buffer in bytes.
   int64_t len_;
+
+  /// The CPU core that the buffer was allocated from - used to determine which arena
+  /// it will be added to.
+  int home_core_;
 };
 
 /// The handle for a page used by clients of the BufferPool. Each PageHandle should
@@ -477,6 +445,7 @@ inline BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(
   client_ = src.client_;
   data_ = src.data_;
   len_ = src.len_;
+  home_core_ = src.home_core_;
   src.Reset();
   return *this;
 }
@@ -485,6 +454,7 @@ inline void BufferPool::BufferHandle::Reset() {
   client_ = NULL;
   data_ = NULL;
   len_ = -1;
+  home_core_ = -1;
 }
 }
 

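The new 'home_core_' field is what lets a freed buffer find its way back to a per-core arena without touching global state. A minimal sketch of that routing, assuming a hypothetical home_core() accessor and an Arena::AddFreeBuffer() method (neither appears in this hunk):

    // Hypothetical sketch: return a freed buffer to the arena of the core it
    // was allocated on, so the common case only takes a per-core lock and
    // buffers stay NUMA-local.
    void ReturnToArena(std::vector<std::unique_ptr<Arena>>* arenas,
        BufferPool::BufferHandle&& buffer) {
      int core = buffer.home_core();  // assumed accessor for 'home_core_'
      (*arenas)[core]->AddFreeBuffer(std::move(buffer));
    }
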
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/free-list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/free-list-test.cc b/be/src/runtime/bufferpool/free-list-test.cc
index 190faa9..7cb80b4 100644
--- a/be/src/runtime/bufferpool/free-list-test.cc
+++ b/be/src/runtime/bufferpool/free-list-test.cc
@@ -20,7 +20,9 @@
 
 #include "common/object-pool.h"
 #include "runtime/bufferpool/free-list.h"
+#include "runtime/bufferpool/system-allocator.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 
 #include "common/names.h"
 
@@ -29,8 +31,8 @@ namespace impala {
 class FreeListTest : public ::testing::Test {
  protected:
   virtual void SetUp() override {
-    allocator_ = obj_pool_.Add(new BufferAllocator(MIN_BUFFER_LEN));
-    SeedRng();
+    allocator_ = obj_pool_.Add(new SystemAllocator(MIN_BUFFER_LEN));
+    RandTestUtil::SeedRng("FREE_LIST_TEST_SEED", &rng_);
   }
 
   virtual void TearDown() override {
@@ -38,16 +40,8 @@ class FreeListTest : public ::testing::Test {
     obj_pool_.Clear();
   }
 
-  /// Seed 'rng_' with a seed either for the environment or based on the current time.
-  void SeedRng() {
-    const char* seed_str = getenv("FREE_LIST_TEST_SEED");
-    int64_t seed = seed_str != nullptr ? atoi(seed_str) : time(nullptr);
-    LOG(INFO) << "Random seed: " << seed;
-    rng_.seed(seed);
-  }
-
-  void AllocateBuffers(int num_buffers, int64_t buffer_len,
-      vector<BufferHandle>* buffers) {
+  void AllocateBuffers(
+      int num_buffers, int64_t buffer_len, vector<BufferHandle>* buffers) {
     for (int i = 0; i < num_buffers; ++i) {
       BufferHandle buffer;
       ASSERT_OK(allocator_->Allocate(buffer_len, &buffer));
@@ -69,11 +63,10 @@ class FreeListTest : public ::testing::Test {
     buffers->clear();
   }
 
-  void FreeBuffers(vector<BufferHandle>* buffers) {
-    for (BufferHandle& buffer : *buffers) {
+  void FreeBuffers(vector<BufferHandle>&& buffers) {
+    for (BufferHandle& buffer : buffers) {
       allocator_->Free(move(buffer));
     }
-    buffers->clear();
   }
 
   const static int MIN_BUFFER_LEN = 1024;
@@ -85,7 +78,7 @@ class FreeListTest : public ::testing::Test {
   ObjectPool obj_pool_;
 
   /// The buffer allocator, owned by 'obj_pool_'.
-  BufferAllocator* allocator_;
+  SystemAllocator* allocator_;
 };
 
 const int FreeListTest::MIN_BUFFER_LEN;
@@ -115,7 +108,8 @@ TEST_F(FreeListTest, SmallList) {
       std::shuffle(buffers.begin(), buffers.end(), rng_);
       AddFreeBuffers(&small_list, &buffers);
       // Shrink list down to LIST_SIZE.
-      small_list.FreeBuffers(allocator_, max<int64_t>(0, small_list.Size() - LIST_SIZE));
+      FreeBuffers(
+          small_list.GetBuffersToFree(max<int64_t>(0, small_list.Size() - LIST_SIZE)));
 
       // The LIST_SIZE buffers with the lowest address should be retained, and the
       // remaining buffers should have been freed.
@@ -125,7 +119,7 @@ TEST_F(FreeListTest, SmallList) {
         buffers.push_back(move(buffer));
       }
       ASSERT_FALSE(small_list.PopFreeBuffer(&buffer));
-      FreeBuffers(&buffers);
+      FreeBuffers(move(buffers));
     }
   }
 }
@@ -148,7 +142,7 @@ TEST_F(FreeListTest, ReturnOrder) {
       AddFreeBuffers(&list, &buffers);
 
       // Free buffers. Only the buffers with the high addresses should be freed.
-      list.FreeBuffers(allocator_, max<int64_t>(0, list.Size() - LIST_SIZE));
+      FreeBuffers(list.GetBuffersToFree(max<int64_t>(0, list.Size() - LIST_SIZE)));
 
       // Validate that the buffers with lowest addresses are returned in ascending order.
       BufferHandle buffer;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/free-list.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/free-list.h b/be/src/runtime/bufferpool/free-list.h
index 6cec0e8..ec1057d 100644
--- a/be/src/runtime/bufferpool/free-list.h
+++ b/be/src/runtime/bufferpool/free-list.h
@@ -26,7 +26,6 @@
 
 #include "common/logging.h"
 #include "gutil/macros.h"
-#include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/buffer-pool.h"
 
 namespace impala {
@@ -75,28 +74,21 @@ class FreeList {
     std::push_heap(free_list_.begin(), free_list_.end(), HeapCompare);
   }
 
-  /// Frees all the buffers in the list with 'allocator'. Returns the number of bytes
-  /// freed.
-  int64_t FreeAll(BufferAllocator* allocator) {
-    return FreeBuffers(allocator, free_list_.size());
-  }
-
-  /// Free 'num_buffers' buffers from the list with 'allocator'. Returns the number of
-  /// bytes freed. The average time complexity is n log n, where n is the current size of
-  /// the list.
-  int64_t FreeBuffers(BufferAllocator* allocator, int64_t num_buffers) {
+  /// Get the 'num_buffers' buffers with the highest memory addresses from the list to
+  /// free. The average time complexity is n log n, where n is the current size of the
+  /// list.
+  vector<BufferHandle> GetBuffersToFree(int64_t num_buffers) {
+    vector<BufferHandle> buffers;
     DCHECK_LE(num_buffers, free_list_.size());
     // Sort the list so we can free the buffers with higher memory addresses.
     // Note that the sorted list is still a valid min-heap.
     std::sort(free_list_.begin(), free_list_.end(), SortCompare);
 
-    int64_t bytes_freed = 0;
     for (int64_t i = 0; i < num_buffers; ++i) {
-      bytes_freed += free_list_.back().len();
-      allocator->Free(std::move(free_list_.back()));
+      buffers.emplace_back(std::move(free_list_.back()));
       free_list_.pop_back();
     }
-    return bytes_freed;
+    return buffers;
   }
 
   /// Returns the number of buffers currently in the list.

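With GetBuffersToFree() the list no longer frees memory itself: the caller drains it and releases the buffers through whatever allocator it holds, as free-list-test.cc does above. A minimal usage sketch, assuming 'list', 'allocator' and 'target_size' are in scope:

    // Shrink the free list down to 'target_size' buffers, releasing the
    // buffers with the highest memory addresses back to the system.
    int64_t num_to_free = std::max<int64_t>(0, list.Size() - target_size);
    for (BufferHandle& buffer : list.GetBuffersToFree(num_to_free)) {
      allocator->Free(std::move(buffer));
    }
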
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 13bc597..ea515fe 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -29,6 +29,7 @@
 #include "runtime/bufferpool/suballocator.h"
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/bit-util.h"
 
 #include "common/names.h"
@@ -43,7 +44,7 @@ namespace impala {
 class SuballocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() override {
-    SeedRng();
+    RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_);
     profile_.reset(new RuntimeProfile(&obj_pool_, "test profile"));
   }
 
@@ -63,19 +64,6 @@ class SuballocatorTest : public ::testing::Test {
   const static int64_t TEST_BUFFER_LEN = Suballocator::MIN_ALLOCATION_BYTES * 16;
 
  protected:
-  /// Seed 'rng_' with a seed either for the environment or based on the current time.
-  void SeedRng() {
-    const char* seed_str = getenv("SUBALLOCATOR_TEST_SEED");
-    int64_t seed;
-    if (seed_str != nullptr) {
-      seed = atoi(seed_str);
-    } else {
-      seed = time(nullptr);
-    }
-    LOG(INFO) << "Random seed: " << seed;
-    rng_.seed(seed);
-  }
-
   /// Initialize 'buffer_pool_' and 'global_reservation_' with a limit of 'total_mem'
   /// bytes of buffers of minimum length 'min_buffer_len'.
   void InitPool(int64_t min_buffer_len, int total_mem) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/suballocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.h b/be/src/runtime/bufferpool/suballocator.h
index 53f4ef5..e6834ad 100644
--- a/be/src/runtime/bufferpool/suballocator.h
+++ b/be/src/runtime/bufferpool/suballocator.h
@@ -89,10 +89,10 @@ class Suballocator {
   /// failed Allocate() call).
   void Free(std::unique_ptr<Suballocation> allocation);
 
-  /// Generous upper bounds on the max allocation size and the number of different
+  /// Upper bounds on the max allocation size and the number of different
   /// power-of-two allocation sizes. Used to bound the number of free lists.
-  static constexpr int LOG_MAX_ALLOCATION_BYTES = 48;
-  static constexpr int64_t MAX_ALLOCATION_BYTES = 1L << LOG_MAX_ALLOCATION_BYTES;
+  static constexpr int LOG_MAX_ALLOCATION_BYTES = BufferPool::LOG_MAX_BUFFER_BYTES;
+  static constexpr int64_t MAX_ALLOCATION_BYTES = BufferPool::MAX_BUFFER_BYTES;
 
   /// Don't support allocations less than 4kb to avoid high overhead.
   static constexpr int LOG_MIN_ALLOCATION_BYTES = 12;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/system-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.cc b/be/src/runtime/bufferpool/system-allocator.cc
new file mode 100644
index 0000000..0ea429a
--- /dev/null
+++ b/be/src/runtime/bufferpool/system-allocator.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/system-allocator.h"
+
+#include "util/bit-util.h"
+#include "util/cpu-info.h"
+
+namespace impala {
+
+SystemAllocator::SystemAllocator(int64_t min_buffer_len)
+  : min_buffer_len_(min_buffer_len) {
+  DCHECK(BitUtil::IsPowerOf2(min_buffer_len));
+}
+
+Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) {
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_LE(len, BufferPool::MAX_BUFFER_BYTES);
+  DCHECK(BitUtil::IsPowerOf2(len)) << len;
+
+  uint8_t* alloc = reinterpret_cast<uint8_t*>(malloc(len));
+  if (alloc == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
+  buffer->Open(alloc, len, CpuInfo::GetCurrentCore());
+  return Status::OK();
+}
+
+void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) {
+  free(buffer.data());
+  buffer.Reset(); // Avoid DCHECK in ~BufferHandle().
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/system-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.h b/be/src/runtime/bufferpool/system-allocator.h
new file mode 100644
index 0000000..8b4a544
--- /dev/null
+++ b/be/src/runtime/bufferpool/system-allocator.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_SYSTEM_ALLOCATOR_H
+#define IMPALA_RUNTIME_SYSTEM_ALLOCATOR_H
+
+#include "common/status.h"
+
+#include "runtime/bufferpool/buffer-pool.h"
+
+namespace impala {
+
+/// The underlying memory allocator for the buffer pool that allocates buffer memory from
+/// the system. All buffers are allocated through the BufferPool's SystemAllocator. The
+/// allocator only handles allocating buffers that are power-of-two multiples of the
+/// minimum buffer length.
+///
+/// TODO:
+/// * Allocate memory with mmap() instead of malloc().
+class SystemAllocator {
+ public:
+  SystemAllocator(int64_t min_buffer_len);
+
+  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple
+  /// of the minimum buffer length.
+  Status Allocate(int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
+
+  /// Free the memory for a previously-allocated buffer.
+  void Free(BufferPool::BufferHandle&& buffer);
+
+ private:
+  const int64_t min_buffer_len_;
+};
+}
+
+#endif

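A minimal usage sketch of the API above, with includes and error handling elided; the 64KB minimum buffer length is an arbitrary choice for illustration:

    SystemAllocator allocator(64 * 1024);  // minimum buffer length: 64KB
    BufferPool::BufferHandle buffer;
    // 'len' must be a power-of-two multiple of the minimum buffer length.
    RETURN_IF_ERROR(allocator.Allocate(256 * 1024, &buffer));
    memset(buffer.data(), 0, 256 * 1024);  // use the buffer's memory
    allocator.Free(std::move(buffer));     // hand the memory back to the system
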
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 5d9ced1..b973a84 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -380,9 +380,6 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   DCHECK(process_mem_tracker != NULL);
   free_buffer_mem_tracker_.reset(
       new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-  // If we hit the process limit, see if we can reclaim some memory by removing
-  // previously allocated (but unused) io buffers.
-  process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
 
   for (int i = 0; i < disk_queues_.size(); ++i) {
     disk_queues_[i] = new DiskQueue(i);
@@ -759,12 +756,17 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
   return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, buffer_size);
 }
 
-void DiskIoMgr::GcIoBuffers() {
+void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
   unique_lock<mutex> lock(free_buffers_lock_);
   int buffers_freed = 0;
   int bytes_freed = 0;
+  // Free small-to-large to avoid retaining many small buffers and fragmenting memory.
   for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    for (uint8_t* buffer : free_buffers_[idx]) {
+    std::list<uint8_t*>* free_buffers = &free_buffers_[idx];
+    while (
+        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
+      uint8_t* buffer = free_buffers->front();
+      free_buffers->pop_front();
       int64_t buffer_size = (1LL << idx) * min_buffer_size_;
       delete[] buffer;
       free_buffer_mem_tracker_->Release(buffer_size);
@@ -773,7 +775,7 @@ void DiskIoMgr::GcIoBuffers() {
       ++buffers_freed;
       bytes_freed += buffer_size;
     }
-    free_buffers_[idx].clear();
+    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
   }
 
   if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
@@ -783,7 +785,7 @@ void DiskIoMgr::GcIoBuffers() {
     ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
   }
   if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
-    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->set_value(0);
+    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
   }
 }
 
@@ -1125,7 +1127,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
   DCHECK(reader->mem_tracker_ != NULL);
   bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
   if (!enough_memory) {
-    // Low memory, GC and try again.
+    // Low memory, GC all the buffers and try again.
     GcIoBuffers();
     enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b7222cb..b70c9c8 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -774,6 +774,10 @@ class DiskIoMgr {
   /// later reuse.
   void CacheOrCloseFileHandle(const char* fname, HdfsCachedFileHandle* fid, bool close);
 
+  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
+  /// 'bytes_to_free' is -1.
+  void GcIoBuffers(int64_t bytes_to_free = -1);
+
   /// Default ready buffer queue capacity. This constant doesn't matter too much
   /// since the system dynamically adjusts.
   static const int DEFAULT_QUEUE_CAPACITY;
@@ -902,12 +906,6 @@ class DiskIoMgr {
   /// Returns a buffer desc object which can now be used for another reader.
   void ReturnBufferDesc(BufferDescriptor* desc);
 
-  /// Garbage collect all unused io buffers. This is currently only triggered when the
-  /// process wide limit is hit. This is not good enough. While it is sufficient for
-  /// the IoMgr, other components do not trigger this GC.
-  /// TODO: make this run periodically?
-  void GcIoBuffers();
-
   /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be NULL), either
   /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
   /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index dd27a91..1fa863e 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -313,12 +313,6 @@ Status ExecEnv::StartServices() {
   // Limit of -1 means no memory limit.
   mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED,
       bytes_limit > 0 ? bytes_limit : -1, "Process"));
-
-  // Since tcmalloc does not free unused memory, we may exceed the process mem limit even
-  // if Impala is not actually using that much memory. Add a callback to free any unused
-  // memory if we hit the process limit.
-  mem_tracker_->AddGcFunction(boost::bind(&MallocExtension::ReleaseFreeMemory,
-                                          MallocExtension::instance()));
 #else
   // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to
   // track process memory usage (sum of all children trackers).
@@ -338,6 +332,22 @@ Status ExecEnv::StartServices() {
 
   RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
 
+  mem_tracker_->AddGcFunction(
+      [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); });
+
+  // TODO: IMPALA-3200: register BufferPool::ReleaseMemory() as GC function.
+
+#ifndef ADDRESS_SANITIZER
+  // Since tcmalloc does not free unused memory, we may exceed the process mem limit even
+  // if Impala is not actually using that much memory. Add a callback to free any unused
+  // memory if we hit the process limit. TCMalloc GC must run last, because other GC
+  // functions may have released memory to TCMalloc, and TCMalloc may have cached it
+  // instead of releasing it to the system.
+  mem_tracker_->AddGcFunction([](int64_t bytes_to_free) {
+    MallocExtension::instance()->ReleaseToSystem(bytes_to_free);
+  });
+#endif
+
   // Start services in order to ensure that dependencies between them are met
   if (enable_webserver_) {
     AddDefaultUrlCallbacks(webserver_.get(), mem_tracker_.get(), metrics_.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a48ede9..9e3775d 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -292,26 +292,37 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
   return status;
 }
 
+void MemTracker::AddGcFunction(GcFunction f) {
+  gc_functions_.push_back(f);
+}
+
 bool MemTracker::GcMemory(int64_t max_consumption) {
   if (max_consumption < 0) return true;
   lock_guard<mutex> l(gc_lock_);
   if (consumption_metric_ != NULL) consumption_->Set(consumption_metric_->value());
-  uint64_t pre_gc_consumption = consumption();
+  int64_t pre_gc_consumption = consumption();
   // Check if someone gc'd before us
   if (pre_gc_consumption < max_consumption) return false;
   if (num_gcs_metric_ != NULL) num_gcs_metric_->Increment(1);
 
+  int64_t curr_consumption = pre_gc_consumption;
   // Try to free up some memory
   for (int i = 0; i < gc_functions_.size(); ++i) {
-    gc_functions_[i]();
+    // Try to free up the amount we are over plus some extra so that we don't have to
+    // immediately GC again. Don't free all the memory since that can be unnecessarily
+    // expensive.
+    const int64_t EXTRA_BYTES_TO_FREE = 512L * 1024L * 1024L;
+    int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE;
+    gc_functions_[i](bytes_to_free);
     if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
-    if (consumption() <= max_consumption) break;
+    curr_consumption = consumption();
+    if (max_consumption - curr_consumption >= EXTRA_BYTES_TO_FREE) break;
   }
 
   if (bytes_freed_by_last_gc_metric_ != NULL) {
-    bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - consumption());
+    bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - curr_consumption);
   }
-  return consumption() > max_consumption;
+  return curr_consumption > max_consumption;
 }
 
 void MemTracker::GcTcmalloc() {

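To make the new GC target concrete, a worked example with hypothetical numbers:

    // max_consumption     = 10 GB
    // curr_consumption    = 10.2 GB  (0.2 GB = 204.8 MB over the limit)
    // EXTRA_BYTES_TO_FREE = 512 MB
    // bytes_to_free       = 204.8 MB + 512 MB = 716.8 MB
    //
    // Each GC function is asked to free ~717 MB, targeting a final consumption
    // of max_consumption - EXTRA_BYTES_TO_FREE (~9.5 GB), so the next
    // allocation does not immediately push the process back over the limit.
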
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index acf3b46..863d3c7 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -294,14 +294,17 @@ class MemTracker {
 
   MemTracker* parent() const { return parent_; }
 
-  /// Signature for function that can be called to free some memory after limit is reached.
-  /// See the class header for further details on what these functions should do.
-  typedef boost::function<void ()> GcFunction;
-
-  /// Add a function 'f' to be called if the limit is reached.
+  /// Signature for function that can be called to free some memory after limit is
+  /// reached. The function should try to free at least 'bytes_to_free' bytes of
+  /// memory. See the class header for further details on the expected behaviour of
+  /// these functions.
+  typedef std::function<void(int64_t bytes_to_free)> GcFunction;
+
+  /// Add a function 'f' to be called if the limit is reached. 'f' is only invoked if
+  /// none of the previously-added GC functions freed up enough memory.
   /// 'f' does not need to be thread-safe as long as it is added to only one MemTracker.
   /// Note that 'f' must be valid for the lifetime of this MemTracker.
-  void AddGcFunction(GcFunction f) { gc_functions_.push_back(f); }
+  void AddGcFunction(GcFunction f);
 
   /// Register this MemTracker's metrics. Each key will be of the form
   /// "<prefix>.<metric name>".

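A minimal sketch of registering a GC function under the new signature; 'tracker' and the 'cache' object with an EvictBytes() method are hypothetical:

    tracker->AddGcFunction([&cache](int64_t bytes_to_free) {
      // Try to release at least 'bytes_to_free' bytes. Freeing less is safe:
      // GcMemory() simply moves on to the next registered function.
      cache.EvictBytes(bytes_to_free);
    });
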
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index b220fff..dee3c64 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -481,7 +481,9 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) {
   string file_path = handle->TmpFilePath();
 
   // Cancel the write - prior to the IMPALA-4820 fix decryption could race with the write.
-  ASSERT_OK(file_group.CancelWriteAndRestoreData(move(handle), data_mem_range));
+  handle->Cancel();
+  handle->WaitForWrite();
+  ASSERT_OK(file_group.RestoreData(move(handle), data_mem_range));
   WaitForCallbacks(1);
 
   // Read the data from the scratch file and check that the plaintext isn't present.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index c939d88..abba192 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -407,13 +407,11 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
   return Status::OK();
 }
 
-Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(
+Status TmpFileMgr::FileGroup::RestoreData(
     unique_ptr<WriteHandle> handle, MemRange buffer) {
   DCHECK_EQ(handle->write_range_->data(), buffer.data());
   DCHECK_EQ(handle->len(), buffer.len());
-  handle->Cancel();
-
-  handle->WaitForWrite();
+  DCHECK(!handle->write_in_flight_);
   // Decrypt after the write is finished, so that we don't accidentally write decrypted
   // data to disk.
   Status status;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index cab2d87..d66c527 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -117,8 +117,7 @@ class TmpFileMgr {
     /// a different thread when the write completes successfully or unsuccessfully or is
     /// cancelled.
     ///
-    /// 'handle' must be destroyed by passing the DestroyWriteHandle() or
-    /// CancelWriteAndRestoreData().
+    /// 'handle' must be destroyed by passing it to DestroyWriteHandle() or RestoreData().
     Status Write(MemRange buffer, WriteDoneCallback cb,
         std::unique_ptr<WriteHandle>* handle) WARN_UNUSED_RESULT;
 
@@ -127,11 +126,11 @@ class TmpFileMgr {
     /// after a write successfully completes.
     Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
 
-    /// Cancels the write referenced by 'handle' and destroy associate resources. Also
-    /// restore the original data in the 'buffer' passed to Write(), decrypting or
-    /// decompressing as necessary. The cancellation always succeeds, but an error
-    /// is returned if restoring the data fails.
-    Status CancelWriteAndRestoreData(
+    /// Restore the original data in the 'buffer' passed to Write(), decrypting or
+    /// decompressing as necessary. Returns an error if restoring the data fails.
+    /// The write must not be in-flight - the caller is responsible for waiting for
+    /// the write to complete.
+    Status RestoreData(
         std::unique_ptr<WriteHandle> handle, MemRange buffer) WARN_UNUSED_RESULT;
 
     /// Wait for the in-flight I/Os to complete and destroy resources associated with
@@ -255,19 +254,27 @@ class TmpFileMgr {
   /// handle can be passed to FileGroup::Read() to read back the data zero or more times.
   /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and
   /// allow reuse of the scratch file range written to. Alternatively,
-  /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the effects of
-  /// FileGroup::Write() by destroying the handle and restoring the original data to the
-  /// buffer, so long as the data in the buffer was not modified by the caller.
+  /// FileGroup::RestoreData() can be called to reverse the effects of FileGroup::Write()
+  /// by destroying the handle and restoring the original data to the buffer, so long as
+  /// the data in the buffer was not modified by the caller.
   ///
   /// Public methods of WriteHandle are safe to call concurrently from multiple threads.
   class WriteHandle {
    public:
     /// The write must be destroyed by passing it to FileGroup - destroying it before
-    /// cancelling the write is an error.
-    ~WriteHandle() {
-      DCHECK(!write_in_flight_);
-      DCHECK(is_cancelled_);
-    }
+    /// the write completes is an error.
+    ~WriteHandle() { DCHECK(!write_in_flight_); }
+
+    /// Cancels the write asynchronously. After Cancel() is called, writes are not
+    /// retried. The write callback may be called with a CANCELLED status (unless
+    /// it succeeded or encountered a different error first).
+    /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it.
+    void Cancel();
+
+    /// Blocks until the write completes either successfully or unsuccessfully.
+    /// May return before the write callback has been called.
+    /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it.
+    void WaitForWrite();
 
     /// Path of temporary file backing the block. Intended for use in testing.
     /// Returns empty string if no backing file allocated.
@@ -296,13 +303,6 @@ class TmpFileMgr {
     Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
         int64_t offset) WARN_UNUSED_RESULT;
 
-    /// Cancels the write asynchronously. After Cancel() is called, writes are not
-    /// retried.
-    void Cancel();
-
-    /// Blocks until the write completes either successfully or unsuccessfully.
-    void WaitForWrite();
-
     /// Called when the write has completed successfully or not. Sets 'write_in_flight_'
     /// then calls 'cb_'.
     void WriteComplete(const Status& write_status);

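Splitting up CancelWriteAndRestoreData() means callers now perform three explicit steps, as in the tmp-file-mgr-test.cc hunk above. A minimal sketch, assuming 'file_group', 'handle' and 'buffer' as in that test, in a context that can return Status:

    handle->Cancel();        // stop retries; the callback may see CANCELLED
    handle->WaitForWrite();  // ensure no write is in flight
    // Safe now: RestoreData() DCHECKs that the write is not in flight.
    RETURN_IF_ERROR(file_group.RestoreData(std::move(handle), buffer));
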
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/testutil/cpu-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/cpu-util.h b/be/src/testutil/cpu-util.h
new file mode 100644
index 0000000..d465c52
--- /dev/null
+++ b/be/src/testutil/cpu-util.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_TESTUTIL_CPU_UTIL_H_
+#define IMPALA_TESTUTIL_CPU_UTIL_H_
+
+#include <pthread.h>
+#include <sched.h>
+
+#include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
+#include "util/cpu-info.h"
+
+namespace impala {
+
+class CpuTestUtil {
+ public:
+  /// Set the thread affinity so that it always runs on 'core'. Fail the test if
+  /// unsuccessful.
+  static void PinToCore(int core) {
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    CPU_SET(core, &cpuset);
+    ASSERT_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset))
+        << core;
+    ASSERT_EQ(core, CpuInfo::GetCurrentCore());
+  }
+
+  /// Reset the thread affinity of the current thread to all cores.
+  static void ResetAffinity() {
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    for (int i = 0; i < CpuInfo::GetMaxNumCores(); ++i) {
+      CPU_SET(i, &cpuset);
+    }
+    ASSERT_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset));
+  }
+
+  /// Choose a random core in [0, CpuInfo::num_cores()) and pin the current thread to it.
+  /// Uses 'rng' for randomness.
+  static void PinToRandomCore(std::mt19937* rng) {
+    int core = std::uniform_int_distribution<int>(0, CpuInfo::num_cores() - 1)(*rng);
+    PinToCore(core);
+  }
+
+  /// Set up a fake NUMA configuration in which CpuInfo reports something other than
+  /// the system's actual configuration. If 'has_numa' is true, sets it up as three NUMA
+  /// nodes with the cores distributed between them. Otherwise sets it up as a single
+  /// NUMA node.
+  static void SetupFakeNuma(bool has_numa) {
+    std::vector<int> core_to_node(CpuInfo::GetMaxNumCores());
+    int num_nodes = has_numa ? 3 : 1;
+    for (int i = 0; i < core_to_node.size(); ++i) core_to_node[i] = i % num_nodes;
+    CpuInfo::InitFakeNumaForTest(num_nodes, core_to_node);
+    LOG(INFO) << CpuInfo::DebugString();
+  }
+};
+}
+
+#endif

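A minimal sketch of how these helpers combine in a test body; the 'MY_TEST_SEED' environment variable name is made up for the example:

    std::mt19937 rng;
    RandTestUtil::SeedRng("MY_TEST_SEED", &rng);
    CpuTestUtil::PinToRandomCore(&rng);
    // ... exercise code that depends on CpuInfo::GetCurrentCore() ...
    CpuTestUtil::ResetAffinity();  // undo the pinning for subsequent tests
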
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/testutil/rand-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/rand-util.h b/be/src/testutil/rand-util.h
new file mode 100644
index 0000000..6cdbed4
--- /dev/null
+++ b/be/src/testutil/rand-util.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_TESTUTIL_RAND_UTIL_H_
+#define IMPALA_TESTUTIL_RAND_UTIL_H_
+
+#include <cstdint>
+#include <cstdlib>
+#include <random>
+
+#include "common/logging.h"
+
+namespace impala {
+
+/// Test helpers for randomised tests.
+class RandTestUtil {
+ public:
+  /// Seed 'rng' with a seed either from the environment variable 'env_var' or the
+  /// current time.
+  static void SeedRng(const char* env_var, std::mt19937* rng) {
+    const char* seed_str = getenv(env_var);
+    int64_t seed;
+    if (seed_str != nullptr) {
+      seed = atoi(seed_str);
+    } else {
+      seed = time(nullptr);
+    }
+    LOG(INFO) << "Random seed (overridable with " << env_var << "): " << seed;
+    rng->seed(seed);
+  }
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index e5d2a83..a32571e 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -77,6 +77,8 @@ int CpuInfo::max_num_cores_;
 string CpuInfo::model_name_ = "unknown";
 int CpuInfo::max_num_numa_nodes_;
 unique_ptr<int[]> CpuInfo::core_to_numa_node_;
+vector<vector<int>> CpuInfo::numa_node_to_cores_;
+vector<int> CpuInfo::numa_node_core_idx_;
 
 static struct {
   string name;
@@ -182,6 +184,7 @@ void CpuInfo::InitNuma() {
     // Assume a single NUMA node.
     max_num_numa_nodes_ = 1;
     std::fill_n(core_to_numa_node_.get(), max_num_cores_, 0);
+    InitNumaNodeToCores();
     return;
   }
 
@@ -215,6 +218,29 @@ void CpuInfo::InitNuma() {
       core_to_numa_node_[core] = 0;
     }
   }
+  InitNumaNodeToCores();
+}
+
+void CpuInfo::InitFakeNumaForTest(
+    int max_num_numa_nodes, const vector<int>& core_to_numa_node) {
+  DCHECK_EQ(max_num_cores_, core_to_numa_node.size());
+  max_num_numa_nodes_ = max_num_numa_nodes;
+  for (int i = 0; i < max_num_cores_; ++i) {
+    core_to_numa_node_[i] = core_to_numa_node[i];
+  }
+  numa_node_to_cores_.clear();
+  InitNumaNodeToCores();
+}
+
+void CpuInfo::InitNumaNodeToCores() {
+  DCHECK(numa_node_to_cores_.empty());
+  numa_node_to_cores_.resize(max_num_numa_nodes_);
+  numa_node_core_idx_.resize(max_num_cores_);
+  for (int core = 0; core < max_num_cores_; ++core) {
+    vector<int>* cores_of_node = &numa_node_to_cores_[core_to_numa_node_[core]];
+    numa_node_core_idx_[core] = cores_of_node->size();
+    cores_of_node->push_back(core);
+  }
 }
 
 void CpuInfo::VerifyCpuRequirements() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/util/cpu-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h
index 28af3e5..38d6782 100644
--- a/be/src/util/cpu-info.h
+++ b/be/src/util/cpu-info.h
@@ -21,6 +21,7 @@
 
 #include <memory>
 #include <string>
+#include <vector>
 #include <boost/cstdint.hpp>
 
 #include "common/logging.h"
@@ -109,10 +110,35 @@ class CpuInfo {
   /// [0, GetMaxNumCores()).
   static int GetNumaNodeOfCore(int core) {
     DCHECK_LE(0, core);
-    DCHECK_LT(core, max_num_numa_nodes_);
+    DCHECK_LT(core, max_num_cores_);
     return core_to_numa_node_[core];
   }
 
+  /// Returns the cores in a NUMA node. 'node' must be in the range
+  /// [0, GetMaxNumNumaNodes()).
+  static const std::vector<int>& GetCoresOfNumaNode(int node) {
+    DCHECK_LE(0, node);
+    DCHECK_LT(node, max_num_numa_nodes_);
+    return numa_node_to_cores_[node];
+  }
+
+  /// Returns the cores in the same NUMA node as 'core'. 'core' must be in the range
+  /// [0, GetMaxNumCores()).
+  static const std::vector<int>& GetCoresOfSameNumaNode(int core) {
+    DCHECK_LE(0, core);
+    DCHECK_LT(core, max_num_cores_);
+    return GetCoresOfNumaNode(GetNumaNodeOfCore(core));
+  }
+
+  /// Returns the index of the given core within the vector returned by
+  /// GetCoresOfNumaNode() and GetCoresOfSameNumaNode(). 'core' must be in the range
+  /// [0, GetMaxNumCores()).
+  static int GetNumaNodeCoreIdx(int core) {
+    DCHECK_LE(0, core);
+    DCHECK_LT(core, max_num_cores_);
+    return numa_node_core_idx_[core];
+  }
+
   /// Returns the model name of the cpu (e.g. Intel i7-2600)
   static std::string model_name() {
     DCHECK(initialized_);
@@ -150,10 +176,23 @@ class CpuInfo {
     bool reenable_;
   };
 
+ protected:
+  friend class CpuTestUtil;
+
+  /// Set up fake NUMA info to simulate NUMA for backend tests. Sets up CpuInfo to
+  /// simulate 'max_num_numa_nodes' NUMA nodes, with 'core_to_numa_node' specifying
+  /// the NUMA node of each core in [0, GetMaxNumCores()).
+  static void InitFakeNumaForTest(
+      int max_num_numa_nodes, const std::vector<int>& core_to_numa_node);
+
  private:
   /// Initialize NUMA-related state - called from Init();
   static void InitNuma();
 
+  /// Initialize 'numa_node_to_cores_' based on 'max_num_numa_nodes_' and
+  /// 'core_to_numa_node_'. Called from InitNuma().
+  static void InitNumaNodeToCores();
+
   /// Populates the arguments with information about this machine's caches.
   /// The values returned are not reliable in some environments, e.g. RHEL5 on EC2, so
   /// so we will keep this as a private method.
@@ -173,7 +212,14 @@ class CpuInfo {
 
   /// Array with 'max_num_cores_' entries, each of which is the NUMA node of that core.
   static std::unique_ptr<int[]> core_to_numa_node_;
-};
 
+  /// Vector with 'max_num_numa_nodes_' entries, each of which is a vector of the cores
+  /// belonging to that NUMA node.
+  static std::vector<std::vector<int>> numa_node_to_cores_;
+
+  /// Array with 'max_num_cores_' entries, each of which is the index of that core in its
+  /// NUMA node.
+  static std::vector<int> numa_node_core_idx_;
+};
 }
 #endif


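A hypothetical sketch (not from this patch) of how the new accessors compose: when the current core's free list is empty, scan the other cores on the same NUMA node before falling back to remote nodes:

    const std::vector<int>& local = CpuInfo::GetCoresOfSameNumaNode(core);
    int num_local = static_cast<int>(local.size());
    int start_idx = CpuInfo::GetNumaNodeCoreIdx(core);  // local[start_idx] == core
    for (int i = 1; i < num_local; ++i) {  // start at 1 to skip 'core' itself
      int other_core = local[(start_idx + i) % num_local];
      // ... try to claim a buffer from 'other_core's free list ...
    }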

[4/4] incubator-impala git commit: IMPALA-5124: add tests for scratch read errors

Posted by ta...@apache.org.
IMPALA-5124: add tests for scratch read errors

Adds tests for read errors from permissions (i.e. open() fails),
corrupt data (integrity check fails) and truncated files (read() fails).

Fixes a couple of bugs:
* Truncated reads were not detected in TmpFileMgr
* IoMgr buffers weren't returned on error paths (this isn't a true leak
  but results in DCHECKs being hit).

Change-Id: I3f2b93588dd47f70a4863ecad3b5556c3634ccb4
Reviewed-on: http://gerrit.cloudera.org:8080/6562
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/42002b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/42002b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/42002b91

Branch: refs/heads/master
Commit: 42002b91cb2db42be984b6ed95bc08c6dbd70c74
Parents: cb900df
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 5 14:31:33 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 18 06:34:47 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/bufferpool/buffer-pool-test.cc | 92 +++++++++++++++++++++-
 be/src/runtime/tmp-file-mgr.cc                | 27 +++++--
 common/thrift/generate_error_codes.py         |  3 +
 3 files changed, 112 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42002b91/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index a40feac..dae7bcd 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -296,7 +296,7 @@ class BufferPoolTest : public ::testing::Test {
     directory_iterator dir_it(SCRATCH_DIR);
     for (; dir_it != directory_iterator(); ++dir_it) {
       ++num_files;
-      chmod(dir_it->path().c_str(), 0);
+      EXPECT_EQ(0, chmod(dir_it->path().c_str(), 0));
     }
     return num_files;
   }
@@ -309,6 +309,25 @@ class BufferPoolTest : public ::testing::Test {
     LOG(INFO) << "Injected fault by removing file permissions " << path;
   }
 
+  /// Write out a bunch of nonsense to replace the file's current data.
+  static void CorruptBackingFile(const string& path) {
+    EXPECT_GT(path.size(), 0);
+    FILE* file = fopen(path.c_str(), "rb+");
+    EXPECT_EQ(0, fseek(file, 0, SEEK_END));
+    int64_t size = ftell(file);
+    EXPECT_EQ(0, fseek(file, 0, SEEK_SET));
+    for (int64_t i = 0; i < size; ++i) fputc(123, file);
+    fclose(file);
+    LOG(INFO) << "Injected fault by corrupting file " << path;
+  }
+
+  /// Truncate the file to 0 bytes.
+  static void TruncateBackingFile(const string& path) {
+    EXPECT_GT(path.size(), 0);
+    EXPECT_EQ(0, truncate(path.c_str(), 0));
+    LOG(INFO) << "Injected fault by truncating file " << path;
+  }
+
   // Return the path of the temporary file backing the page.
   static string TmpFilePath(PageHandle* page) {
     return page->page_->write_handle->TmpFilePath();
@@ -1194,7 +1213,7 @@ void BufferPoolTest::TestWriteError(int write_delay_ms) {
   WaitForAllWrites(&client);
   // Repin the pages
   PinAll(&pool, &client, &pages);
-  // Remove the backing storage so that future writes will fail
+  // Remove permissions to the backing storage so that future writes will fail
   ASSERT_GT(RemoveScratchPerms(), 0);
   // Give the first write a chance to fail before the second write starts.
   const int INTERVAL_MS = 10;
@@ -1247,7 +1266,7 @@ TEST_F(BufferPoolTest, TmpFileAllocateError) {
   // Unpin a page, which will trigger a write.
   pool.Unpin(&client, &pages[0]);
   WaitForAllWrites(&client);
-  // Remove temporary files - subsequent operations will fail.
+  // Remove permissions to the temporary files - subsequent operations will fail.
   ASSERT_GT(RemoveScratchPerms(), 0);
   // The write error will happen asynchronously.
   pool.Unpin(&client, &pages[1]);
@@ -1382,6 +1401,73 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
   }
 }
 
+// Test error handling when on-disk data is corrupted and the read fails.
+TEST_F(BufferPoolTest, ScratchReadError) {
+  // Only allow one buffer in memory.
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+
+  // Simulate different types of error.
+  enum ErrType {
+    CORRUPT_DATA, // Overwrite real spilled data with bogus data.
+    NO_PERMS, // Remove permissions on the scratch file.
+    TRUNCATE // Truncate the scratch file, destroying spilled data.
+  };
+  for (ErrType error_type : {CORRUPT_DATA, NO_PERMS, TRUNCATE}) {
+    ClientHandle client;
+    ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+        nullptr, TOTAL_MEM, NewProfile(), &client));
+    ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+    PageHandle page;
+    ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+    // Unpin a page, which will trigger a write.
+    pool.Unpin(&client, &page);
+    WaitForAllWrites(&client);
+
+    // Force eviction of the page.
+    ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+
+    string tmp_file = TmpFilePath(&page);
+    if (error_type == CORRUPT_DATA) {
+      CorruptBackingFile(tmp_file);
+    } else if (error_type == NO_PERMS) {
+      DisableBackingFile(tmp_file);
+    } else {
+      DCHECK_EQ(error_type, TRUNCATE);
+      TruncateBackingFile(tmp_file);
+    }
+    Status status = pool.Pin(&client, &page);
+    if (error_type == CORRUPT_DATA && !FLAGS_disk_spill_encryption) {
+      // Without encryption we can't detect that the data changed.
+      EXPECT_OK(status);
+    } else {
+      // Otherwise the read should fail.
+      EXPECT_FALSE(status.ok());
+    }
+    // Should be able to destroy the page, even though we hit an error.
+    pool.DestroyPage(&client, &page);
+
+    // If the backing file is still enabled, we should still be able to pin and unpin
+    // pages as normal.
+    ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+    WriteData(page, 1);
+    pool.Unpin(&client, &page);
+    WaitForAllWrites(&client);
+    if (error_type == NO_PERMS) {
+      // The error prevents read/write of scratch files - this will fail.
+      EXPECT_FALSE(pool.Pin(&client, &page).ok());
+    } else {
+      // The error does not prevent read/write of scratch files.
+      ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+      ASSERT_OK(pool.Pin(&client, &page));
+      VerifyData(page, 1);
+    }
+    pool.DestroyPage(&client, &page);
+    pool.DeregisterClient(&client);
+  }
+}
+
 /// Test that the buffer pool fails cleanly when all scratch directories are inaccessible
 /// at runtime.
 TEST_F(BufferPoolTest, NoDirsAllocationError) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42002b91/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 4650e69..37a05cf 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -372,6 +372,7 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
   DCHECK(handle->write_range_ != nullptr);
   DCHECK(!handle->is_cancelled_);
   DCHECK_EQ(buffer.len(), handle->len());
+  Status status;
 
   // Don't grab 'lock_' in this method - it is not necessary because we don't touch
   // any members that it protects and could block other threads for the duration of
@@ -384,23 +385,35 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
   scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
       handle->write_range_->offset(), handle->write_range_->disk_id(), false,
       DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
-  DiskIoMgr::BufferDescriptor* io_mgr_buffer;
+  DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
   {
     SCOPED_TIMER(disk_read_timer_);
     read_counter_->Add(1);
     bytes_read_counter_->Add(buffer.len());
-    RETURN_IF_ERROR(io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer));
+    status = io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer);
+    if (!status.ok()) goto exit;
   }
 
   if (FLAGS_disk_spill_encryption) {
-    RETURN_IF_ERROR(handle->CheckHashAndDecrypt(buffer));
+    status = handle->CheckHashAndDecrypt(buffer);
+    if (!status.ok()) goto exit;
   }
 
-  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
-  DCHECK_EQ(io_mgr_buffer->len(), buffer.len());
   DCHECK(io_mgr_buffer->eosr());
-  io_mgr_buffer->Return();
-  return Status::OK();
+  DCHECK_LE(io_mgr_buffer->len(), buffer.len());
+  if (io_mgr_buffer->len() < buffer.len()) {
+    // The read was truncated - this is an error.
+    status = Status(TErrorCode::SCRATCH_READ_TRUNCATED, buffer.len(),
+        handle->write_range_->file(), handle->write_range_->offset(),
+        io_mgr_buffer->len());
+    goto exit;
+  }
+  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+
+exit:
+  // Always return the buffer before exiting to avoid leaking it.
+  if (io_mgr_buffer != nullptr) io_mgr_buffer->Return();
+  return status;
 }
 
 Status TmpFileMgr::FileGroup::RestoreData(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/42002b91/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 4c32768..39036c5 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -312,6 +312,9 @@ error_codes = (
   ("SCRATCH_ALLOCATION_FAILED", 101, "Could not create files in any configured scratch "
    "directories (--scratch_dirs). See logs for previous errors that may have prevented "
    "creating or writing scratch files."),
+
+  ("SCRATCH_READ_TRUNCATED", 102, "Error reading $0 bytes from scratch file '$1' at "
+   "offset $2: could only read $3 bytes"),
 )
 
 import sys


[2/4] incubator-impala git commit: IMPALA-3203: Part 2: per-core free lists in buffer pool

Posted by ta...@apache.org.
IMPALA-3203: Part 2: per-core free lists in buffer pool

Add per-core lists of clean pages and free pages to enable allocation
of buffers without contention on shared locks in the common case.

This is implemented with an additional layer of abstraction in
"BufferAllocator", which tracks all memory (free buffers and clean
pages) that is not in use but has not been released to the OS.
The old BufferAllocator is renamed to SystemAllocator.
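
The new allocator's public surface, sketched from the headers and the new
buffer-allocator-test.cc in this patch ('pool', 'client', 'buffer_len' and
'bytes' below are placeholders):

  BufferPool::BufferAllocator allocator(pool, min_buffer_len, system_bytes_limit);
  BufferHandle buffer;
  RETURN_IF_ERROR(allocator.Allocate(client, buffer_len, &buffer));
  allocator.Free(move(buffer));    // buffer is cached in the current core's arena
  allocator.Maintenance();         // shrink free lists toward their low-water marks
  allocator.ReleaseMemory(bytes);  // release cached memory back to the system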

See "Spilled Page Mgmt" and "MMap Allocator & Scalable Free Lists" in
https://goo.gl/0zuy97 for a high-level summary of how this fits into
the buffer pool design.

The guts of the new code is BufferAllocator::AllocateInternal(),
which progresses through several strategies for allocating memory.
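
In outline, the strategies are tried in increasing order of cost (condensed
from the buffer-allocator.cc diff below):
1. Pop a free buffer of the requested size from the current core's arena.
2. If there is headroom under 'system_bytes_limit_', claim it atomically and
   allocate a fresh buffer from SystemAllocator.
3. Pop a free buffer of the requested size from another core on the same
   NUMA node.
4. Evict a clean page of the requested size from an arena on the same NUMA
   node and reuse its buffer.
5. Scavenge free buffers and clean pages of any size from all arenas,
   locking every arena on the final attempt so that enough memory is
   guaranteed to be found, then allocate a fresh buffer.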

Misc changes:
* Enforce upper limit on buffer size to reduce the number of free lists
  required.
* Add additional allocation counters.
* Slightly reorganise the MemTracker GC functions to use lambdas and
  clarify the order in which they should be called. Also add a target
  memory value so that they don't need to free *all* of the memory in
  the system (see the sketch after this list).
* Fix an accounting bug in the buffer pool where it didn't
  evict dirty pages before reclaiming a clean page.
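
As an illustration of the GC change above, registration might look like the
following sketch. 'AddGcFunction' and the lambda's signature are assumptions
for illustration only - the mem-tracker.h changes are not reproduced in this
message - but ReleaseMemory(int64_t) is the real entry point added to the
buffer pool in this patch:

  // Hypothetical sketch: register a GC function that frees buffer pool
  // memory only up to a target, rather than all cached memory.
  process_mem_tracker->AddGcFunction([buffer_pool](int64_t bytes_to_free) {
    buffer_pool->ReleaseMemory(bytes_to_free);
  });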

Performance:
We will need to validate the performance of the system under high query
concurrency before this is used as part of query execution. The benchmark
in Part 1 provided some evidence that this approach of a list per core
should scale well to many cores.

Testing:
Added buffer-allocator-test to test the free list resizing algorithm
directly.
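
The resizing rule frees half of each list's low-water mark per round, so an
idle list shrinks geometrically to empty. Condensed from
FreeBufferArena::Maintenance() in the buffer-allocator.cc diff below
(counter updates omitted):

  // Per free list, once per Maintenance() round:
  if (low_water_mark != 0) {
    // Always free at least one buffer so an idle list drains to zero.
    int num_to_free = max(1, low_water_mark / 2);
    FreeToSystem(free_buffers.GetBuffersToFree(num_to_free));
  }
  low_water_mark = free_buffers.Size();  // reset for the next round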

Added a test to buffer-pool-test to exercise the various new memory
reclamation code paths that are now possible. Also ran buffer-pool-test
under two different faked-out NUMA setups - one with no NUMA and another
with three NUMA nodes.

buffer-pool-test, suballocator-test, and buffered-tuple-stream-v2-test
provide some further basic coverage. Future system and unit tests will
validate this further before it is used for query execution (see
IMPALA-3200).

Ran an initial version of IMPALA-4114, the ported BufferedBlockMgr
tests, against this. The randomised stress test revealed some accounting
bugs which are fixed. I'll post those tests as a follow-on patch.

Change-Id: I612bd1cd0f0e87f7d8186e5bedd53a22f2d80832
Reviewed-on: http://gerrit.cloudera.org:8080/6414
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6c162df3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6c162df3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6c162df3

Branch: refs/heads/master
Commit: 6c162df38c1e929a6e8393733f27d56f07c3dea1
Parents: 8bdfe03
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 5 23:09:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 18 06:27:39 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/free-lists-benchmark.cc       |  12 +-
 be/src/common/init.cc                           |  61 +-
 be/src/runtime/buffered-block-mgr.cc            |   7 +-
 be/src/runtime/bufferpool/CMakeLists.txt        |   2 +
 .../runtime/bufferpool/buffer-allocator-test.cc | 125 ++++
 be/src/runtime/bufferpool/buffer-allocator.cc   | 573 ++++++++++++++++++-
 be/src/runtime/bufferpool/buffer-allocator.h    | 192 ++++++-
 .../runtime/bufferpool/buffer-pool-counters.h   |  10 +-
 .../runtime/bufferpool/buffer-pool-internal.h   |   5 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 175 +++++-
 be/src/runtime/bufferpool/buffer-pool.cc        | 164 ++----
 be/src/runtime/bufferpool/buffer-pool.h         | 102 ++--
 be/src/runtime/bufferpool/free-list-test.cc     |  32 +-
 be/src/runtime/bufferpool/free-list.h           |  22 +-
 be/src/runtime/bufferpool/suballocator-test.cc  |  16 +-
 be/src/runtime/bufferpool/suballocator.h        |   6 +-
 be/src/runtime/bufferpool/system-allocator.cc   |  44 ++
 be/src/runtime/bufferpool/system-allocator.h    |  50 ++
 be/src/runtime/disk-io-mgr.cc                   |  18 +-
 be/src/runtime/disk-io-mgr.h                    |  10 +-
 be/src/runtime/exec-env.cc                      |  22 +-
 be/src/runtime/mem-tracker.cc                   |  21 +-
 be/src/runtime/mem-tracker.h                    |  15 +-
 be/src/runtime/tmp-file-mgr-test.cc             |   4 +-
 be/src/runtime/tmp-file-mgr.cc                  |   6 +-
 be/src/runtime/tmp-file-mgr.h                   |  44 +-
 be/src/testutil/cpu-util.h                      |  74 +++
 be/src/testutil/rand-util.h                     |  48 ++
 be/src/util/cpu-info.cc                         |  26 +
 be/src/util/cpu-info.h                          |  50 +-
 30 files changed, 1569 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/benchmarks/free-lists-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/free-lists-benchmark.cc b/be/src/benchmarks/free-lists-benchmark.cc
index 43caeb6..5c09b39 100644
--- a/be/src/benchmarks/free-lists-benchmark.cc
+++ b/be/src/benchmarks/free-lists-benchmark.cc
@@ -27,8 +27,8 @@
 
 #include "common/object-pool.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/free-list.h"
+#include "runtime/bufferpool/system-allocator.h"
 #include "util/aligned-new.h"
 #include "util/benchmark.h"
 #include "util/cpu-info.h"
@@ -320,7 +320,7 @@ static const int MAX_LIST_ENTRIES = 64;
 static const int ALLOC_OP = 0;
 static const int FREE_OP = 1;
 
-static BufferAllocator allocator(64 * 1024);
+static SystemAllocator allocator(64 * 1024);
 
 // Simulate doing some work with the buffer.
 void DoWork(uint8_t* data, int64_t len) {
@@ -359,7 +359,9 @@ void DoFree(const BenchmarkParams& params, LockedList* free_list,
       list->AddFreeBuffer(move(buffers->back()));
       if (list->Size() > MAX_LIST_ENTRIES) {
         // Discard around 1/4 of the buffers to amortise the cost of sorting.
-        list->FreeBuffers(&allocator, list->Size() - MAX_LIST_ENTRIES * 3 / 4);
+        vector<BufferHandle> buffers =
+            list->GetBuffersToFree(list->Size() - MAX_LIST_ENTRIES * 3 / 4);
+        for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
       }
     } else {
       allocator.Free(move(buffers->back()));
@@ -417,7 +419,9 @@ void FreeListBenchmark(int batch_size, void* data) {
 
   // Empty out all of the free lists.
   for (LockedList* free_list : free_lists) {
-    free_list->list.FreeAll(&allocator);
+    vector<BufferHandle> buffers =
+        free_list->list.GetBuffersToFree(free_list->list.Size());
+    for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 16f0f41..4c90fa3 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -27,6 +27,7 @@
 #include "gutil/atomicops.h"
 #include "rpc/authentication.h"
 #include "rpc/thrift-util.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/decimal-value.h"
 #include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -68,6 +69,9 @@ DEFINE_int32(max_audit_event_log_files, 0, "Maximum number of audit event log fi
     "to retain. The most recent audit event log files are retained. If set to 0, "
     "all audit event log files are retained.");
 
+DEFINE_int32(memory_maintenance_sleep_time_ms, 1000, "Sleep time in milliseconds "
+    "between memory maintenance iterations");
+
 DEFINE_int64(pause_monitor_sleep_time_ms, 500, "Sleep time in milliseconds for "
     "pause monitor thread.");
 
@@ -92,27 +96,45 @@ static const float TCMALLOC_RELEASE_FREE_MEMORY_FRACTION = 0.5f;
 
 using std::string;
 
-// Maintenance thread that runs periodically. It does a few things:
-// 1) flushes glog every logbufsecs sec. glog flushes the log file only if
-//    logbufsecs has passed since the previous flush when a new log is written. That means
-//    that on a quiet system, logs will be buffered indefinitely.
-// 2) checks that tcmalloc has not left too much memory in its pageheap
-static scoped_ptr<impala::Thread> maintenance_thread;
+// Log maintenance thread that runs periodically. It flushes glog every logbufsecs sec.
+// glog only automatically flushes the log file if logbufsecs has passed since the
+// previous flush when a new log is written. That means that on a quiet system, logs
+// will be buffered indefinitely. It also rotates log files.
+static scoped_ptr<impala::Thread> log_maintenance_thread;
+
+// Memory maintenance thread that runs periodically to free up memory. It does the
+// following things every memory_maintenance_sleep_time_ms milliseconds:
+// 1) Releases BufferPool memory that is not currently in use.
+// 2) Frees excess memory that TCMalloc has left in its pageheap.
+static scoped_ptr<impala::Thread> memory_maintenance_thread;
 
 // A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
 // for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual
 // time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged.
 static scoped_ptr<impala::Thread> pause_monitor;
 
-[[noreturn]] static void MaintenanceThread() {
+[[noreturn]] static void LogMaintenanceThread() {
   while (true) {
     sleep(FLAGS_logbufsecs);
 
     google::FlushLogFiles(google::GLOG_INFO);
 
-    // Tests don't need to run the maintenance thread. It causes issues when
-    // on teardown.
+    // No need to rotate log files in tests.
     if (impala::TestInfo::is_test()) continue;
+    // Check for log rotation in every interval of the maintenance thread
+    impala::CheckAndRotateLogFiles(FLAGS_max_log_files);
+    // Check for audit event log rotation in every interval of the maintenance thread
+    impala::CheckAndRotateAuditEventLogFiles(FLAGS_max_audit_event_log_files);
+  }
+}
+
+[[noreturn]] static void MemoryMaintenanceThread() {
+  while (true) {
+    SleepForMs(FLAGS_memory_maintenance_sleep_time_ms);
+    impala::ExecEnv* env = impala::ExecEnv::GetInstance();
+    if (env == nullptr) continue; // ExecEnv may not have been created yet.
+    BufferPool* buffer_pool = env->buffer_pool();
+    if (buffer_pool != nullptr) buffer_pool->Maintenance();
 
 #ifndef ADDRESS_SANITIZER
     // Required to ensure memory gets released back to the OS, even if tcmalloc doesn't do
@@ -139,18 +161,12 @@ static scoped_ptr<impala::Thread> pause_monitor;
 
     // When using tcmalloc, the process limit as measured by our trackers will
     // be out of sync with the process usage. Update the process tracker periodically.
-    impala::ExecEnv* env = impala::ExecEnv::GetInstance();
     if (env != NULL && env->process_mem_tracker() != NULL) {
       env->process_mem_tracker()->RefreshConsumptionFromMetric();
     }
 #endif
     // TODO: we should also update the process mem tracker with the reported JVM
     // mem usage.
-
-    // Check for log rotation in every interval of the maintenance thread
-    impala::CheckAndRotateLogFiles(FLAGS_max_log_files);
-    // Check for audit event log rotation in every interval of the maintenance thread
-    impala::CheckAndRotateAuditEventLogFiles(FLAGS_max_audit_event_log_files);
   }
 }
 
@@ -204,11 +220,18 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   ABORT_IF_ERROR(impala::InitAuth(argv[0]));
 
   // Initialize maintenance_thread after InitGoogleLoggingSafe and InitThreading.
-  maintenance_thread.reset(
-      new Thread("common", "maintenance-thread", &MaintenanceThread));
+  log_maintenance_thread.reset(
+      new Thread("common", "log-maintenance-thread", &LogMaintenanceThread));
+
+  // Memory maintenance isn't necessary for frontend tests, and it's undesirable
+  // to asynchronously free memory in backend tests that are testing memory
+  // management behaviour.
+  if (!impala::TestInfo::is_test()) {
+    memory_maintenance_thread.reset(
+        new Thread("common", "memory-maintenance-thread", &MemoryMaintenanceThread));
+  }
 
-  pause_monitor.reset(
-      new Thread("common", "pause-monitor", &PauseMonitorLoop));
+  pause_monitor.reset(new Thread("common", "pause-monitor", &PauseMonitorLoop));
 
   LOG(INFO) << impala::GetVersionString();
   LOG(INFO) << "Using hostname: " << FLAGS_hostname;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 199807b..e4737c2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -693,9 +693,12 @@ Status BufferedBlockMgr::CancelWrite(Block* block) {
     if (is_cancelled_) return Status::CANCELLED;
   }
   if (block->write_handle_ != NULL) {
+    // Make sure the write is not in-flight.
+    block->write_handle_->Cancel();
+    block->write_handle_->WaitForWrite();
     // Restore the in-memory data without reading from disk (e.g. decrypt it).
-    RETURN_IF_ERROR(tmp_file_group_->CancelWriteAndRestoreData(
-        move(block->write_handle_), block->valid_data()));
+    RETURN_IF_ERROR(
+        tmp_file_group_->RestoreData(move(block->write_handle_), block->valid_data()));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt
index 9f98968..231230b 100644
--- a/be/src/runtime/bufferpool/CMakeLists.txt
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -26,9 +26,11 @@ add_library(BufferPool
   buffer-pool.cc
   reservation-tracker.cc
   suballocator.cc
+  system-allocator.cc
 )
 add_dependencies(BufferPool thrift-deps)
 
+ADD_BE_TEST(buffer-allocator-test)
 ADD_BE_TEST(buffer-pool-test)
 ADD_BE_TEST(free-list-test)
 ADD_BE_TEST(reservation-tracker-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
new file mode 100644
index 0000000..515c1ae
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-allocator.h"
+#include "runtime/bufferpool/buffer-pool-internal.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "testutil/cpu-util.h"
+#include "testutil/gtest-util.h"
+#include "util/cpu-info.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class BufferAllocatorTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {
+    dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0));
+    dummy_reservation_.InitRootTracker(nullptr, 0);
+    ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
+        obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
+  }
+
+  virtual void TearDown() {
+    dummy_pool_->DeregisterClient(&dummy_client_);
+    dummy_reservation_.Close();
+    obj_pool_.Clear();
+    CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
+  }
+
+  int GetFreeListSize(BufferPool::BufferAllocator* allocator, int core, int64_t len) {
+    return allocator->GetFreeListSize(core, len);
+  }
+
+  /// The minimum buffer size used in most tests.
+  const static int64_t TEST_BUFFER_LEN = 1024;
+
+  ObjectPool obj_pool_;
+
+  /// Need a dummy pool and client to pass around. We bypass the reservation mechanisms
+  /// in these tests so they don't need to be properly initialised.
+  BufferPool* dummy_pool_;
+  BufferPool::ClientHandle dummy_client_;
+  ReservationTracker dummy_reservation_;
+};
+
+// Functional test that makes sure the free lists cache as many buffers as expected.
+TEST_F(BufferAllocatorTest, FreeListSizes) {
+  // Run on core 0 to ensure that we always go to the same free list.
+  const int CORE = 0;
+  CpuTestUtil::PinToCore(CORE);
+
+  const int NUM_BUFFERS = 512;
+  const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
+
+  BufferPool::BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES);
+
+  // Allocate a bunch of buffers - all free list checks should miss.
+  vector<BufferHandle> buffers(NUM_BUFFERS);
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
+    ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffers[i]));
+  }
+
+  // Add back the allocated buffers - all should be added to the list.
+  for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  ASSERT_EQ(NUM_BUFFERS, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+
+  // We should be able to get back the buffers from this list.
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
+    ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffers[i]));
+    ASSERT_TRUE(buffers[i].is_open());
+  }
+  ASSERT_EQ(0, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+
+  // Add back the buffers.
+  for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  ASSERT_EQ(NUM_BUFFERS, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+
+  // Test DebugString().
+  LOG(INFO) << allocator.DebugString();
+
+  // Periodic maintenance should shrink the list's size each time after the first
+  // call, since from then on the low water mark is the current size.
+  int maintenance_calls = 0;
+  while (GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN) > 0) {
+    int prev_size = GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN);
+    allocator.Maintenance();
+    int new_size = GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN);
+    if (maintenance_calls == 0) {
+      // The low water mark should be zero until we've called Maintenance() once.
+      EXPECT_EQ(prev_size, new_size);
+    } else {
+      // The low water mark will be the current size, so half the buffers should be freed.
+      EXPECT_EQ(prev_size == 1 ? 0 : prev_size - prev_size / 2, new_size);
+    }
+    ++maintenance_calls;
+  }
+
+  // Also exercise ReleaseMemory() - it should clear out the list entirely.
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
+    ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffers[i]));
+  }
+  for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  allocator.ReleaseMemory(TOTAL_BYTES);
+  ASSERT_EQ(0, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
+}
+}
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc
index 7b7d216..e3c6e60 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -17,25 +17,576 @@
 
 #include "runtime/bufferpool/buffer-allocator.h"
 
-#include "util/bit-util.h"
+#include <mutex>
+
+#include <boost/bind.hpp>
+
+#include "common/atomic.h"
+#include "gutil/bits.h"
+#include "runtime/bufferpool/system-allocator.h"
+#include "util/cpu-info.h"
+#include "util/pretty-printer.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
 
 namespace impala {
 
-BufferAllocator::BufferAllocator(int64_t min_buffer_len)
-  : min_buffer_len_(min_buffer_len) {}
+/// An arena containing free buffers and clean pages that are associated with a
+/// particular core. All public methods are thread-safe.
+class BufferPool::FreeBufferArena : public CacheLineAligned {
+ public:
+  FreeBufferArena(BufferAllocator* parent);
+
+  // Destructor should only run in backend tests.
+  ~FreeBufferArena();
+
+  /// Add a free buffer to the free lists. May free buffers to the system allocator
+  /// if the list becomes full. Caller should not hold 'lock_'.
+  void AddFreeBuffer(BufferHandle buffer);
+
+  /// Try to get a free buffer of 'buffer_len' bytes from this arena. Returns true and
+  /// sets 'buffer' if found or false if not found. Caller should not hold 'lock_'.
+  bool PopFreeBuffer(int64_t buffer_len, BufferHandle* buffer);
+
+  /// Try to get a buffer of 'buffer_len' bytes from this arena by evicting a clean page.
+  /// Returns true and sets 'buffer' if a clean page was evicted or false otherwise.
+  /// Caller should not hold 'lock_'.
+  bool EvictCleanPage(int64_t buffer_len, BufferHandle* buffer);
+
+  /// Try to free 'target_bytes' of memory from this arena back to the system allocator.
+  /// Up to 'target_bytes_to_claim' will be given back to the caller, so it can allocate
+  /// a buffer of that size from the system. Any bytes freed in excess of
+  /// 'target_bytes_to_claim' are added to 'system_bytes_remaining_'. Returns the actual
+  /// number of bytes freed and the actual number of bytes claimed.
+  ///
+  /// Caller should not hold 'lock_'. If 'arena_lock' is non-null, ownership of the
+  /// arena lock is transferred to the caller. Uses std::unique_lock instead of
+  /// boost::unique_lock because it is movable.
+  pair<int64_t, int64_t> FreeSystemMemory(int64_t target_bytes_to_free,
+      int64_t target_bytes_to_claim, std::unique_lock<SpinLock>* arena_lock);
+
+  /// Add a clean page to the arena. Caller must hold the page's client's lock and not
+  /// hold 'lock_' or any Page::lock_.
+  void AddCleanPage(Page* page);
+
+  /// Removes the clean page from the arena if present. Returns true if removed. If
+  /// 'claim_buffer' is true, the buffer is returned with the page, otherwise it is
+  /// added to the free buffer list. Caller must hold the page's client's lock and
+  /// not hold 'lock_' or any Page::lock_.
+  bool RemoveCleanPage(bool claim_buffer, Page* page);
+
+  /// Called periodically. Shrinks free lists that are holding onto more memory than
+  /// needed.
+  void Maintenance();
+
+  /// Test helper: gets the current size of this arena's free list for buffers of
+  /// 'len' bytes.
+  int GetFreeListSize(int64_t len);
+
+  string DebugString();
+
+ private:
+  /// The data structures for each power-of-two size of buffers/pages.
+  /// All members are protected by FreeBufferArena::lock_ unless otherwise mentioned.
+  struct PerSizeLists {
+    PerSizeLists() : num_free_buffers(0), low_water_mark(0), num_clean_pages(0) {}
+    /// The number of entries in 'free_buffers'. Can be read without holding a lock to
+    /// allow threads to quickly skip over empty lists when trying to find a buffer.
+    AtomicInt64 num_free_buffers;
+
+    /// Buffers that are not in use that were originally allocated on the core
+    /// corresponding to this arena.
+    FreeList free_buffers;
+
+    /// The minimum size of 'free_buffers' since the last Maintenance() call.
+    int low_water_mark;
+
+    /// The number of entries in 'clean_pages'.
+    /// Can be read without holding a lock to allow threads to quickly skip over empty
+    /// lists when trying to find a buffer in a different arena.
+    AtomicInt64 num_clean_pages;
+
+    /// Unpinned pages that have had their contents written to disk. These pages can be
+    /// evicted to reclaim a buffer for any client. Pages are evicted in FIFO order,
+    /// so that pages are evicted in approximately the same order that the clients wrote
+    /// them to disk. Protected by FreeBufferArena::lock_.
+    InternalList<Page> clean_pages;
+  };
+
+  /// Return the number of buffer sizes for this allocator.
+  int NumBufferSizes() const {
+    return parent_->log_max_buffer_len_ - parent_->log_min_buffer_len_ + 1;
+  }
+
+  /// Return the lists (free buffers and clean pages) for buffers of the given length.
+  PerSizeLists* GetListsForSize(int64_t buffer_len) {
+    DCHECK(BitUtil::IsPowerOf2(buffer_len));
+    int idx = Bits::Log2Ceiling64(buffer_len) - parent_->log_min_buffer_len_;
+    DCHECK_LT(idx, NumBufferSizes());
+    return &buffer_sizes_[idx];
+  }
+
+  BufferAllocator* const parent_;
+
+  /// Protects all data structures in the arena. See buffer-pool-internal.h for lock
+  /// order.
+  SpinLock lock_;
 
-Status BufferAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) {
+  /// Free buffers and clean pages for each buffer size for this arena.
+  /// Indexed by log2(bytes) - log2(min_buffer_len_).
+  PerSizeLists buffer_sizes_[LOG_MAX_BUFFER_BYTES + 1];
+};
+
+int64_t BufferPool::BufferAllocator::CalcMaxBufferLen(
+    int64_t min_buffer_len, int64_t system_bytes_limit) {
+  // Find the largest power of 2 that is <= 'system_bytes_limit'.
+  int64_t upper_bound = system_bytes_limit == 0 ? 1L : 1L
+          << Bits::Log2Floor64(system_bytes_limit);
+  upper_bound = min(MAX_BUFFER_BYTES, upper_bound);
+  return max(min_buffer_len, upper_bound); // Can't be < min_buffer_len.
+}
+
+BufferPool::BufferAllocator::BufferAllocator(
+    BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit)
+  : pool_(pool),
+    system_allocator_(new SystemAllocator(min_buffer_len)),
+    min_buffer_len_(min_buffer_len),
+    max_buffer_len_(CalcMaxBufferLen(min_buffer_len, system_bytes_limit)),
+    log_min_buffer_len_(Bits::Log2Ceiling64(min_buffer_len_)),
+    log_max_buffer_len_(Bits::Log2Ceiling64(max_buffer_len_)),
+    system_bytes_limit_(system_bytes_limit),
+    system_bytes_remaining_(system_bytes_limit),
+    per_core_arenas_(CpuInfo::GetMaxNumCores()),
+    max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
+  DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
+  DCHECK(BitUtil::IsPowerOf2(max_buffer_len_)) << max_buffer_len_;
+  DCHECK_LE(0, min_buffer_len_);
+  DCHECK_LE(min_buffer_len_, max_buffer_len_);
+  DCHECK_LE(max_buffer_len_, MAX_BUFFER_BYTES);
+  DCHECK_LE(max_buffer_len_, max(system_bytes_limit_, min_buffer_len_));
+
+  for (unique_ptr<FreeBufferArena>& arena : per_core_arenas_) {
+    arena.reset(new FreeBufferArena(this));
+  }
+}
+
+BufferPool::BufferAllocator::~BufferAllocator() {
+  per_core_arenas_.clear(); // Release all the memory.
+  // Check for accounting leaks.
+  DCHECK_EQ(system_bytes_limit_, system_bytes_remaining_.Load());
+}
+
+Status BufferPool::BufferAllocator::Allocate(
+    ClientHandle* client, int64_t len, BufferHandle* buffer) {
+  SCOPED_TIMER(client->impl_->counters().alloc_time);
+  COUNTER_ADD(client->impl_->counters().bytes_alloced, len);
+  COUNTER_ADD(client->impl_->counters().num_allocations, 1);
+
+  RETURN_IF_ERROR(AllocateInternal(len, buffer));
+  DCHECK(buffer->is_open());
+  buffer->client_ = client;
+  return Status::OK();
+}
+
+Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle* buffer) {
+  DCHECK(!buffer->is_open());
   DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+  DCHECK(BitUtil::IsPowerOf2(len)) << len;
+
+  if (UNLIKELY(len > MAX_BUFFER_BYTES)) {
+    return Status(Substitute(
+        "Tried to allocate buffer of $0 bytes > max of $1 bytes", len, MAX_BUFFER_BYTES));
+  }
+  if (UNLIKELY(len > system_bytes_limit_)) {
+    return Status(Substitute("Tried to allocate buffer of $0 bytes > buffer pool limit "
+        "of $1 bytes", len, system_bytes_limit_));
+  }
+
+  const int current_core = CpuInfo::GetCurrentCore();
+  // Fast path: recycle a buffer of the correct size from this core's arena.
+  FreeBufferArena* current_core_arena = per_core_arenas_[current_core].get();
+  if (current_core_arena->PopFreeBuffer(len, buffer)) return Status::OK();
+
+  // Fast-ish path: allocate a new buffer if there is room in 'system_bytes_remaining_'.
+  int64_t delta = DecreaseSystemBytesRemaining(len, true);
+  if (delta != len) {
+    DCHECK_EQ(0, delta);
+    const vector<int>& numa_node_cores = CpuInfo::GetCoresOfSameNumaNode(current_core);
+    const int numa_node_core_idx = CpuInfo::GetNumaNodeCoreIdx(current_core);
+
+    // Fast-ish path: find a buffer of the right size from another core on the same
+    // NUMA node. Avoid getting a buffer from another NUMA node - prefer reclaiming
+    // a clean page on this NUMA node or scavenging then reallocating a new buffer.
+    // We don't want to get into a state where allocations between the nodes are
+    // unbalanced and one node is stuck reusing memory allocated on the other node.
+    for (int i = 1; i < numa_node_cores.size(); ++i) {
+      // Each core should start searching from a different point to avoid hot-spots.
+      int other_core = numa_node_cores[(numa_node_core_idx + i) % numa_node_cores.size()];
+      FreeBufferArena* other_core_arena = per_core_arenas_[other_core].get();
+      if (other_core_arena->PopFreeBuffer(len, buffer)) return Status::OK();
+    }
+
+    // Fast-ish path: evict a clean page of the right size from the current NUMA node.
+    for (int i = 0; i < numa_node_cores.size(); ++i) {
+      int other_core = numa_node_cores[(numa_node_core_idx + i) % numa_node_cores.size()];
+      FreeBufferArena* other_core_arena = per_core_arenas_[other_core].get();
+      if (other_core_arena->EvictCleanPage(len, buffer)) return Status::OK();
+    }
 
-  uint8_t* alloc = reinterpret_cast<uint8_t*>(malloc(len));
-  if (alloc == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
-  buffer->Open(alloc, len);
+    // Slow path: scavenge buffers of different sizes from free buffer lists and clean
+    // pages. Make initial, fast attempts to gather the required buffers, before
+    // finally making a slower, but guaranteed-to-succeed attempt.
+    // TODO: IMPALA-4703: add a stress option where we vary the number of attempts
+    // randomly.
+    int attempt = 0;
+    while (attempt < max_scavenge_attempts_ && delta < len) {
+      bool final_attempt = attempt == max_scavenge_attempts_ - 1;
+      delta += ScavengeBuffers(final_attempt, current_core, len - delta);
+      ++attempt;
+    }
+    if (delta < len) {
+      system_bytes_remaining_.Add(delta);
+      // This indicates an accounting bug - we should be able to always get the memory.
+      return Status(TErrorCode::INTERNAL_ERROR, Substitute(
+          "Could not allocate $0 bytes: was only able to free up $1 bytes after $2 "
+          "attempts:\n$3", len, delta, max_scavenge_attempts_, pool_->DebugString()));
+    }
+  }
+  // We have headroom to allocate a new buffer at this point.
+  DCHECK_EQ(delta, len);
+  Status status = system_allocator_->Allocate(len, buffer);
+  if (!status.ok()) {
+    system_bytes_remaining_.Add(len);
+    return status;
+  }
   return Status::OK();
 }
 
-void BufferAllocator::Free(BufferPool::BufferHandle&& buffer) {
-  free(buffer.data());
-  buffer.Reset(); // Avoid DCHECK in ~BufferHandle().
+int64_t BufferPool::BufferAllocator::DecreaseSystemBytesRemaining(
+    int64_t max_decrease, bool require_full_decrease) {
+  while (true) {
+    int64_t old_value = system_bytes_remaining_.Load();
+    if (require_full_decrease && old_value < max_decrease) return 0;
+    int64_t decrease = min(old_value, max_decrease);
+    int64_t new_value = old_value - decrease;
+    if (system_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
+      return decrease;
+    }
+  }
+}
+
+int64_t BufferPool::BufferAllocator::ScavengeBuffers(
+    bool slow_but_sure, int current_core, int64_t target_bytes) {
+  // There are two strategies for scavenging buffers:
+  // 1) Fast, opportunistic: Each arena is searched in succession. Although reservations
+  //    guarantee that the memory we need is available somewhere, this may fail if we
+  //    race with another thread that returned buffers to an arena that we've already
+  //    searched and took the buffers from an arena we haven't yet searched.
+  // 2) Slow, guaranteed to succeed: In order to ensure that we can find the memory in a
+  //    single pass, we hold locks for all arenas we've already examined. That way, other
+  //    threads can't take the memory that we need from an arena that we haven't yet
+  //    examined (or from 'system_bytes_remaining_') because in order to do so, it would
+  //    have had to return the equivalent amount of memory to an earlier arena or added
+  //    it back into 'system_bytes_remaining_'. The former can't happen since we're
+  //    still holding those locks, and the latter is solved by trying
+  //    DecreaseSystemBytesRemaining() at the end.
+  DCHECK_GT(target_bytes, 0);
+  // First make sure we've used up all the headroom in the buffer limit.
+  int64_t bytes_found = DecreaseSystemBytesRemaining(target_bytes, false);
+  if (bytes_found == target_bytes) return bytes_found;
+
+  // In 'slow_but_sure' mode, we will hold locks for multiple arenas at the same time and
+  // therefore must start at 0 to respect the lock order. Otherwise we start with the
+  // current core's arena for locality and to avoid excessive contention on arena 0.
+  int start_core = slow_but_sure ? 0 : current_core;
+  vector<std::unique_lock<SpinLock>> arena_locks;
+  if (slow_but_sure) arena_locks.resize(per_core_arenas_.size());
+
+  for (int i = 0; i < per_core_arenas_.size(); ++i) {
+    int core_to_check = (start_core + i) % per_core_arenas_.size();
+    FreeBufferArena* arena = per_core_arenas_[core_to_check].get();
+    int64_t bytes_needed = target_bytes - bytes_found;
+    bytes_found += arena->FreeSystemMemory(bytes_needed, bytes_needed,
+         slow_but_sure ? &arena_locks[i] : nullptr).second;
+    if (bytes_found == target_bytes) break;
+  }
+  DCHECK_LE(bytes_found, target_bytes);
+
+  // Decrement 'system_bytes_remaining_' while still holding the arena locks to avoid
+  // the window for a race with another thread that removes a buffer from a list and
+  // then increments 'system_bytes_remaining_'. The race is prevented because the other
+  // thread holds the lock while decrementing 'system_bytes_remaining_' in the cases
+  // where it may not have reservation corresponding to that memory.
+  if (slow_but_sure && bytes_found < target_bytes) {
+    bytes_found += DecreaseSystemBytesRemaining(target_bytes - bytes_found, true);
+    DCHECK_EQ(bytes_found, target_bytes) << DebugString();
+  }
+  return bytes_found;
+}
+
+void BufferPool::BufferAllocator::Free(BufferHandle&& handle) {
+  DCHECK(handle.is_open());
+  handle.client_ = nullptr; // Buffer is no longer associated with a client.
+  FreeBufferArena* arena = per_core_arenas_[handle.home_core_].get();
+  arena->AddFreeBuffer(move(handle));
+}
+
+void BufferPool::BufferAllocator::AddCleanPage(
+    const unique_lock<mutex>& client_lock, Page* page) {
+  page->client->DCheckHoldsLock(client_lock);
+  FreeBufferArena* arena = per_core_arenas_[page->buffer.home_core_].get();
+  arena->AddCleanPage(page);
+}
+
+bool BufferPool::BufferAllocator::RemoveCleanPage(
+    const unique_lock<mutex>& client_lock, bool claim_buffer, Page* page) {
+  page->client->DCheckHoldsLock(client_lock);
+  FreeBufferArena* arena;
+  {
+    lock_guard<SpinLock> pl(page->buffer_lock);
+    // Page may be evicted - in which case it has no home core and is not in an arena.
+    if (!page->buffer.is_open()) return false;
+    arena = per_core_arenas_[page->buffer.home_core_].get();
+  }
+  return arena->RemoveCleanPage(claim_buffer, page);
+}
+
+void BufferPool::BufferAllocator::Maintenance() {
+  for (unique_ptr<FreeBufferArena>& arena : per_core_arenas_) arena->Maintenance();
+}
+
+void BufferPool::BufferAllocator::ReleaseMemory(int64_t bytes_to_free) {
+  int64_t bytes_freed = 0;
+  int current_core = CpuInfo::GetCurrentCore();
+  for (int i = 0; i < per_core_arenas_.size(); ++i) {
+    int core_to_check = (current_core + i) % per_core_arenas_.size();
+    FreeBufferArena* arena = per_core_arenas_[core_to_check].get();
+    // Free but don't claim any memory.
+    bytes_freed += arena->FreeSystemMemory(bytes_to_free - bytes_freed, 0, nullptr).first;
+    if (bytes_freed >= bytes_to_free) return;
+  }
+}
+
+int BufferPool::BufferAllocator::GetFreeListSize(int core, int64_t len) {
+  return per_core_arenas_[core]->GetFreeListSize(len);
+}
+
+int64_t BufferPool::BufferAllocator::FreeToSystem(vector<BufferHandle>&& buffers) {
+  int64_t bytes_freed = 0;
+  for (BufferHandle& buffer : buffers) {
+    bytes_freed += buffer.len();
+    system_allocator_->Free(move(buffer));
+  }
+  return bytes_freed;
+}
+
+string BufferPool::BufferAllocator::DebugString() {
+  stringstream ss;
+  ss << "<BufferAllocator> " << this << " min_buffer_len: " << min_buffer_len_
+     << " system_bytes_limit: " << system_bytes_limit_
+     << " system_bytes_remaining: " << system_bytes_remaining_.Load() << "\n";
+  for (int i = 0; i < per_core_arenas_.size(); ++i) {
+    ss << "  Arena " << i << " " << per_core_arenas_[i]->DebugString() << "\n";
+  }
+  return ss.str();
+}
+
+BufferPool::FreeBufferArena::FreeBufferArena(BufferAllocator* parent) : parent_(parent) {}
+
+BufferPool::FreeBufferArena::~FreeBufferArena() {
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    // Clear out the free lists.
+    FreeList* list = &buffer_sizes_[i].free_buffers;
+    vector<BufferHandle> buffers = list->GetBuffersToFree(list->Size());
+    parent_->system_bytes_remaining_.Add(parent_->FreeToSystem(move(buffers)));
+
+    // All pages should have been destroyed.
+    DCHECK_EQ(0, buffer_sizes_[i].clean_pages.size());
+  }
+}
+
+void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle buffer) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(buffer.len());
+  FreeList* list = &lists->free_buffers;
+  DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
+  lists->num_free_buffers.Add(1);
+  list->AddFreeBuffer(move(buffer));
+}
+
+bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(page->len);
+  DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
+  if (!lists->clean_pages.Remove(page)) return false;
+  lists->num_clean_pages.Add(-1);
+  if (!claim_buffer) {
+    BufferHandle buffer;
+    {
+      lock_guard<SpinLock> pl(page->buffer_lock);
+      buffer = move(page->buffer);
+    }
+    lists->free_buffers.AddFreeBuffer(move(buffer));
+    lists->num_free_buffers.Add(1);
+  }
+  return true;
+}
+
+bool BufferPool::FreeBufferArena::PopFreeBuffer(
+    int64_t buffer_len, BufferHandle* buffer) {
+  PerSizeLists* lists = GetListsForSize(buffer_len);
+  // Check before acquiring lock.
+  if (lists->num_free_buffers.Load() == 0) return false;
+
+  lock_guard<SpinLock> al(lock_);
+  FreeList* list = &lists->free_buffers;
+  DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
+  if (!list->PopFreeBuffer(buffer)) return false;
+  lists->num_free_buffers.Add(-1);
+  lists->low_water_mark = min<int>(lists->low_water_mark, list->Size());
+  return true;
+}
+
+bool BufferPool::FreeBufferArena::EvictCleanPage(
+    int64_t buffer_len, BufferHandle* buffer) {
+  PerSizeLists* lists = GetListsForSize(buffer_len);
+  // Check before acquiring lock.
+  if (lists->num_clean_pages.Load() == 0) return false;
+
+  lock_guard<SpinLock> al(lock_);
+  DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
+  Page* page = lists->clean_pages.Dequeue();
+  if (page == nullptr) return false;
+  lists->num_clean_pages.Add(-1);
+  lock_guard<SpinLock> pl(page->buffer_lock);
+  *buffer = move(page->buffer);
+  return true;
+}
+
+pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
+    int64_t target_bytes_to_free, int64_t target_bytes_to_claim,
+    std::unique_lock<SpinLock>* arena_lock) {
+  DCHECK_GT(target_bytes_to_free, 0);
+  DCHECK_GE(target_bytes_to_free, target_bytes_to_claim);
+  int64_t bytes_freed = 0;
+  // If the caller is acquiring the lock, just lock for the whole method.
+  // Otherwise lazily acquire the lock the first time we find some memory
+  // to free.
+  std::unique_lock<SpinLock> al(lock_, std::defer_lock_t());
+  if (arena_lock != nullptr) al.lock();
+
+  vector<BufferHandle> buffers;
+  // Search from largest to smallest to avoid freeing many small buffers unless
+  // necessary.
+  for (int i = NumBufferSizes() - 1; i >= 0; --i) {
+    PerSizeLists* lists = &buffer_sizes_[i];
+    // Check before acquiring lock to avoid expensive lock acquisition and make scanning
+    // empty lists much cheaper.
+    if (lists->num_free_buffers.Load() == 0 && lists->num_clean_pages.Load() == 0) {
+      continue;
+    }
+    if (!al.owns_lock()) al.lock();
+    FreeList* free_buffers = &lists->free_buffers;
+    InternalList<Page>* clean_pages = &lists->clean_pages;
+    DCHECK_EQ(lists->num_free_buffers.Load(), free_buffers->Size());
+    DCHECK_EQ(lists->num_clean_pages.Load(), clean_pages->size());
+
+    // Figure out how many of the buffers in the free list we should free.
+    DCHECK_GT(target_bytes_to_free, bytes_freed);
+    const int64_t buffer_len = 1L << (i + parent_->log_min_buffer_len_);
+    int64_t buffers_to_free = min(free_buffers->Size(),
+        BitUtil::Ceil(target_bytes_to_free - bytes_freed, buffer_len));
+    int64_t buffer_bytes_to_free = buffers_to_free * buffer_len;
+
+    // Evict clean pages by moving their buffers to the free buffer list before freeing
+    // them. This ensures that they are freed based on memory address in the expected
+    // order.
+    while (bytes_freed + buffer_bytes_to_free < target_bytes_to_free) {
+      Page* page = clean_pages->Dequeue();
+      if (page == nullptr) break;
+      lists->num_clean_pages.Add(-1);
+      BufferHandle page_buffer;
+      {
+        lock_guard<SpinLock> pl(page->buffer_lock);
+        page_buffer = move(page->buffer);
+      }
+      ++buffers_to_free;
+      buffer_bytes_to_free += page_buffer.len();
+      free_buffers->AddFreeBuffer(move(page_buffer));
+      lists->num_free_buffers.Add(1);
+    }
+    if (buffers_to_free > 0) {
+      int64_t buffer_bytes_freed =
+          parent_->FreeToSystem(free_buffers->GetBuffersToFree(buffers_to_free));
+      DCHECK_EQ(buffer_bytes_to_free, buffer_bytes_freed);
+      bytes_freed += buffer_bytes_to_free;
+      lists->num_free_buffers.Add(-buffers_to_free);
+      lists->low_water_mark = min<int>(lists->low_water_mark, free_buffers->Size());
+      if (bytes_freed >= target_bytes_to_free) break;
+    }
+    // Should have cleared out all lists if we don't have enough memory at this point.
+    DCHECK_EQ(0, free_buffers->Size());
+    DCHECK_EQ(0, clean_pages->size());
+  }
+  int64_t bytes_claimed = min(bytes_freed, target_bytes_to_claim);
+  if (bytes_freed > bytes_claimed) {
+    // Add back the extra for other threads before releasing the lock to avoid race
+    // where the other thread may not be able to find enough buffers.
+    parent_->system_bytes_remaining_.Add(bytes_freed - bytes_claimed);
+  }
+  if (arena_lock != nullptr) *arena_lock = move(al);
+  return make_pair(bytes_freed, bytes_claimed);
+}
+
+void BufferPool::FreeBufferArena::AddCleanPage(Page* page) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(page->len);
+  DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
+  lists->clean_pages.Enqueue(page);
+  lists->num_clean_pages.Add(1);
+}
+
+void BufferPool::FreeBufferArena::Maintenance() {
+  lock_guard<SpinLock> al(lock_);
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    PerSizeLists* lists = &buffer_sizes_[i];
+    DCHECK_LE(lists->low_water_mark, lists->free_buffers.Size());
+    if (lists->low_water_mark != 0) {
+      // We haven't needed the buffers below the low water mark since the previous
+      // Maintenance() call. Discard half of them to free up memory. By always discarding
+      // at least one, we guarantee that an idle list will shrink to zero entries.
+      int num_to_free = max(1, lists->low_water_mark / 2);
+      parent_->system_bytes_remaining_.Add(
+          parent_->FreeToSystem(lists->free_buffers.GetBuffersToFree(num_to_free)));
+      lists->num_free_buffers.Add(-num_to_free);
+    }
+    lists->low_water_mark = lists->free_buffers.Size();
+  }
+}
+
+int BufferPool::FreeBufferArena::GetFreeListSize(int64_t len) {
+  lock_guard<SpinLock> al(lock_);
+  PerSizeLists* lists = GetListsForSize(len);
+  DCHECK_EQ(lists->num_free_buffers.Load(), lists->free_buffers.Size());
+  return lists->free_buffers.Size();
+}
+
+string BufferPool::FreeBufferArena::DebugString() {
+  lock_guard<SpinLock> al(lock_);
+  stringstream ss;
+  ss << "<FreeBufferArena> " << this << "\n";
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    int64_t buffer_len = 1L << (parent_->log_min_buffer_len_ + i);
+    PerSizeLists& lists = buffer_sizes_[i];
+    ss << "  " << PrettyPrinter::PrintBytes(buffer_len) << ":"
+       << " free buffers: " << lists.num_free_buffers.Load()
+       << " low water mark: " << lists.low_water_mark
+       << " clean pages: " << lists.num_clean_pages.Load() << " ";
+    lists.clean_pages.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+
+    ss << "\n";
+  }
+  return ss.str();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index 4358f75..fc79970 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -18,32 +18,196 @@
 #ifndef IMPALA_RUNTIME_BUFFER_ALLOCATOR_H
 #define IMPALA_RUNTIME_BUFFER_ALLOCATOR_H
 
-#include "common/status.h"
+#include <boost/scoped_ptr.hpp>
 
-#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/buffer-pool-internal.h"
+#include "runtime/bufferpool/free-list.h"
+#include "util/aligned-new.h"
 
 namespace impala {
 
-/// The underlying memory allocator for the buffer pool. All buffers are allocated through
-/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that
-/// are power-of-two multiples of the minimum buffer length.
+/// The internal buffer allocator used by BufferPool to allocate power-of-two sized
+/// buffers. BufferAllocator builds on top of SystemAllocator by adding caching of
+/// free buffers and clean pages where the memory is not currently in use by a client
+/// but has not yet been released to SystemAllocator.
 ///
-/// TODO:
-/// * Allocate memory with mmap() instead of malloc().
-/// * Implement free lists in the allocator or external to the allocator.
-class BufferAllocator {
+/// The allocator is optimised for the common case where an allocation can be served
+/// by reclaiming a buffer of the requested size from the current core's arena. In this
+/// case there is no contention for locks between concurrently-running threads. If this
+/// fails, progressively more expensive approaches to allocating memory are tried until
+/// the allocation eventually succeeds (see AllocateInternal() for details).
+///
+/// Buffer Reservations
+/// ===================
+/// The implementation of the BufferAllocator relies on the BufferPool's reservation
+/// tracking system. The allocator is given a hard limit ('system_bytes_limit'), above
+/// which all allocations will fail. Allocations up to 'system_bytes_limit' are
+/// guaranteed to succeed unless an unexpected system error occurs (e.g. we can't allocate
+/// all of the required memory from the OS). Reservations must be set up so that the total
+/// of all reservations does not exceed 'system_bytes_limit', thus ensuring that
+/// BufferAllocator can always find memory to fulfill reservations.
+///
+/// +========================+
+/// | IMPLEMENTATION NOTES   |
+/// +========================+
+///
+/// Memory
+/// ======
+/// Memory managed by BufferAllocator comes in four forms:
+/// 1. Buffers returned to the client (corresponding to a used reservation)
+/// 2. Free buffers cached in the BufferAllocator's free lists.
+/// 3. Buffers attached to clean unpinned pages in the BufferAllocator's clean page lists.
+/// 4. Bytes that are not allocated from the system: 'system_bytes_remaining_'.
+/// Together these always add up to 'system_bytes_limit', which allows BufferAllocator
+/// to always fulfill reservations via some combination of memory in forms 2, 3 or 4.
+///
+/// The BufferAllocator code is careful not to make memory inaccessible to concurrently
+/// executing threads that are entitled to it. E.g. if one thread is entitled to allocate
+/// a 1MB buffer from the BufferAllocator's free or clean page lists but needs to release
+/// a 2MB buffer to the system to free up enough memory, it must add 1MB to
+/// 'system_bytes_remaining_' in the same critical section in which it freed the 2MB
+/// buffer. Otherwise a concurrent thread that had a reservation for 1MB of memory might
+/// not be able to find it.
+///
+/// Arenas
+/// ======
+/// The buffer allocator's data structures are broken up into arenas, with an arena per
+/// core. Within each arena, each buffer or page is stored in a list with buffers and
+/// pages of the same size: there is a separate list for every power-of-two size. Each
+/// arena is protected by a separate lock, so in the common case where threads are able
+/// to fulfill allocations from their own arena, there will be no lock contention.
+///
+class BufferPool::BufferAllocator {
  public:
-  BufferAllocator(int64_t min_buffer_len);
+  BufferAllocator(BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit);
+  ~BufferAllocator();
 
-  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple
-  /// of the minimum buffer length.
-  Status Allocate(int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
+  /// Allocate a buffer with a power-of-two length 'len'. This function may acquire
+  /// 'FreeBufferArena::lock_' and Page::lock so no locks lower in the lock acquisition
+  /// order (see buffer-pool-internal.h) should be held by the caller.
+  ///
+  /// Always succeeds in allocating memory up to 'system_bytes_limit', unless the system
+  /// is unable to give us 'system_bytes_limit' of memory or there is an internal bug: if all
+  /// clients write out enough dirty pages to stay within their reservation, then there
+  /// should always be enough free buffers and clean pages to reclaim.
+  Status Allocate(ClientHandle* client, int64_t len,
+      BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
 
-  /// Free the memory for a previously-allocated buffer.
+  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
+  /// internal state but does not release to any reservation.
   void Free(BufferPool::BufferHandle&& buffer);
 
+  /// Adds a clean page 'page' to a clean page list. Caller must hold the page's
+  /// client's lock via 'client_lock' so that moving the page between the client list and
+  /// the free page list is atomic. Caller must not hold 'FreeBufferArena::lock_' or any
+  /// Page::lock.
+  void AddCleanPage(const boost::unique_lock<boost::mutex>& client_lock, Page* page);
+
+  /// Removes a clean page 'page' from a clean page list, if it is present in one of
+  /// the lists. Returns true if the page was removed. If 'claim_buffer' is true, the
+  /// caller must have a reservation for the buffer, which is returned along with the page.
+  /// Otherwise the buffer is moved directly to the free buffer list. Caller must hold
+  /// the page's client's lock via 'client_lock' so that moving the page between the
+  /// client list and the free page list is atomic. Caller must not hold
+  /// 'FreeBufferArena::lock_' or any Page::lock.
+  bool RemoveCleanPage(
+      const boost::unique_lock<boost::mutex>& client_lock, bool claim_buffer, Page* page);
+
+  /// Periodically called to release free buffers back to the SystemAllocator. Releases
+  /// buffers based on recent allocation patterns, trying to minimise the number of
+  /// excess buffers retained in each list above the minimum required to avoid going
+  /// to the system allocator.
+  void Maintenance();
+
+  /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator.
+  void ReleaseMemory(int64_t bytes_to_free);
+
+  std::string DebugString();
+
+ protected:
+  friend class BufferAllocatorTest;
+  friend class BufferPoolTest;
+  friend class FreeBufferArena;
+
+  /// Test helper: gets the current size of the free list for buffers of 'len' bytes
+  /// on core 'core'.
+  int GetFreeListSize(int core, int64_t len);
+
+  /// Test helper: reduce the number of scavenge attempts so backend tests can force
+  /// use of the "locked" scavenging code path.
+  void set_max_scavenge_attempts(int val) {
+    DCHECK_GE(val, 1);
+    max_scavenge_attempts_ = val;
+  }
+
  private:
+  /// Compute the maximum power-of-two buffer length that could be allocated based on the
+  /// amount of memory available 'system_bytes_limit'. The value is always at least
+  /// 'min_buffer_len' so that there is at least one valid buffer size.
+  static int64_t CalcMaxBufferLen(int64_t min_buffer_len, int64_t system_bytes_limit);
+
+  /// Same as Allocate() but leaves 'buffer->client_' NULL and does not update counters.
+  Status AllocateInternal(
+      int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
+
+  /// Decrease 'system_bytes_remaining_' by up to 'max_decrease', down to a minimum of 0.
+  /// If 'require_full_decrease' is true, only decrease if we can decrease it by the
+  /// full 'max_decrease'. Returns the amount it was decreased by.
+  int64_t DecreaseSystemBytesRemaining(int64_t max_decrease, bool require_full_decrease);
+
+  /// Tries to reclaim enough memory from various sources so that the caller can allocate
+  /// a buffer of 'target_bytes' from the system allocator. Scavenges buffers from the
+  /// free buffer and clean page lists of all cores and frees them with
+  /// 'system_allocator_'. Also tries to decrement 'system_bytes_remaining_'.
+  /// 'current_core' is the index of the current CPU core. Any bytes freed in excess of
+  /// 'target_bytes' are added to 'system_bytes_remaining_'. If 'slow_but_sure' is true,
+  /// this function uses a slower strategy that guarantees enough memory will be found
+  /// but can block progress of other threads for longer. If 'slow_but_sure' is false,
+  /// then this function optimistically tries to reclaim the memory but may not reclaim
+  /// 'target_bytes' of memory. Returns the number of bytes reclaimed.
+  int64_t ScavengeBuffers(bool slow_but_sure, int current_core, int64_t target_bytes);
+
+  /// Helper to free a list of buffers to the system. Returns the number of bytes freed.
+  int64_t FreeToSystem(std::vector<BufferHandle>&& buffers);
+
+  /// The pool that this allocator is associated with.
+  BufferPool* const pool_;
+
+  /// System allocator that is ultimately used to allocate and free buffers.
+  const boost::scoped_ptr<SystemAllocator> system_allocator_;
+
+  /// The minimum power-of-two buffer length that can be allocated.
   const int64_t min_buffer_len_;
+
+  /// The maximum power-of-two buffer length that can be allocated. Always >=
+  /// 'min_buffer_len_' so that there is at least one valid buffer size.
+  const int64_t max_buffer_len_;
+
+  /// The log2 of 'min_buffer_len_'.
+  const int log_min_buffer_len_;
+
+  /// The log2 of 'max_buffer_len_'.
+  const int log_max_buffer_len_;
+
+  /// The maximum physical memory in bytes that will be allocated from the system.
+  const int64_t system_bytes_limit_;
+
+  /// The remaining number of bytes of 'system_bytes_limit_' that can be used for
+  /// allocating new buffers. Must be updated atomically before a new buffer is
+  /// allocated or after an existing buffer is freed with the system allocator.
+  AtomicInt64 system_bytes_remaining_;
+
+  /// Free and clean pages. One arena per core.
+  std::vector<std::unique_ptr<FreeBufferArena>> per_core_arenas_;
+
+  /// Default number of times to attempt scavenging.
+  static const int MAX_SCAVENGE_ATTEMPTS = 3;
+
+  /// Number of times to attempt scavenging. Usually MAX_SCAVENGE_ATTEMPTS but can be
+  /// overridden by tests. The first max_scavenge_attempts_ - 1 attempts do not lock
+  /// all arenas so may fail. The final attempt locks all arenas, which is expensive
+  /// but is guaranteed to succeed.
+  int max_scavenge_attempts_;
 };
 }
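
For readers tracing the accounting: DecreaseSystemBytesRemaining() can be written as a
compare-and-swap loop over the atomic counter, much like the DecreaseBufferBytesRemaining()
code this patch removes from buffer-pool.cc. A minimal sketch under that assumption (not
the committed implementation), using only the AtomicInt64 Load()/CompareAndSwap() calls
seen elsewhere in the patch:

  int64_t BufferPool::BufferAllocator::DecreaseSystemBytesRemaining(
      int64_t max_decrease, bool require_full_decrease) {
    while (true) {
      int64_t old_value = system_bytes_remaining_.Load();
      // With 'require_full_decrease', take nothing rather than a partial amount.
      if (require_full_decrease && old_value < max_decrease) return 0;
      int64_t decrease = std::min(old_value, max_decrease);
      // CompareAndSwap fails if another thread raced with us; reload and retry.
      if (system_bytes_remaining_.CompareAndSwap(old_value, old_value - decrease)) {
        return decrease;
      }
    }
  }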
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool-counters.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-counters.h b/be/src/runtime/bufferpool/buffer-pool-counters.h
index 183742f..58257de 100644
--- a/be/src/runtime/bufferpool/buffer-pool-counters.h
+++ b/be/src/runtime/bufferpool/buffer-pool-counters.h
@@ -25,8 +25,14 @@ namespace impala {
 /// A set of counters for each buffer pool client.
 struct BufferPoolClientCounters {
  public:
-  /// Amount of time spent trying to get a buffer.
-  RuntimeProfile::Counter* get_buffer_time;
+  /// Total amount of time spent inside BufferAllocator::AllocateBuffer().
+  RuntimeProfile::Counter* alloc_time;
+
+  /// Number of buffers allocated via BufferAllocator::AllocateBuffer().
+  RuntimeProfile::Counter* num_allocations;
+
+  /// Bytes of buffers allocated via BufferAllocator::AllocateBuffer().
+  RuntimeProfile::Counter* bytes_alloced;
 
   /// Amount of time spent waiting for reads from disk to complete.
   RuntimeProfile::Counter* read_wait_time;
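
To see how these three counters fit together, here is a hedged sketch of the update
pattern around an allocation. The wrapper function is illustrative only; SCOPED_TIMER and
COUNTER_ADD are the existing profile-counter macros from util/runtime-profile-counters.h,
and the counter names in the comments match the ones registered in buffer-pool.cc below:

  // Illustrative wrapper, not committed code: time the allocation and bump the
  // per-client allocation counters on success.
  Status AllocateAndCount(BufferPool::ClientHandle* client, int64_t len,
      BufferPool::BufferHandle* buffer) {
    const BufferPoolClientCounters& counters = client->impl_->counters();
    SCOPED_TIMER(counters.alloc_time);  // accumulates into "BufferPoolAllocTime"
    RETURN_IF_ERROR(allocator_->Allocate(client, len, buffer));
    COUNTER_ADD(counters.num_allocations, 1);  // "BufferPoolAllocations"
    COUNTER_ADD(counters.bytes_alloced, len);  // "BufferPoolAllocationBytes"
    return Status::OK();
  }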

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index f0dfa09..3428087 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -25,7 +25,8 @@
 /// =============
 /// The lock acquisition order is:
 /// 1. Client::lock_
-/// 2. BufferPool::clean_pages_lock_
+/// 2. FreeBufferArena::lock_. If multiple arena locks are acquired, must be acquired in
+///    ascending order.
 /// 3. Page::lock
 ///
 /// If a reference to a Page is acquired through a page list, the Page* reference only
@@ -44,7 +45,7 @@
 ///     a dirty unpinned page. The page is in Client::write_in_flight_pages_. For
 ///     accounting purposes this is considered a dirty page.
 /// * Unpinned - Clean: When the write has completed but the page was not evicted. The
-///     page is in BufferPool::clean_pages_.
+///     page is in a clean pages list in a BufferAllocator arena.
 /// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is
 ///     not in any list.
 ///
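
As a concrete illustration of the ordering rule above, here is a sketch of code that
needs a client's lock plus every arena lock, as the final "locked" scavenge attempt does.
This is a sketch only: it assumes the arena lock exposes plain lock()/unlock() and elides
error handling.

  boost::unique_lock<boost::mutex> cl(client->lock_);  // 1. Client::lock_
  for (size_t i = 0; i < per_core_arenas_.size(); ++i) {
    per_core_arenas_[i]->lock_.lock();                 // 2. arena locks, ascending index
  }
  // Individual Page::lock instances (3.) may now be taken as pages are examined.
  for (auto& arena : per_core_arenas_) arena->lock_.unlock();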

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index c71ab60..42344f6 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -27,13 +27,16 @@
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/buffer-pool-internal.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/test-env.h"
 #include "service/fe-support.h"
+#include "testutil/cpu-util.h"
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/metrics.h"
 
 #include "common/names.h"
@@ -47,6 +50,7 @@ class BufferPoolTest : public ::testing::Test {
   virtual void SetUp() {
     test_env_ = obj_pool_.Add(new TestEnv);
     ASSERT_OK(test_env_->Init());
+    RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_);
   }
 
   virtual void TearDown() {
@@ -59,6 +63,7 @@ class BufferPoolTest : public ::testing::Test {
     }
     global_reservations_.Close();
     obj_pool_.Clear();
+    CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
   }
 
   /// The minimum buffer size used in most tests.
@@ -105,11 +110,69 @@ class BufferPoolTest : public ::testing::Test {
     return !page->page_->buffer.is_open();
   }
 
+  /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to
+  /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
+  /// If 'randomize_core' is true, will switch thread between cores randomly before
+  /// each allocation.
+  void AllocateBuffers(BufferPool* pool, BufferPool::ClientHandle* client,
+      int64_t max_buffer_size, int64_t total_bytes,
+      vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) {
+    int64_t curr_buffer_size = max_buffer_size;
+    int64_t bytes_remaining = total_bytes;
+    while (bytes_remaining > 0) {
+      while (curr_buffer_size > client->GetUnusedReservation()) curr_buffer_size /= 2;
+      if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
+      buffers->emplace_back();
+      ASSERT_OK(pool->AllocateBuffer(client, curr_buffer_size, &buffers->back()));
+      bytes_remaining -= curr_buffer_size;
+    }
+  }
+
+  /// Create pages of varying sizes at most 'max_page_size' that add up to
+  /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
+  /// If 'randomize_core' is true, will switch thread between cores randomly before
+  /// each allocation.
+  void CreatePages(BufferPool* pool, BufferPool::ClientHandle* client,
+      int64_t max_page_size, int64_t total_bytes, vector<BufferPool::PageHandle>* pages,
+      bool randomize_core = false) {
+    int64_t curr_page_size = max_page_size;
+    int64_t bytes_remaining = total_bytes;
+    while (bytes_remaining > 0) {
+      while (curr_page_size > client->GetUnusedReservation()) curr_page_size /= 2;
+      pages->emplace_back();
+      if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
+      ASSERT_OK(pool->CreatePage(client, curr_page_size, &pages->back()));
+      bytes_remaining -= curr_page_size;
+    }
+  }
+
+  /// Free all the 'buffers' and clear the vector.
+  /// If 'randomize_core' is true, will switch thread between cores randomly before
+  /// each free.
+  void FreeBuffers(BufferPool* pool, BufferPool::ClientHandle* client,
+      vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) {
+    for (auto& buffer : *buffers) {
+      if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
+      pool->FreeBuffer(client, &buffer);
+    }
+    buffers->clear();
+  }
+
+  /// Set the maximum number of scavenge attempts that the pool's allocator will do.
+  void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) {
+    pool->allocator()->set_max_scavenge_attempts(max_attempts);
+  }
+
+  void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
+
   ObjectPool obj_pool_;
   ReservationTracker global_reservations_;
 
   TestEnv* test_env_; // Owned by 'obj_pool_'.
 
+  /// Per-test random number generator. Seeded before every test.
+  std::mt19937 rng_;
+
   // The file groups created - closed at end of each test.
   vector<TmpFileMgr::FileGroup*> file_groups_;
 
@@ -668,6 +731,103 @@ TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
   pool.FreeBuffer(&client, &buffer);
   pool.DeregisterClient(&client);
 }
+
+// Constants for TestMemoryReclamation().
+const int MEM_RECLAMATION_NUM_CLIENTS = 2;
+// Choose a non-power-of-two so that AllocateBuffers() will allocate a mix of sizes:
+// 32 + 32 + 32 + 16 + 8 + 4 + 2 + 1
+const int64_t MEM_RECLAMATION_BUFFERS_PER_CLIENT = 127;
+const int64_t MEM_RECLAMATION_CLIENT_RESERVATION =
+    BufferPoolTest::TEST_BUFFER_LEN * MEM_RECLAMATION_BUFFERS_PER_CLIENT;
+const int64_t MEM_RECLAMATION_TOTAL_BYTES =
+    MEM_RECLAMATION_NUM_CLIENTS * MEM_RECLAMATION_CLIENT_RESERVATION;
+
+// Test that we can reclaim buffers and pages from the same arena and from other arenas.
+TEST_F(BufferPoolTest, MemoryReclamation) {
+  global_reservations_.InitRootTracker(NULL, MEM_RECLAMATION_TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES);
+  // Assume that all cores are online. Test various combinations of cores to validate
+  // that it can reclaim from any other core.
+  for (int src = 0; src < CpuInfo::num_cores(); ++src) {
+    // Limit the max scavenge attempts to force use of the "locked" scavenging sometimes,
+    // which would otherwise only be triggered by racing threads.
+    SetMaxScavengeAttempts(&pool, 1 + src % 3);
+    for (int j = 0; j < 4; ++j) {
+      int dst = (src + j) % CpuInfo::num_cores();
+      TestMemoryReclamation(&pool, src, dst);
+    }
+    // Test with one fixed and the other randomly changing
+    TestMemoryReclamation(&pool, src, -1);
+    TestMemoryReclamation(&pool, -1, src);
+  }
+  // Test with both src and dst randomly changing.
+  TestMemoryReclamation(&pool, -1, -1);
+  global_reservations_.Close();
+}
+
+// Test that we can reclaim buffers and pages from the same arena or a different arena.
+// Allocates then frees memory on 'src_core' then allocates on 'dst_core' to force
+// reclamation of memory from src_core's free buffer lists and clean page lists.
+// If 'src_core' or 'dst_core' is -1, randomly switch between cores instead of sticking
+// to a fixed core.
+void BufferPoolTest::TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core) {
+  LOG(INFO) << "TestMemoryReclamation " << src_core << " -> " << dst_core;
+  const bool rand_src_core = src_core == -1;
+  const bool rand_dst_core = dst_core == -1;
+
+  BufferPool::ClientHandle clients[MEM_RECLAMATION_NUM_CLIENTS];
+  for (int i = 0; i < MEM_RECLAMATION_NUM_CLIENTS; ++i) {
+    ASSERT_OK(pool->RegisterClient(Substitute("test client $0", i), NewFileGroup(),
+        &global_reservations_, NULL, MEM_RECLAMATION_CLIENT_RESERVATION, NewProfile(),
+        &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservation(MEM_RECLAMATION_CLIENT_RESERVATION));
+  }
+
+  // Allocate and free the whole pool's buffers on src_core to populate its free lists.
+  if (!rand_src_core) CpuTestUtil::PinToCore(src_core);
+  vector<BufferPool::BufferHandle> client_buffers[MEM_RECLAMATION_NUM_CLIENTS];
+  AllocateBuffers(pool, &clients[0], 32 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_src_core);
+  AllocateBuffers(pool, &clients[1], 32 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[1], rand_src_core);
+  FreeBuffers(pool, &clients[0], &client_buffers[0], rand_src_core);
+  FreeBuffers(pool, &clients[1], &client_buffers[1], rand_src_core);
+
+  // Allocate buffers again on dst_core. Use sizes bigger than, smaller than, and equal
+  // to the buffers we allocated earlier so we exercise different code paths.
+  if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core);
+  AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core);
+  FreeBuffers(pool, &clients[0], &client_buffers[0], rand_dst_core);
+
+  // Allocate and unpin the whole pool's buffers as clean pages on src_core to populate
+  // its clean page lists.
+  if (!rand_src_core) CpuTestUtil::PinToCore(src_core);
+  vector<BufferPool::PageHandle> client_pages[MEM_RECLAMATION_NUM_CLIENTS];
+  CreatePages(pool, &clients[0], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION,
+      &client_pages[0], rand_src_core);
+  CreatePages(pool, &clients[1], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION,
+      &client_pages[1], rand_src_core);
+  for (auto& page : client_pages[0]) pool->Unpin(&clients[0], &page);
+  for (auto& page : client_pages[1]) pool->Unpin(&clients[1], &page);
+
+  // Allocate the buffers again to force reclamation of the buffers from the clean pages.
+  if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core);
+  AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN,
+      MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core);
+  FreeBuffers(pool, &clients[0], &client_buffers[0]);
+
+  // Just for good measure, pin the pages again then destroy them.
+  for (auto& page : client_pages[0]) {
+    ASSERT_OK(pool->Pin(&clients[0], &page));
+    pool->DestroyPage(&clients[0], &page);
+  }
+  for (auto& page : client_pages[1]) {
+    ASSERT_OK(pool->Pin(&clients[1], &page));
+    pool->DestroyPage(&clients[1], &page);
+  }
+  for (BufferPool::ClientHandle& client : clients) pool->DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {
@@ -677,11 +837,16 @@ int main(int argc, char** argv) {
   impala::LlvmCodeGen::InitializeLlvm();
   int result = 0;
   for (bool encryption : {false, true}) {
-    FLAGS_disk_spill_encryption = encryption;
-    std::cerr << "+==================================================" << std::endl
-              << "| Running tests with encryption=" << encryption << std::endl
-              << "+==================================================" << std::endl;
-    if (RUN_ALL_TESTS() != 0) result = 1;
+    for (bool numa : {false, true}) {
+      if (!numa && encryption) continue; // Not an interesting combination.
+      impala::CpuTestUtil::SetupFakeNuma(numa);
+      FLAGS_disk_spill_encryption = encryption;
+      std::cerr << "+==================================================" << std::endl
+                << "| Running tests with encryption=" << encryption << " numa=" << numa
+                << std::endl
+                << "+==================================================" << std::endl;
+      if (RUN_ALL_TESTS() != 0) result = 1;
+    }
   }
   return result;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c162df3/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 482db10..b4b2420 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -22,9 +22,11 @@
 #include <boost/bind.hpp>
 
 #include "common/names.h"
+#include "gutil/bits.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/bufferpool/buffer-allocator.h"
 #include "util/bit-util.h"
+#include "util/cpu-info.h"
 #include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
 
@@ -35,10 +37,16 @@ DEFINE_int32(concurrent_scratch_ios_per_device, 2,
 
 namespace impala {
 
-void BufferPool::BufferHandle::Open(uint8_t* data, int64_t len) {
+constexpr int BufferPool::LOG_MAX_BUFFER_BYTES;
+constexpr int64_t BufferPool::MAX_BUFFER_BYTES;
+
+void BufferPool::BufferHandle::Open(uint8_t* data, int64_t len, int home_core) {
+  DCHECK_LE(0, home_core);
+  DCHECK_LT(home_core, CpuInfo::GetMaxNumCores());
   client_ = nullptr;
   data_ = data;
   len_ = len;
+  home_core_ = home_core;
 }
 
 BufferPool::PageHandle::PageHandle() {
@@ -90,17 +98,13 @@ const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
 }
 
 BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : allocator_(new BufferAllocator(min_buffer_len)),
-    min_buffer_len_(min_buffer_len),
-    buffer_bytes_limit_(buffer_bytes_limit),
-    buffer_bytes_remaining_(buffer_bytes_limit) {
+  : allocator_(new BufferAllocator(this, min_buffer_len, buffer_bytes_limit)),
+    min_buffer_len_(min_buffer_len) {
   DCHECK_GT(min_buffer_len, 0);
   DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
 }
 
-BufferPool::~BufferPool() {
-  DCHECK_EQ(0, clean_pages_.size());
-}
+BufferPool::~BufferPool() {}
 
 Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
     ReservationTracker* parent_reservation, MemTracker* mem_tracker,
@@ -196,7 +200,7 @@ void BufferPool::ExtractBuffer(
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
   RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
-  Status status = AllocateBufferInternal(client, len, handle);
+  Status status = allocator_->Allocate(client, len, handle);
   if (!status.ok()) {
     // Allocation failed - update client's accounting to reflect the failure.
     client->impl_->FreedBuffer(len);
@@ -204,44 +208,12 @@ Status BufferPool::AllocateBuffer(
   return status;
 }
 
-Status BufferPool::AllocateBufferInternal(
-    ClientHandle* client, int64_t len, BufferHandle* buffer) {
-  DCHECK(!buffer->is_open());
-  DCHECK_GE(len, min_buffer_len_);
-  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
-  SCOPED_TIMER(client->impl_->counters().get_buffer_time);
-
-  // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a new buffer.
-  int64_t delta = DecreaseBufferBytesRemaining(len);
-  if (delta < len) {
-    // We must evict some pages to free memory before allocating.
-    int64_t to_evict = len - delta;
-    RETURN_IF_ERROR(EvictCleanPages(to_evict));
-  }
-  Status status = allocator_->Allocate(len, buffer);
-  if (!status.ok()) {
-    buffer_bytes_remaining_.Add(len);
-    return status;
-  }
-  DCHECK(buffer->is_open());
-  buffer->client_ = client;
-  return Status::OK();
-}
-
 void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
   int64_t len = handle->len_;
-  FreeBufferInternal(handle);
-  client->impl_->FreedBuffer(len);
-}
-
-void BufferPool::FreeBufferInternal(BufferHandle* handle) {
-  DCHECK(handle->is_open());
-  int64_t buffer_len = handle->len();
   allocator_->Free(move(*handle));
-  buffer_bytes_remaining_.Add(buffer_len);
-  handle->Reset();
+  client->impl_->FreedBuffer(len);
 }
 
 Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
@@ -259,63 +231,12 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
   return Status::OK();
 }
 
-int64_t BufferPool::DecreaseBufferBytesRemaining(int64_t max_decrease) {
-  // TODO: we may want to change this policy so that we don't always use up to the limit
-  // for buffers, since this may starve other operators using non-buffer-pool memory.
-  while (true) {
-    int64_t old_value = buffer_bytes_remaining_.Load();
-    int64_t decrease = min(old_value, max_decrease);
-    int64_t new_value = old_value - decrease;
-    if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
-      return decrease;
-    }
-  }
-}
-
-void BufferPool::AddCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
-  page->client->DCheckHoldsLock(client_lock);
-  lock_guard<SpinLock> cpl(clean_pages_lock_);
-  clean_pages_.Enqueue(page);
+void BufferPool::Maintenance() {
+  allocator_->Maintenance();
 }
 
-bool BufferPool::RemoveCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
-  page->client->DCheckHoldsLock(client_lock);
-  lock_guard<SpinLock> cpl(clean_pages_lock_);
-  return clean_pages_.Remove(page);
-}
-
-Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) {
-  DCHECK_GE(bytes_to_evict, 0);
-  vector<BufferHandle> buffers;
-  int64_t bytes_found = 0;
-  {
-    lock_guard<SpinLock> cpl(clean_pages_lock_);
-    while (bytes_found < bytes_to_evict) {
-      Page* page = clean_pages_.Dequeue();
-      if (page == NULL) break;
-      lock_guard<SpinLock> pl(page->buffer_lock);
-      bytes_found += page->len;
-      buffers.emplace_back(move(page->buffer));
-    }
-  }
-
-  // Free buffers after releasing all the locks. Do this regardless of success to avoid
-  // leaking buffers.
-  for (BufferHandle& buffer : buffers) allocator_->Free(move(buffer));
-  if (bytes_found < bytes_to_evict) {
-    // The buffer pool should not be overcommitted so this should only happen if there
-    // is an accounting error. Add any freed buffers back to 'buffer_bytes_remaining_'
-    // to restore consistency.
-    buffer_bytes_remaining_.Add(bytes_found);
-    return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Tried to evict $0 bytes but only $1 bytes of clean pages:\n$2",
-                      bytes_to_evict, bytes_found, DebugString()));
-  }
-  // Update 'buffer_bytes_remaining_' with any excess.
-  if (bytes_found > bytes_to_evict) {
-    buffer_bytes_remaining_.Add(bytes_found - bytes_to_evict);
-  }
-  return Status::OK();
+void BufferPool::ReleaseMemory(int64_t bytes_to_free) {
+  allocator_->ReleaseMemory(bytes_to_free);
 }
 
 bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
@@ -348,7 +269,10 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     buffers_allocated_bytes_(0) {
   reservation_.InitChildTracker(
       profile, parent_reservation, mem_tracker, reservation_limit);
-  counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
+  counters_.alloc_time = ADD_TIMER(profile, "BufferPoolAllocTime");
+  counters_.num_allocations = ADD_COUNTER(profile, "BufferPoolAllocations", TUnit::UNIT);
+  counters_.bytes_alloced =
+      ADD_COUNTER(profile, "BufferPoolAllocationBytes", TUnit::BYTES);
   counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadIoWaitTime");
   counters_.read_io_ops = ADD_COUNTER(profile, "BufferPoolReadIoOps", TUnit::UNIT);
   counters_.bytes_read = ADD_COUNTER(profile, "BufferPoolReadIoBytes", TUnit::BYTES);
@@ -389,7 +313,7 @@ void BufferPool::Client::DestroyPageInternal(
       // Let the write complete, if in flight.
       WaitForWrite(&cl, page);
       // If clean, remove it from the clean pages list. If evicted, this is a no-op.
-      pool_->RemoveCleanPage(cl, page);
+      pool_->allocator_->RemoveCleanPage(cl, out_buffer != nullptr, page);
     }
     DCHECK(!page->in_queue());
     --num_pages_;
@@ -404,7 +328,7 @@ void BufferPool::Client::DestroyPageInternal(
     *out_buffer = std::move(page->buffer);
     buffers_allocated_bytes_ += out_buffer->len();
   } else if (page->buffer.is_open()) {
-    pool_->FreeBufferInternal(&page->buffer);
+    pool_->allocator_->Free(move(page->buffer));
   }
   delete page;
   handle->Reset();
@@ -431,20 +355,6 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
   // Propagate any write errors that occurred for this client.
   RETURN_IF_ERROR(write_status_);
 
-  // Check if the page is evicted first. This is not necessary for correctness, since
-  // we re-check this later, but by doing it upfront we avoid grabbing the global
-  // 'clean_pages_lock_' in the common case.
-  bool evicted;
-  {
-    lock_guard<SpinLock> pl(page->buffer_lock);
-    evicted = !page->buffer.is_open();
-  }
-  if (evicted) {
-    // We may need to clean some pages to allocate a buffer for the evicted page.
-    RETURN_IF_ERROR(CleanPages(&cl, page->len));
-    return MoveEvictedToPinned(&cl, client, handle);
-  }
-
   if (dirty_unpinned_pages_.Remove(page)) {
     // No writes were initiated for the page - just move it back to the pinned state.
     pinned_pages_.Enqueue(page);
@@ -460,19 +370,17 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
   // At this point we need to either reclaim a clean page or allocate a new buffer.
   // We may need to clean some pages to do so.
   RETURN_IF_ERROR(CleanPages(&cl, page->len));
-  if (pool_->RemoveCleanPage(cl, page)) {
-    // The clean page still has an associated buffer. Just clean up the write, restore
-    // the data, and move the page back to the pinned state.
+  if (pool_->allocator_->RemoveCleanPage(cl, true, page)) {
+    // The clean page still has an associated buffer. Restore the data, and move the page
+    // back to the pinned state.
     pinned_pages_.Enqueue(page);
     DCHECK(page->buffer.is_open());
     DCHECK(page->write_handle != NULL);
     // Don't need on-disk data.
     cl.unlock(); // Don't block progress for other threads operating on other pages.
-    return file_group_->CancelWriteAndRestoreData(
-        move(page->write_handle), page->buffer.mem_range());
+    return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range());
   }
-  // If the page wasn't in the global clean pages list, it must have been evicted after
-  // the earlier 'evicted' check.
+  // If the page wasn't in the clean pages list, it must have been evicted.
   return MoveEvictedToPinned(&cl, client, handle);
 }
 
@@ -486,7 +394,7 @@ Status BufferPool::Client::MoveEvictedToPinned(
   // can modify evicted pages.
   client_lock->unlock();
   BufferHandle buffer;
-  RETURN_IF_ERROR(pool_->AllocateBufferInternal(client, page->len, &page->buffer));
+  RETURN_IF_ERROR(pool_->allocator_->Allocate(client, page->len, &page->buffer));
   COUNTER_ADD(counters().bytes_read, page->len);
   COUNTER_ADD(counters().read_io_ops, 1);
   {
@@ -604,7 +512,7 @@ void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_s
     // Move to clean pages list even if an error was encountered - the buffer can be
     // repurposed by other clients and 'write_status_' must be checked by this client
     // before reading back the bad data.
-    pool_->AddCleanPage(cl, page);
+    pool_->allocator_->AddCleanPage(cl, page);
     WriteDirtyPagesAsync(); // Start another asynchronous write if needed.
 
     // Notify before releasing lock to avoid race with Page and Client destruction.
@@ -680,14 +588,8 @@ string BufferPool::BufferHandle::DebugString() const {
 
 string BufferPool::DebugString() {
   stringstream ss;
-  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
-     << " buffer_bytes_limit: " << buffer_bytes_limit_
-     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n"
-     << "  Clean pages: ";
-  {
-    lock_guard<SpinLock> cpl(clean_pages_lock_);
-    clean_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
-  }
+  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ << "\n"
+     << allocator_->DebugString();
   return ss.str();
 }
 }
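
A note on how the per-core free lists are indexed: since every buffer length is a power
of two between 'min_buffer_len_' and 'max_buffer_len_', mapping a length to its
size-class list is a log2 offset. A minimal sketch with a hypothetical helper name,
assuming gutil's Bits::Log2Ceiling64 (gutil/bits.h is newly included above):

  // Hypothetical helper: index of the free list for a power-of-two length 'len'.
  // E.g. with a 64KB minimum buffer, a 256KB buffer maps to index 2.
  int FreeListIndex(int64_t len) const {
    DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
    return Bits::Log2Ceiling64(len) - log_min_buffer_len_;
  }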


[3/4] incubator-impala git commit: IMPALA-4114: Port BufferedBlockMgr tests to buffer pool

Posted by ta...@apache.org.
IMPALA-4114: Port BufferedBlockMgr tests to buffer pool

BufferedBlockMgr had a number of interesting backend tests
that are still relevant to BufferPool. This commit copies
them across and adapts them to BufferPool. This should bring
the backend test coverage for BufferPool up to par with
BufferedBlockMgr.

Many tests weren't ported because they are not relevant or would
duplicate other tests:
* GetNewBlock* -> covered by PageCreation/BufferAllocation
* Pin -> covered by Pin
* Deletion/DeleteSingleBlocks -> all BufferPool tests cover deletion
* Close -> BufferPool doesn't have "cancellation"
* TransferBufferDuringWrite -> the API being tested is not present. Some
   of the deletion tests are the closest analogue.
* WriteCompleteWithCancelledRuntimeState -> not relevant, BufferPool
  doesn't reference RuntimeState.
* MultipleClients* -> we have many tests for the (very different)
  reservation mechanism
* ClientOversubscription -> oversubscription is not supported
* CreateDestroyMulti -> we don't support creation/destruction of
  buffer pools like this
* AllocationErrorHandling -> redundant with WriteErrorBlacklist

Change-Id: Ifb0221e8bea6f3b23b62d5094634d97562295ea3
Reviewed-on: http://gerrit.cloudera.org:8080/6498
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb900df4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb900df4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb900df4

Branch: refs/heads/master
Commit: cb900df435c643bc1046b35e15f68d0c4b983b06
Parents: 6c162df
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 22 07:49:30 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 18 06:33:44 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/bufferpool/buffer-allocator.h    |   5 +
 .../runtime/bufferpool/buffer-pool-internal.h   |   8 +
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 827 ++++++++++++++++++-
 be/src/runtime/bufferpool/buffer-pool.cc        |  12 +
 be/src/runtime/bufferpool/buffer-pool.h         |   1 +
 be/src/runtime/tmp-file-mgr.cc                  |   6 +-
 6 files changed, 847 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index fc79970..68efbc1 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -122,6 +122,11 @@ class BufferPool::BufferAllocator {
   /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator.
   void ReleaseMemory(int64_t bytes_to_free);
 
+  /// Return the amount of memory currently allocated from the system.
+  int64_t GetSystemBytesAllocated() const {
+    return system_bytes_limit_ - system_bytes_remaining_.Load();
+  }
+
   std::string DebugString();
 
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 3428087..619288b 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -237,6 +237,10 @@ class BufferPool::Client {
   /// not be held.
   void WaitForWrite(boost::unique_lock<boost::mutex>* client_lock, Page* page);
 
+  /// Test helper: wait for all in-flight writes to complete.
+  /// 'lock_' must not be held by the caller.
+  void WaitForAllWrites();
+
   /// Asserts that 'client_lock' is holding 'lock_'.
   void DCheckHoldsLock(const boost::unique_lock<boost::mutex>& client_lock) {
     DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock());
@@ -245,6 +249,7 @@ class BufferPool::Client {
   ReservationTracker* reservation() { return &reservation_; }
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
+  void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
 
   std::string DebugString();
 
@@ -305,6 +310,9 @@ class BufferPool::Client {
   /// All non-NULL.
   BufferPoolClientCounters counters_;
 
+  /// Debug option to delay write completion.
+  int debug_write_delay_ms_;
+
   /// Lock to protect the below member variables.
   boost::mutex lock_;
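
WaitForAllWrites() is only declared in this header. A plausible implementation blocks on
the client's lock until the in-flight list drains; a hedged sketch, assuming a condition
variable 'write_complete_cv_' (hypothetical name) that WriteCompleteCallback() signals:

  // Sketch only, assumed implementation:
  void BufferPool::Client::WaitForAllWrites() {
    boost::unique_lock<boost::mutex> cl(lock_);
    while (write_in_flight_pages_.size() > 0) write_complete_cv_.wait(cl);
  }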
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 42344f6..a40feac 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -20,11 +20,13 @@
 #include <string>
 #include <vector>
 #include <boost/bind.hpp>
+#include <boost/filesystem.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/unordered_map.hpp>
 
 #include "codegen/llvm-codegen.h"
+#include "common/atomic.h"
 #include "common/init.h"
 #include "common/object-pool.h"
 #include "runtime/bufferpool/buffer-allocator.h"
@@ -37,18 +39,36 @@
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "util/filesystem-util.h"
 #include "util/metrics.h"
 
 #include "common/names.h"
 
+using boost::filesystem::directory_iterator;
+using std::mt19937;
+using std::uniform_int_distribution;
+using std::uniform_real_distribution;
+
 DECLARE_bool(disk_spill_encryption);
 
+// Note: This is the default scratch dir created by impala.
+// FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
+const string SCRATCH_DIR = "/tmp/impala-scratch";
+
+// This suffix is appended to a tmp dir
+const string SCRATCH_SUFFIX = "/impala-scratch";
+
 namespace impala {
 
+using BufferHandle = BufferPool::BufferHandle;
+using ClientHandle = BufferPool::ClientHandle;
+using FileGroup = TmpFileMgr::FileGroup;
+using PageHandle = BufferPool::PageHandle;
+
 class BufferPoolTest : public ::testing::Test {
  public:
   virtual void SetUp() {
-    test_env_ = obj_pool_.Add(new TestEnv);
+    test_env_.reset(new TestEnv);
     ASSERT_OK(test_env_->Init());
     RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_);
   }
@@ -63,6 +83,13 @@ class BufferPoolTest : public ::testing::Test {
     }
     global_reservations_.Close();
     obj_pool_.Clear();
+
+    // Tests modify permissions, so make sure we can delete if they didn't clean up.
+    for (string created_tmp_dir : created_tmp_dirs_) {
+      chmod((created_tmp_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
+    }
+    FileSystemUtil::RemovePaths(created_tmp_dirs_);
+    created_tmp_dirs_.clear();
     CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
   }
 
@@ -79,6 +106,24 @@ class BufferPoolTest : public ::testing::Test {
       ReservationTracker* parent_tracker, int num_ops);
 
  protected:
+  /// Reinitialize test_env_ to have multiple temporary directories.
+  vector<string> InitMultipleTmpDirs(int num_dirs) {
+    vector<string> tmp_dirs;
+    for (int i = 0; i < num_dirs; ++i) {
+      const string& dir = Substitute("/tmp/buffer-pool-test.$0", i);
+      // Fix permissions in case old directories were left over from previous test runs.
+      chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
+      EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
+      tmp_dirs.push_back(dir);
+      created_tmp_dirs_.push_back(dir);
+    }
+    test_env_.reset(new TestEnv);
+    test_env_->SetTmpFileMgrArgs(tmp_dirs, false);
+    EXPECT_OK(test_env_->Init());
+    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
+    return tmp_dirs;
+  }
+
   static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
 
   /// Helper function to create one reservation tracker per query.
@@ -128,6 +173,14 @@ class BufferPoolTest : public ::testing::Test {
     }
   }
 
+  /// Do a temporary test allocation. Return the status of AllocateBuffer().
+  Status AllocateAndFree(BufferPool* pool, ClientHandle* client, int64_t len) {
+    BufferHandle tmp;
+    RETURN_IF_ERROR(pool->AllocateBuffer(client, len, &tmp));
+    pool->FreeBuffer(client, &tmp);
+    return Status::OK();
+  }
+
   /// Create pages of varying sizes at most 'max_page_size' that add up to
   /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
   /// If 'randomize_core' is true, will switch thread between cores randomly before
@@ -135,6 +188,7 @@ class BufferPoolTest : public ::testing::Test {
   void CreatePages(BufferPool* pool, BufferPool::ClientHandle* client,
       int64_t max_page_size, int64_t total_bytes, vector<BufferPool::PageHandle>* pages,
       bool randomize_core = false) {
+    ASSERT_GE(client->GetUnusedReservation(), total_bytes);
     int64_t curr_page_size = max_page_size;
     int64_t bytes_remaining = total_bytes;
     while (bytes_remaining > 0) {
@@ -158,26 +212,148 @@ class BufferPoolTest : public ::testing::Test {
     buffers->clear();
   }
 
+  Status PinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) {
+    for (auto& page : *pages) RETURN_IF_ERROR(pool->Pin(client, &page));
+    return Status::OK();
+  }
+
+  /// Unpin all of 'pages'. If 'delay_between_unpins_ms' > 0, sleep between unpins.
+  void UnpinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages,
+      int delay_between_unpins_ms = 0) {
+    for (auto& page : *pages) {
+      pool->Unpin(client, &page);
+      if (delay_between_unpins_ms > 0) SleepForMs(delay_between_unpins_ms);
+    }
+  }
+
+  void DestroyAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) {
+    for (auto& page : *pages) pool->DestroyPage(client, &page);
+  }
+
+  /// Write some deterministically-generated sentinel values to pages or buffers. The same
+  /// data is written each time for objects[i], based on start_num + i.
+  template <typename T>
+  void WriteData(const vector<T>& objects, int start_num) {
+    WriteOrVerifyData(objects, start_num, true);
+  }
+
+  template <typename T>
+  void WriteData(const T& object, int val) {
+    return WriteOrVerifyData(object, val, true);
+  }
+
+  /// Verify data written by WriteData().
+  template <typename T>
+  void VerifyData(const vector<T>& objects, int start_num) {
+    WriteOrVerifyData(objects, start_num, false);
+  }
+
+  template <typename T>
+  void VerifyData(const T& object, int val) {
+    return WriteOrVerifyData(object, val, false);
+  }
+
+  /// Implementation of WriteData() and VerifyData().
+  template <typename T>
+  void WriteOrVerifyData(const vector<T>& objects, int start_num, bool write) {
+    for (int i = 0; i < objects.size(); ++i) {
+      WriteOrVerifyData(objects[i], i + start_num, write);
+    }
+  }
+
+  template <typename T>
+  void WriteOrVerifyData(const T& object, int val, bool write) {
+    // Only write sentinel values to start and end of buffer to make writing and
+    // verification cheap.
+    MemRange mem = object.mem_range();
+    uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data());
+    uint64_t* end_word =
+        reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]);
+    if (write) {
+      *start_word = val;
+      *end_word = ~val;
+    } else {
+      EXPECT_EQ(*start_word, val);
+      EXPECT_EQ(*end_word, ~val);
+    }
+  }
+
+  /// Return the total number of bytes currently allocated from the system.
+  int64_t SystemBytesAllocated(BufferPool* pool) {
+    return pool->allocator()->GetSystemBytesAllocated();
+  }
+
   /// Set the maximum number of scavenge attempts that the pool's allocator will do.
   void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) {
     pool->allocator()->set_max_scavenge_attempts(max_attempts);
   }
 
+  void WaitForAllWrites(ClientHandle* client) { client->impl_->WaitForAllWrites(); }
+
+  // Remove write permissions on scratch files. Return # of scratch files.
+  static int RemoveScratchPerms() {
+    int num_files = 0;
+    directory_iterator dir_it(SCRATCH_DIR);
+    for (; dir_it != directory_iterator(); ++dir_it) {
+      ++num_files;
+      chmod(dir_it->path().c_str(), 0);
+    }
+    return num_files;
+  }
+
+  // Remove permissions for the temporary file at 'path' - all subsequent writes
+  // to the file should fail. Expects that the backing file has already been allocated.
+  static void DisableBackingFile(const string& path) {
+    EXPECT_GT(path.size(), 0);
+    EXPECT_EQ(0, chmod(path.c_str(), 0));
+    LOG(INFO) << "Injected fault by removing file permissions " << path;
+  }
+
+  // Return the path of the temporary file backing the page.
+  static string TmpFilePath(PageHandle* page) {
+    return page->page_->write_handle->TmpFilePath();
+  }
+  // Check that the file backing the page has dir as a prefix of its path.
+  static bool PageInDir(PageHandle* page, const string& dir) {
+    return TmpFilePath(page).find(dir) == 0;
+  }
+
+  // Find a page in the list that is backed by a file with the given directory as prefix
+  // of its path.
+  static PageHandle* FindPageInDir(vector<PageHandle>& pages, const string& dir) {
+    for (PageHandle& page : pages) {
+      if (PageInDir(&page, dir)) return &page;
+    }
+    return NULL;
+  }
+
+  /// Parameterised test implementations.
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
+  void TestEvictionPolicy(int64_t page_size);
+  void TestQueryTeardown(bool write_error);
+  void TestWriteError(int write_delay_ms);
+  void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins);
+  void TestRandomInternalMulti(int num_threads, int64_t buffer_len, bool multiple_pins);
+  static const int SINGLE_THREADED_TID = -1;
+  void TestRandomInternalImpl(BufferPool* pool, FileGroup* file_group,
+      MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins);
 
   ObjectPool obj_pool_;
   ReservationTracker global_reservations_;
 
-  TestEnv* test_env_; // Owned by 'obj_pool_'.
+  boost::scoped_ptr<TestEnv> test_env_;
 
   /// Per-test random number generator. Seeded before every test.
-  std::mt19937 rng_;
+  mt19937 rng_;
 
-  // The file groups created - closed at end of each test.
+  /// The file groups created - closed at end of each test.
   vector<TmpFileMgr::FileGroup*> file_groups_;
 
-  // Map from query_id to the reservation tracker for that query. Reads and modifications
-  // of the map are protected by query_reservations_lock_.
+  /// Paths of temporary directories created during tests - deleted at end of test.
+  vector<string> created_tmp_dirs_;
+
+  /// Map from query_id to the reservation tracker for that query. Reads and modifications
+  /// of the map are protected by query_reservations_lock_.
   unordered_map<int64_t, ReservationTracker*> query_reservations_;
   SpinLock query_reservations_lock_;
 };
@@ -212,7 +388,7 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
           initial_query_reservation / clients_per_query + j
           < initial_query_reservation % clients_per_query;
       // Reservation limit can be anything greater or equal to the initial reservation.
-      int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
+      int64_t client_reservation_limit = initial_client_reservation + rng_() % 100000;
       string name = Substitute("Client $0 for query $1", j, query_id);
       EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL,
           client_reservation_limit, NewProfile(), &clients[i][j]));
@@ -828,6 +1004,643 @@ void BufferPoolTest::TestMemoryReclamation(BufferPool* pool, int src_core, int d
   }
   for (BufferPool::ClientHandle& client : clients) pool->DeregisterClient(&client);
 }
+
+// Test the eviction policy of the buffer pool. Writes are issued eagerly as pages
+// are unpinned, but pages are only evicted from memory when another buffer is
+// allocated.
+TEST_F(BufferPoolTest, EvictionPolicy) {
+  TestEvictionPolicy(TEST_BUFFER_LEN);
+  TestEvictionPolicy(2 * 1024 * 1024);
+}
+
+void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
+  // The eviction policy changes if there are multiple NUMA nodes, because buffers from
+  // clean pages on the local node are claimed in preference to free buffers on the
+  // non-local node. The rest of the test assumes that it executes on a single NUMA node.
+  if (CpuInfo::GetMaxNumNumaNodes() > 1) CpuTestUtil::PinToCore(0);
+  const int MAX_NUM_BUFFERS = 5;
+  int64_t total_mem = MAX_NUM_BUFFERS * page_size;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  ClientHandle client;
+  RuntimeProfile* profile = NewProfile();
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, total_mem, profile, &client));
+  ASSERT_TRUE(client.IncreaseReservation(total_mem));
+
+  RuntimeProfile::Counter* bytes_alloced =
+      profile->GetCounter("BufferPoolAllocationBytes");
+  RuntimeProfile::Counter* write_ios = profile->GetCounter("BufferPoolWriteIoOps");
+  RuntimeProfile::Counter* read_ios = profile->GetCounter("BufferPoolReadIoOps");
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, page_size, total_mem, &pages);
+  WriteData(pages, 0);
+
+  // Unpin pages. Writes should be started and memory should not be deallocated.
+  EXPECT_EQ(total_mem, bytes_alloced->value());
+  EXPECT_EQ(total_mem, SystemBytesAllocated(&pool));
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_GT(write_ios->value(), 0);
+
+  // Re-pin all the pages and validate their data. This should not require reading the
+  // pages back from disk.
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  ASSERT_EQ(0, read_ios->value());
+  VerifyData(pages, 0);
+
+  // Unpin all pages. Writes should be started again.
+  int64_t prev_write_ios = write_ios->value();
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_GT(write_ios->value(), prev_write_ios);
+
+  // Allocate two more buffers. Two unpinned pages must be evicted to make room.
+  const int NUM_EXTRA_BUFFERS = 2;
+  vector<BufferHandle> extra_buffers;
+  AllocateBuffers(
+      &pool, &client, page_size, page_size * NUM_EXTRA_BUFFERS, &extra_buffers);
+  // At least two unpinned pages should have been written out.
+  ASSERT_GE(write_ios->value(), prev_write_ios + NUM_EXTRA_BUFFERS);
+  // No additional memory should have been allocated - it should have been recycled.
+  EXPECT_EQ(total_mem, SystemBytesAllocated(&pool));
+  // Check that two pages were evicted.
+  int num_evicted = 0;
+  for (PageHandle& page : pages) {
+    if (IsEvicted(&page)) ++num_evicted;
+  }
+  EXPECT_EQ(NUM_EXTRA_BUFFERS, num_evicted);
+
+  // Free up memory required to pin the original pages again.
+  FreeBuffers(&pool, &client, &extra_buffers);
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  // We only needed to read back the two evicted pages. Make sure we didn't do extra I/O.
+  ASSERT_EQ(NUM_EXTRA_BUFFERS, read_ios->value());
+  VerifyData(pages, 0);
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
+
+/// Test that we can destroy pages while a disk write is in flight for those pages.
+TEST_F(BufferPoolTest, DestroyDuringWrite) {
+  const int TRIALS = 20;
+  const int MAX_NUM_BUFFERS = 5;
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  for (int trial = 0; trial < TRIALS; ++trial) {
+    ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+        nullptr, TOTAL_MEM, NewProfile(), &client));
+    ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+    vector<PageHandle> pages;
+    CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+
+    // Unpin will initiate writes.
+    UnpinAll(&pool, &client, &pages);
+
+    // Writes should still be in flight when pages are deleted.
+    DestroyAll(&pool, &client, &pages);
+    pool.DeregisterClient(&client);
+  }
+}
+
+/// Test teardown of a query while writes are in flight. This was based on a
+/// BufferedBlockMgr regression test for IMPALA-2252 where tear-down of the
+/// query's RuntimeStates raced with scratch writes. If write_error is true,
+/// force writes to hit errors.
+void BufferPoolTest::TestQueryTeardown(bool write_error) {
+  const int64_t TOTAL_BUFFERS = 20;
+  const int CLIENT_BUFFERS = 10;
+  const int64_t TOTAL_MEM = TOTAL_BUFFERS * TEST_BUFFER_LEN;
+  const int64_t CLIENT_MEM = CLIENT_BUFFERS * TEST_BUFFER_LEN;
+
+  // Set up a BufferPool in the TestEnv.
+  test_env_.reset(new TestEnv());
+  test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM);
+  ASSERT_OK(test_env_->Init());
+
+  BufferPool* pool = test_env_->exec_env()->buffer_pool();
+  RuntimeState* state;
+  ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &state));
+
+  ClientHandle client;
+  ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(),
+      state->instance_buffer_reservation(),
+      obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), CLIENT_MEM,
+      NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(CLIENT_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(pool, &client, TEST_BUFFER_LEN, CLIENT_BUFFERS, &pages);
+
+  if (write_error) {
+    UnpinAll(pool, &client, &pages);
+    // Allocate more buffers to create memory pressure and force eviction of all the
+    // unpinned pages.
+    vector<BufferHandle> tmp_buffers;
+    AllocateBuffers(pool, &client, TEST_BUFFER_LEN, CLIENT_BUFFERS, &tmp_buffers);
+    string tmp_file_path = TmpFilePath(&pages[0]);
+    FreeBuffers(pool, &client, &tmp_buffers);
+
+    PinAll(pool, &client, &pages);
+    // Remove temporary file to force future writes to that file to fail.
+    DisableBackingFile(tmp_file_path);
+  }
+
+  // Unpin will initiate writes. If we triggered a write error earlier, some writes may
+  // go down the error path.
+  UnpinAll(pool, &client, &pages);
+
+  // Tear down the pages, client, and query in the correct order while writes are in
+  // flight.
+  DestroyAll(pool, &client, &pages);
+  pool->DeregisterClient(&client);
+  test_env_->TearDownQueries();
+
+  // All memory should be released from the query.
+  EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption());
+  EXPECT_EQ(0, test_env_->exec_env()->buffer_reservation()->GetChildReservations());
+}
+
+TEST_F(BufferPoolTest, QueryTeardown) {
+  TestQueryTeardown(false);
+}
+
+TEST_F(BufferPoolTest, QueryTeardownWriteError) {
+  TestQueryTeardown(true);
+}
+
+// Test that the buffer pool handles a write error correctly.  Delete the scratch
+// directory before an operation that would cause a write and test that subsequent API
+// calls return errors as expected.
+void BufferPoolTest::TestWriteError(int write_delay_ms) {
+  int MAX_NUM_BUFFERS = 2;
+  int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+  client.impl_->set_debug_write_delay_ms(write_delay_ms);
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, MAX_NUM_BUFFERS, &pages);
+  // Unpin two pages here, to ensure that backing storage is allocated in tmp file.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  // Repin the pages
+  PinAll(&pool, &client, &pages);
+  // Remove the backing storage so that future writes will fail
+  ASSERT_GT(RemoveScratchPerms(), 0);
+  // Give the first write a chance to fail before the second write starts.
+  const int INTERVAL_MS = 10;
+  UnpinAll(&pool, &client, &pages, INTERVAL_MS);
+  WaitForAllWrites(&client);
+
+  // Subsequent calls to APIs that require allocating memory should fail: the write error
+  // is picked up asynchronously.
+  BufferHandle tmp_buffer;
+  PageHandle tmp_page;
+  Status error = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(tmp_buffer.is_open());
+  error = pool.CreatePage(&client, TEST_BUFFER_LEN, &tmp_page);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(tmp_page.is_open());
+  error = pool.Pin(&client, &pages[0]);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(pages[0].is_pinned());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
+
+TEST_F(BufferPoolTest, WriteError) {
+  TestWriteError(0);
+}
+
+// Regression test for IMPALA-4842 - inject a delay in the write to
+// reproduce the issue.
+TEST_F(BufferPoolTest, WriteErrorWriteDelay) {
+  TestWriteError(100);
+}
+
+// Test error handling when temporary file space cannot be allocated to back an unpinned
+// page.
+TEST_F(BufferPoolTest, TmpFileAllocateError) {
+  const int MAX_NUM_BUFFERS = 2;
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  // Unpin a page, which will trigger a write.
+  pool.Unpin(&client, &pages[0]);
+  WaitForAllWrites(&client);
+  // Remove temporary files - subsequent operations will fail.
+  ASSERT_GT(RemoveScratchPerms(), 0);
+  // The write error will happen asynchronously.
+  pool.Unpin(&client, &pages[1]);
+
+  // Write failure causes future operations like Pin() to fail.
+  WaitForAllWrites(&client);
+  Status error = pool.Pin(&client, &pages[0]);
+  EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
+  EXPECT_FALSE(pages[0].is_pinned());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+}
+
+// Test that scratch devices are blacklisted after a write error. The query that
+// encountered the write error should not allocate more pages on that device, but
+// existing pages on the device will remain in use and future queries will use the device.
+TEST_F(BufferPoolTest, WriteErrorBlacklist) {
+  // Set up two file groups with two temporary dirs.
+  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
+  // Simulate two concurrent queries.
+  const int TOTAL_QUERIES = 3;
+  const int INITIAL_QUERIES = 2;
+  const int MAX_NUM_PAGES = 6;
+  const int PAGES_PER_QUERY = MAX_NUM_PAGES / TOTAL_QUERIES;
+  const int64_t TOTAL_MEM = MAX_NUM_PAGES * TEST_BUFFER_LEN;
+  const int64_t MEM_PER_QUERY = PAGES_PER_QUERY * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  vector<FileGroup*> file_groups;
+  vector<ClientHandle> clients(TOTAL_QUERIES);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    file_groups.push_back(NewFileGroup());
+    ASSERT_OK(pool.RegisterClient("test client", file_groups[i], &global_reservations_,
+        nullptr, MEM_PER_QUERY, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservation(MEM_PER_QUERY));
+  }
+
+  // Allocate scratch files for all 2x2 (query, dir) combinations by unpinning pages.
+  vector<vector<PageHandle>> pages(TOTAL_QUERIES);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    CreatePages(&pool, &clients[i], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[i]);
+    WriteData(pages[i], 0);
+    UnpinAll(&pool, &clients[i], &pages[i]);
+    for (int j = 0; j < PAGES_PER_QUERY; ++j) {
+      LOG(INFO) << "Manager " << i << " Block " << j << " backed by file "
+                << TmpFilePath(&pages[i][j]);
+    }
+  }
+  for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]);
+  const int ERROR_QUERY = 0;
+  const int NO_ERROR_QUERY = 1;
+  const string& error_dir = tmp_dirs[0];
+  const string& good_dir = tmp_dirs[1];
+  // Delete one file from first scratch dir for first query to trigger an error.
+  PageHandle* error_page = FindPageInDir(pages[ERROR_QUERY], error_dir);
+  ASSERT_TRUE(error_page != NULL) << "Expected a tmp file in dir " << error_dir;
+  const string& error_file_path = TmpFilePath(error_page);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) PinAll(&pool, &clients[i], &pages[i]);
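+  // DisableBackingFile() is a test helper that makes subsequent writes to this scratch
+  // file fail.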
+  DisableBackingFile(error_file_path);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) UnpinAll(&pool, &clients[i], &pages[i]);
+
+  // At least one write should hit an error, but it should be recoverable.
+  for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]);
+
+  // Both clients should still be usable - test the API.
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    PinAll(&pool, &clients[i], &pages[i]);
+    VerifyData(pages[i], 0);
+    UnpinAll(&pool, &clients[i], &pages[i]);
+    ASSERT_OK(AllocateAndFree(&pool, &clients[i], TEST_BUFFER_LEN));
+  }
+
+  // Temporary device with error should still be active.
+  vector<TmpFileMgr::DeviceId> active_tmp_devices =
+      test_env_->tmp_file_mgr()->ActiveTmpDevices();
+  ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size());
+  for (int i = 0; i < active_tmp_devices.size(); ++i) {
+    const string& device_path =
+        test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]);
+    ASSERT_EQ(string::npos, error_dir.find(device_path));
+  }
+
+  // The query that hit the error should only allocate from the device that had no error.
+  // The other one should continue using both devices, since it didn't encounter a write
+  // error itself.
+  vector<PageHandle> error_new_pages;
+  CreatePages(
+      &pool, &clients[ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &error_new_pages);
+  UnpinAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
+  WaitForAllWrites(&clients[ERROR_QUERY]);
+  EXPECT_TRUE(FindPageInDir(error_new_pages, good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(error_new_pages, error_dir) == NULL);
+  for (PageHandle& error_new_page : error_new_pages) {
+    LOG(INFO) << "Newly created page backed by file " << TmpFilePath(&error_new_page);
+    EXPECT_TRUE(PageInDir(&error_new_page, good_dir));
+  }
+  DestroyAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
+
+  PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]);
+  UnpinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]);
+  WaitForAllWrites(&clients[NO_ERROR_QUERY]);
+  EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], error_dir) != NULL);
+
+  // The second client should use the bad directory for new pages since
+  // blacklisting is per-query, not global.
+  vector<PageHandle> no_error_new_pages;
+  CreatePages(&pool, &clients[NO_ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY,
+      &no_error_new_pages);
+  UnpinAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages);
+  WaitForAllWrites(&clients[NO_ERROR_QUERY]);
+  EXPECT_TRUE(FindPageInDir(no_error_new_pages, good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(no_error_new_pages, error_dir) != NULL);
+  DestroyAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages);
+
+  // A new query should use both dirs for backing storage.
+  const int NEW_QUERY = 2;
+  ASSERT_OK(pool.RegisterClient("new test client", NewFileGroup(), &global_reservations_,
+      nullptr, MEM_PER_QUERY, NewProfile(), &clients[NEW_QUERY]));
+  ASSERT_TRUE(clients[NEW_QUERY].IncreaseReservation(MEM_PER_QUERY));
+  CreatePages(
+      &pool, &clients[NEW_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[NEW_QUERY]);
+  UnpinAll(&pool, &clients[NEW_QUERY], &pages[NEW_QUERY]);
+  WaitForAllWrites(&clients[NEW_QUERY]);
+  EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], good_dir) != NULL);
+  EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], error_dir) != NULL);
+
+  for (int i = 0; i < TOTAL_QUERIES; ++i) {
+    DestroyAll(&pool, &clients[i], &pages[i]);
+    pool.DeregisterClient(&clients[i]);
+  }
+}
+
+// Test that the buffer pool fails cleanly when all scratch directories are inaccessible
+// at runtime.
+TEST_F(BufferPoolTest, NoDirsAllocationError) {
+  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
+  int MAX_NUM_BUFFERS = 2;
+  int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  for (int i = 0; i < tmp_dirs.size(); ++i) {
+    const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
+    chmod(tmp_scratch_subdir.c_str(), 0);
+  }
+
+  // The error will happen asynchronously.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+
+  // Write failure should result in an error being propagated back to Pin().
+  for (PageHandle& page : pages) {
+    Status status = pool.Pin(&client, &page);
+    EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code());
+  }
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+}
+
+// Test that the buffer pool can still create pages when no scratch is present.
+TEST_F(BufferPoolTest, NoTmpDirs) {
+  InitMultipleTmpDirs(0);
+  const int MAX_NUM_BUFFERS = 3;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+
+  // Unpinning is allowed by the BufferPool interface but we won't start any writes to
+  // disk because the flushing heuristic does not eagerly start writes when there are no
+  // active scratch devices.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  ASSERT_OK(pool.Pin(&client, &pages[0]));
+
+  // Allocating another buffer will force a write, which will fail.
+  BufferHandle tmp_buffer;
+  Status status = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code()) << status.msg().msg();
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+}
+
+// Test that the buffer pool can still create pages when spilling is disabled by
+// setting scratch_limit = 0.
+TEST_F(BufferPoolTest, ScratchLimitZero) {
+  const int QUERY_BUFFERS = 3;
+  const int64_t TOTAL_MEM = 100 * TEST_BUFFER_LEN;
+  const int64_t QUERY_MEM = QUERY_BUFFERS * TEST_BUFFER_LEN;
+
+  // Set up a query state with the scratch_limit option in the TestEnv.
+  test_env_.reset(new TestEnv());
+  test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM);
+  ASSERT_OK(test_env_->Init());
+
+  BufferPool* pool = test_env_->exec_env()->buffer_pool();
+  RuntimeState* state;
+  TQueryOptions query_options;
+  query_options.scratch_limit = 0;
+  ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &state));
+
+  ClientHandle client;
+  ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(),
+      state->instance_buffer_reservation(),
+      obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), QUERY_MEM,
+      NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(QUERY_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(pool, &client, TEST_BUFFER_LEN, QUERY_MEM, &pages);
+
+  // Spilling is disabled by the QueryState when scratch_limit is 0, so trying to unpin
+  // will cause a DCHECK.
+  IMPALA_ASSERT_DEBUG_DEATH(pool->Unpin(&client, &pages[0]), "");
+
+  DestroyAll(pool, &client, &pages);
+  pool->DeregisterClient(&client);
+}
+
+TEST_F(BufferPoolTest, SingleRandom) {
+  TestRandomInternalSingle(8 * 1024, true);
+  TestRandomInternalSingle(8 * 1024, false);
+}
+
+TEST_F(BufferPoolTest, Multi2Random) {
+  TestRandomInternalMulti(2, 8 * 1024, true);
+  TestRandomInternalMulti(2, 8 * 1024, false);
+}
+
+TEST_F(BufferPoolTest, Multi4Random) {
+  TestRandomInternalMulti(4, 8 * 1024, true);
+  TestRandomInternalMulti(4, 8 * 1024, false);
+}
+
+TEST_F(BufferPoolTest, Multi8Random) {
+  TestRandomInternalMulti(8, 8 * 1024, true);
+  TestRandomInternalMulti(8, 8 * 1024, false);
+}
+
+// Single-threaded execution of the TestRandomInternalImpl.
+void BufferPoolTest::TestRandomInternalSingle(
+    int64_t min_buffer_len, bool multiple_pins) {
+  const int MAX_NUM_BUFFERS = 200;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * min_buffer_len;
+  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  MemTracker global_tracker(TOTAL_MEM);
+  TestRandomInternalImpl(
+      &pool, NewFileGroup(), &global_tracker, &rng_, SINGLE_THREADED_TID, multiple_pins);
+  global_reservations_.Close();
+}
+
+// Multi-threaded execution of the TestRandomInternalImpl.
+void BufferPoolTest::TestRandomInternalMulti(
+    int num_threads, int64_t min_buffer_len, bool multiple_pins) {
+  const int MAX_NUM_BUFFERS_PER_THREAD = 200;
+  const int64_t TOTAL_MEM = num_threads * MAX_NUM_BUFFERS_PER_THREAD * min_buffer_len;
+  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  MemTracker global_tracker(TOTAL_MEM);
+  FileGroup* shared_file_group = NewFileGroup();
+  thread_group workers;
+  vector<mt19937> rngs(num_threads);
+  for (int i = 0; i < num_threads; ++i) {
+    rngs[i].seed(rng_()); // Seed the thread-local rngs.
+    workers.add_thread(new thread(
+        [this, &pool, shared_file_group, &global_tracker, &rngs, i, multiple_pins]() {
+          TestRandomInternalImpl(
+              &pool, shared_file_group, &global_tracker, &rngs[i], i, multiple_pins);
+        }));
+  }
+
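+  // Concurrently run the pool's maintenance task, which releases unneeded free buffers
+  // back to the system allocator, to exercise its interaction with the workers.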
+  AtomicInt32 stop_maintenance(0);
+  thread* maintenance_thread = new thread([this, &pool, &stop_maintenance]() {
+    while (stop_maintenance.Load() == 0) {
+      pool.Maintenance();
+      SleepForMs(50);
+    }
+  });
+  workers.join_all();
+  stop_maintenance.Add(1);
+  maintenance_thread->join();
+  global_reservations_.Close();
+}
+
+/// Randomly issue AllocateBuffer(), FreeBuffer(), CreatePage(), Pin(), Unpin(), and
+/// DestroyPage() calls. All calls made are legal - error conditions are not expected.
+/// When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID. If
+/// 'multiple_pins' is true, pages can be pinned multiple times (useful to test that
+/// functionality). Otherwise they are only pinned once (useful to test the case where
+/// more of the memory is committed).
+void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_group,
+    MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins) {
+  // Encrypting and decrypting is expensive - reduce iterations when encryption is on.
+  int num_iterations = FLAGS_disk_spill_encryption ? 5000 : 50000;
+  // All the existing pages and buffers along with the sentinel values written to them.
+  vector<pair<PageHandle, int>> pages;
+  vector<pair<BufferHandle, int>> buffers;
+
+  // Pick power-of-two buffer sizes up to 2^4 times the minimum buffer length.
+  uniform_int_distribution<int> buffer_exponent_dist(0, 4);
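+  // E.g. with an 8KB minimum buffer, sizes are drawn from {8, 16, 32, 64, 128} KB.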
+
+  ClientHandle client;
+  ASSERT_OK(pool->RegisterClient(Substitute("$0", tid), file_group, &global_reservations_,
+      obj_pool_.Add(new MemTracker(-1, "", parent_mem_tracker)), 1L << 48, NewProfile(),
+      &client));
+
+  for (int i = 0; i < num_iterations; ++i) {
+    if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl;
+    // Pick an operation.
+    // New page: 15%
+    // Pin a page: 30%
+    // Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases).
+    // Destroy page: 10% (< New page prob. so that number of pages grows over time).
+    // Allocate buffer: 10%
+    // Free buffer: 9.9%
+    // Switch core that the thread is executing on: 0.1%
+    double p = uniform_real_distribution<double>(0.0, 1.0)(*rng);
+    if (p < 0.15) {
+      // Create a new page.
+      int64_t page_len = pool->min_buffer_len() << buffer_exponent_dist(*rng);
+      if (!client.IncreaseReservationToFit(page_len)) continue;
+      PageHandle new_page;
+      ASSERT_OK(pool->CreatePage(&client, page_len, &new_page));
+      int data = (*rng)();
+      WriteData(new_page, data);
+      pages.emplace_back(move(new_page), data);
+    } else if (p < 0.45) {
+      // Pin a page.
+      if (pages.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
+      PageHandle* page = &pages[rand_pick].first;
+      if (!client.IncreaseReservationToFit(page->len())) continue;
+      if (!page->is_pinned() || multiple_pins) ASSERT_OK(pool->Pin(&client, page));
+      VerifyData(*page, pages[rand_pick].second);
+    } else if (p < 0.70) {
+      // Unpin a pinned page.
+      if (pages.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
+      PageHandle* page = &pages[rand_pick].first;
+      if (page->is_pinned()) pool->Unpin(&client, page);
+    } else if (p < 0.80) {
+      // Destroy a page.
+      if (pages.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
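+      // Swap-remove: move the chosen entry out and fill the hole with the last element.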
+      auto page_data = move(pages[rand_pick]);
+      pages[rand_pick] = move(pages.back());
+      pages.pop_back();
+      pool->DestroyPage(&client, &page_data.first);
+    } else if (p < 0.90) {
+      // Allocate a buffer. Pick a random power-of-two size that is up to 2^4
+      // times the minimum buffer length.
+      int64_t buffer_len = pool->min_buffer_len() << buffer_exponent_dist(*rng);
+      if (!client.IncreaseReservationToFit(buffer_len)) continue;
+      BufferHandle new_buffer;
+      ASSERT_OK(pool->AllocateBuffer(&client, buffer_len, &new_buffer));
+      int data = (*rng)();
+      WriteData(new_buffer, data);
+      buffers.emplace_back(move(new_buffer), data);
+    } else if (p < 0.999) {
+      // Free a buffer.
+      if (buffers.empty()) continue;
+      int rand_pick = uniform_int_distribution<int>(0, buffers.size() - 1)(*rng);
+      auto buffer_data = move(buffers[rand_pick]);
+      buffers[rand_pick] = move(buffers.back());
+      buffers.pop_back();
+      pool->FreeBuffer(&client, &buffer_data.first);
+    } else {
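+      // Migrate to a random core to exercise the buffer pool's per-core free lists.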
+      CpuTestUtil::PinToRandomCore(rng);
+    }
+  }
+
+  // The client needs to delete all its pages.
+  for (auto& page : pages) pool->DestroyPage(&client, &page.first);
+  for (auto& buffer : buffers) pool->FreeBuffer(&client, &buffer.first);
+  pool->DeregisterClient(&client);
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index b4b2420..05176a5 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -28,6 +28,7 @@
 #include "util/bit-util.h"
 #include "util/cpu-info.h"
 #include "util/runtime-profile-counters.h"
+#include "util/time.h"
 #include "util/uid-util.h"
 
 DEFINE_int32(concurrent_scratch_ios_per_device, 2,
@@ -265,6 +266,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
   : pool_(pool),
     file_group_(file_group),
     name_(name),
+    debug_write_delay_ms_(0),
     num_pages_(0),
     buffers_allocated_bytes_(0) {
   reservation_.InitChildTracker(
@@ -502,6 +504,9 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
 }
 
 void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_status) {
+#ifndef NDEBUG
+  if (debug_write_delay_ms_ > 0) SleepForMs(debug_write_delay_ms_);
+#endif
   {
     unique_lock<mutex> cl(lock_);
     DCHECK(in_flight_write_pages_.Contains(page));
@@ -529,6 +534,13 @@ void BufferPool::Client::WaitForWrite(unique_lock<mutex>* client_lock, Page* pag
   }
 }
 
+void BufferPool::Client::WaitForAllWrites() {
+  unique_lock<mutex> cl(lock_);
+  while (in_flight_write_pages_.size() > 0) {
+    write_complete_cv_.Wait(cl);
+  }
+}
+
 string BufferPool::Client::DebugString() {
   lock_guard<mutex> lock(lock_);
   stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 7770fff..ad9ab00 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -313,6 +313,7 @@ class BufferPool::ClientHandle {
 
  private:
   friend class BufferPool;
+  friend class BufferPoolTest;
   DISALLOW_COPY_AND_ASSIGN(ClientHandle);
 
   /// Internal state for the client. NULL means the client isn't registered.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb900df4/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index abba192..4650e69 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -271,11 +271,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() {
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
   if (tmp_files_.size() == 0) {
-    // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
-    // so we must point users to the impalad error log.
-    Status err_status(
-        "Could not create files in any configured scratch directories (--scratch_dirs). "
-        "See logs for previous errors that may have caused this.");
+    Status err_status(TErrorCode::SCRATCH_ALLOCATION_FAILED);
     for (Status& err : scratch_errors_) err_status.MergeStatus(err);
     return err_status;
   }