You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/18 15:12:54 UTC

[8/9] incubator-impala git commit: IMPALA-5677: limit clean page memory consumption

IMPALA-5677: limit clean page memory consumption

Adds the following flag:

  -buffer_pool_clean_pages_limit ((Advanced) Limit on bytes of clean spilled
    pages that will be accumulated in the buffer pool. Specified as number of
    bytes ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes
    ('<float>[gG]'), or percentage of the buffer pool limit ('<int>%').
    Defaults to bytes if no unit is given..) type: string default: "10%"

This helps prevent Impala from accumulating excessive amounts of clean pages,
which can cause some problems in practice:
* The OS buffer cache is squeezed, reducing I/O performance from HDFS
  and potentially reducing the ability of the OS to absorb writes from
  Impala without blocking.
* Impala process memory consumption can expand more than users or admins
  might expect. E.g. if one query is running with a mem_limit of 1GB,
  many people will be surprised if the process inflates to the full
  process limit of 100GB. Impala doesn't provide any guarantees except
  for staying within the process mem_limit, but this could be a
  surprising divergence from past behaviour.

Observability:
A new metric buffer-pool.clean-pages-limit is added.

Testing:
Added a backend test to directly test that clean pages are evicted.
Ran in a loop to flush out any flakiness.

Ran exhaustive tests.

Change-Id: Ib6b687ab4bdddf07d9d6c997ff814aa3976042f9
Reviewed-on: http://gerrit.cloudera.org:8080/7653
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/039255a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/039255a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/039255a6

Branch: refs/heads/master
Commit: 039255a6ad01be785fb7b5ee02758dd125218ba9
Parents: 79fba27
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Aug 9 17:40:46 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 18 04:22:32 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   6 +
 .../runtime/bufferpool/buffer-allocator-test.cc |   7 +-
 be/src/runtime/bufferpool/buffer-allocator.cc   | 110 ++++++++++------
 be/src/runtime/bufferpool/buffer-allocator.h    |  21 +++-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 124 ++++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.cc        |  10 +-
 be/src/runtime/bufferpool/buffer-pool.h         |   8 +-
 be/src/runtime/bufferpool/suballocator-test.cc  |   2 +-
 be/src/runtime/exec-env.cc                      |  15 ++-
 be/src/runtime/exec-env.h                       |   2 +-
 be/src/runtime/test-env.cc                      |   3 +-
 be/src/util/memory-metrics.cc                   |   7 ++
 be/src/util/memory-metrics.h                    |   2 +
 common/thrift/metrics.json                      |  10 ++
 14 files changed, 243 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 98417f2..5d61edc 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -57,6 +57,12 @@ static const string buffer_pool_limit_help_msg = "(Advanced) Limit on buffer poo
     "The default value and behaviour of this flag may change between releases.";
 DEFINE_string(buffer_pool_limit, "80%", buffer_pool_limit_help_msg.c_str());
 
+static const string buffer_pool_clean_pages_limit_help_msg = "(Advanced) Limit on bytes "
+    "of clean pages that will be accumulated in the buffer pool. "
+    + Substitute(MEM_UNITS_HELP_MSG, "the buffer pool limit"); // Template ends in '.'.
+DEFINE_string(buffer_pool_clean_pages_limit, "10%",
+    buffer_pool_clean_pages_limit_help_msg.c_str());
+
 DEFINE_int64(min_buffer_size, 64 * 1024,
     "(Advanced) The minimum buffer size to use in the buffer pool");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 7ec64ad..6427648 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -39,7 +39,7 @@ using BufferHandle = BufferPool::BufferHandle;
 class BufferAllocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() {
-    dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0));
+    dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0));
     dummy_reservation_.InitRootTracker(nullptr, 0);
     ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
         obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
@@ -73,7 +73,8 @@ class BufferAllocatorTest : public ::testing::Test {
 // Confirm that ASAN will catch use-after-free errors, even if the BufferAllocator caches
 // returned memory.
 TEST_F(BufferAllocatorTest, Poisoning) {
-  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
+  BufferAllocator allocator(
+      dummy_pool_, TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
   BufferHandle buffer;
   ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffer));
   uint8_t* data = buffer.data();
@@ -94,7 +95,7 @@ TEST_F(BufferAllocatorTest, FreeListSizes) {
   const int NUM_BUFFERS = 512;
   const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
 
-  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
 
   // Allocate a bunch of buffers - all free list checks should miss.
   vector<BufferHandle> buffers(NUM_BUFFERS);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc
index 0f05cf3..09c2623 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -34,6 +34,12 @@ DECLARE_bool(disable_mem_pools);
 
 namespace impala {
 
+/// Atomically decrease 'bytes_remaining' by up to 'max_decrease', not below 0.
+/// If 'require_full_decrease' is true, only decrease if the full 'max_decrease' is
+/// available, otherwise leave it unchanged. Returns the amount it was decreased by.
+static int64_t DecreaseBytesRemaining(
+    int64_t max_decrease, bool require_full_decrease, AtomicInt64* bytes_remaining);
+
 /// An arena containing free buffers and clean pages that are associated with a
 /// particular core. All public methods are thread-safe.
 class BufferPool::FreeBufferArena : public CacheLineAligned {
@@ -98,10 +104,6 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {
   /// it doesn't acquire the arena lock.
   int64_t GetNumCleanPages();
 
-  /// Return the total bytes of clean pages in the arena. May be approximate since
-  /// it doesn't acquire the arena lock.
-  int64_t GetCleanPageBytes();
-
   string DebugString();
 
  private:
@@ -109,6 +111,15 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {
   /// All members are protected by FreeBufferArena::lock_ unless otherwise mentioned.
   struct PerSizeLists {
     PerSizeLists() : num_free_buffers(0), low_water_mark(0), num_clean_pages(0) {}
+
+    /// Helper to add a free buffer and increment the counter.
+    /// FreeBufferArena::lock_ must be held by the caller.
+    void AddFreeBuffer(BufferHandle&& buffer) {
+      DCHECK_EQ(num_free_buffers.Load(), free_buffers.Size());
+      num_free_buffers.Add(1);
+      free_buffers.AddFreeBuffer(move(buffer));
+    }
+
     /// The number of entries in 'free_buffers'. Can be read without holding a lock to
     /// allow threads to quickly skip over empty lists when trying to find a buffer.
     AtomicInt64 num_free_buffers;
@@ -170,7 +181,8 @@ int64_t BufferPool::BufferAllocator::CalcMaxBufferLen(
 }
 
 BufferPool::BufferAllocator::BufferAllocator(
-    BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit)
+    BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit,
+    int64_t clean_page_bytes_limit)
   : pool_(pool),
     system_allocator_(new SystemAllocator(min_buffer_len)),
     min_buffer_len_(min_buffer_len),
@@ -179,6 +191,8 @@ BufferPool::BufferAllocator::BufferAllocator(
     log_max_buffer_len_(BitUtil::Log2Ceiling64(max_buffer_len_)),
     system_bytes_limit_(system_bytes_limit),
     system_bytes_remaining_(system_bytes_limit),
+    clean_page_bytes_limit_(clean_page_bytes_limit),
+    clean_page_bytes_remaining_(clean_page_bytes_limit),
     per_core_arenas_(CpuInfo::GetMaxNumCores()),
     max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
   DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
@@ -197,6 +211,7 @@ BufferPool::BufferAllocator::~BufferAllocator() {
   per_core_arenas_.clear(); // Release all the memory.
   // Check for accounting leaks.
   DCHECK_EQ(system_bytes_limit_, system_bytes_remaining_.Load());
+  DCHECK_EQ(clean_page_bytes_limit_, clean_page_bytes_remaining_.Load());
 }
 
 Status BufferPool::BufferAllocator::Allocate(
@@ -231,7 +246,7 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
   if (current_core_arena->PopFreeBuffer(len, buffer)) return Status::OK();
 
   // Fast-ish path: allocate a new buffer if there is room in 'system_bytes_remaining_'.
-  int64_t delta = DecreaseSystemBytesRemaining(len, true);
+  int64_t delta = DecreaseBytesRemaining(len, true, &system_bytes_remaining_);
   if (delta != len) {
     DCHECK_EQ(0, delta);
     const vector<int>& numa_node_cores = CpuInfo::GetCoresOfSameNumaNode(current_core);
@@ -285,14 +300,14 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
   return Status::OK();
 }
 
-int64_t BufferPool::BufferAllocator::DecreaseSystemBytesRemaining(
-    int64_t max_decrease, bool require_full_decrease) {
+int64_t DecreaseBytesRemaining(
+    int64_t max_decrease, bool require_full_decrease, AtomicInt64* bytes_remaining) {
   while (true) {
-    int64_t old_value = system_bytes_remaining_.Load();
+    int64_t old_value = bytes_remaining->Load();
     if (require_full_decrease && old_value < max_decrease) return 0;
     int64_t decrease = min(old_value, max_decrease);
     int64_t new_value = old_value - decrease;
-    if (system_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
+    if (bytes_remaining->CompareAndSwap(old_value, new_value)) {
       return decrease;
     }
   }
@@ -311,11 +326,12 @@ int64_t BufferPool::BufferAllocator::ScavengeBuffers(
   //    examined (or from 'system_bytes_available_') because in order to do so, it would
   //    have had to return the equivalent amount of memory to an earlier arena or added
   //    it back into 'systems_bytes_reamining_'. The former can't happen since we're
-  //    still holding those locks, and the latter is solved by trying
-  //    DecreaseSystemBytesRemaining() at the end.
+  //    still holding those locks, and the latter is solved by trying to decrease
+  //    system_bytes_remaining_ with DecreaseBytesRemaining() at the end.
   DCHECK_GT(target_bytes, 0);
   // First make sure we've used up all the headroom in the buffer limit.
-  int64_t bytes_found = DecreaseSystemBytesRemaining(target_bytes, false);
+  int64_t bytes_found =
+      DecreaseBytesRemaining(target_bytes, false, &system_bytes_remaining_);
   if (bytes_found == target_bytes) return bytes_found;
 
   // In 'slow_but_sure' mode, we will hold locks for multiple arenas at the same time and
@@ -341,7 +357,8 @@ int64_t BufferPool::BufferAllocator::ScavengeBuffers(
   // thread holds the lock while decrementing 'system_bytes_remaining_' in the cases
   // where it may not have reservation corresponding to that memory.
   if (slow_but_sure && bytes_found < target_bytes) {
-    bytes_found += DecreaseSystemBytesRemaining(target_bytes - bytes_found, true);
+    bytes_found += DecreaseBytesRemaining(
+        target_bytes - bytes_found, true, &system_bytes_remaining_);
     DCHECK_EQ(bytes_found, target_bytes) << DebugString();
   }
   return bytes_found;
@@ -428,15 +445,21 @@ int64_t BufferPool::BufferAllocator::GetNumCleanPages() const {
   return SumOverArenas([](FreeBufferArena* arena) { return arena->GetNumCleanPages(); });
 }
 
+int64_t BufferPool::BufferAllocator::GetCleanPageBytesLimit() const {
+  return clean_page_bytes_limit_;
+}
+
 int64_t BufferPool::BufferAllocator::GetCleanPageBytes() const {
-  return SumOverArenas([](FreeBufferArena* arena) { return arena->GetCleanPageBytes(); });
+  return clean_page_bytes_limit_ - clean_page_bytes_remaining_.Load();
 }
 
 string BufferPool::BufferAllocator::DebugString() {
   stringstream ss;
   ss << "<BufferAllocator> " << this << " min_buffer_len: " << min_buffer_len_
      << " system_bytes_limit: " << system_bytes_limit_
-     << " system_bytes_remaining: " << system_bytes_remaining_.Load() << "\n";
+     << " system_bytes_remaining: " << system_bytes_remaining_.Load() << "\n"
+     << " clean_page_bytes_limit: " << clean_page_bytes_limit_
+     << " clean_page_bytes_remaining: " << clean_page_bytes_remaining_.Load() << "\n";
   for (int i = 0; i < per_core_arenas_.size(); ++i) {
     ss << "  Arena " << i << " " << per_core_arenas_[i]->DebugString() << "\n";
   }
@@ -466,10 +489,7 @@ void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
     return;
   }
   PerSizeLists* lists = GetListsForSize(buffer.len());
-  FreeList* list = &lists->free_buffers;
-  DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
-  lists->num_free_buffers.Add(1);
-  list->AddFreeBuffer(move(buffer));
+  lists->AddFreeBuffer(move(buffer));
 }
 
 bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) {
@@ -478,14 +498,14 @@ bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page)
   DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
   if (!lists->clean_pages.Remove(page)) return false;
   lists->num_clean_pages.Add(-1);
+  parent_->clean_page_bytes_remaining_.Add(page->len);
   if (!claim_buffer) {
     BufferHandle buffer;
     {
       lock_guard<SpinLock> pl(page->buffer_lock);
       buffer = move(page->buffer);
     }
-    lists->free_buffers.AddFreeBuffer(move(buffer));
-    lists->num_free_buffers.Add(1);
+    lists->AddFreeBuffer(move(buffer));
   }
   return true;
 }
@@ -517,6 +537,7 @@ bool BufferPool::FreeBufferArena::EvictCleanPage(
   Page* page = lists->clean_pages.Dequeue();
   if (page == nullptr) return false;
   lists->num_clean_pages.Add(-1);
+  parent_->clean_page_bytes_remaining_.Add(buffer_len);
   lock_guard<SpinLock> pl(page->buffer_lock);
   *buffer = move(page->buffer);
   return true;
@@ -560,10 +581,11 @@ pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
     // Evict clean pages by moving their buffers to the free page list before freeing
     // them. This ensures that they are freed based on memory address in the expected
     // order.
+    int num_pages_evicted = 0;
+    int64_t page_bytes_evicted = 0;
     while (bytes_freed + buffer_bytes_to_free < target_bytes_to_free) {
       Page* page = clean_pages->Dequeue();
       if (page == nullptr) break;
-      lists->num_clean_pages.Add(-1);
       BufferHandle page_buffer;
       {
         lock_guard<SpinLock> pl(page->buffer_lock);
@@ -571,9 +593,14 @@ pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
       }
       ++buffers_to_free;
       buffer_bytes_to_free += page_buffer.len();
+      ++num_pages_evicted;
+      page_bytes_evicted += page_buffer.len();
       free_buffers->AddFreeBuffer(move(page_buffer));
-      lists->num_free_buffers.Add(1);
     }
+    lists->num_free_buffers.Add(num_pages_evicted);
+    lists->num_clean_pages.Add(-num_pages_evicted);
+    parent_->clean_page_bytes_remaining_.Add(page_bytes_evicted);
+
     if (buffers_to_free > 0) {
       int64_t buffer_bytes_freed =
           parent_->FreeToSystem(free_buffers->GetBuffersToFree(buffers_to_free));
@@ -598,16 +625,31 @@ pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
 }
 
 void BufferPool::FreeBufferArena::AddCleanPage(Page* page) {
-  if (FLAGS_disable_mem_pools) {
-    // Immediately evict the page.
-    AddFreeBuffer(move(page->buffer));
-    return;
-  }
+  bool eviction_needed = FLAGS_disable_mem_pools
+    || DecreaseBytesRemaining(
+        page->len, true, &parent_->clean_page_bytes_remaining_) == 0;
   lock_guard<SpinLock> al(lock_);
   PerSizeLists* lists = GetListsForSize(page->len);
   DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
-  lists->clean_pages.Enqueue(page);
-  lists->num_clean_pages.Add(1);
+  if (eviction_needed) {
+    if (lists->clean_pages.empty()) {
+      // No other pages to evict, must evict 'page' instead of adding it.
+      lists->AddFreeBuffer(move(page->buffer));
+    } else {
+      // Evict an older page (FIFO eviction) to make space for this one.
+      Page* page_to_evict = lists->clean_pages.Dequeue();
+      lists->clean_pages.Enqueue(page);
+      BufferHandle page_to_evict_buffer;
+      {
+        lock_guard<SpinLock> pl(page_to_evict->buffer_lock);
+        page_to_evict_buffer = move(page_to_evict->buffer);
+      }
+      lists->AddFreeBuffer(move(page_to_evict_buffer));
+    }
+  } else {
+    lists->clean_pages.Enqueue(page);
+    lists->num_clean_pages.Add(1);
+  }
 }
 
 void BufferPool::FreeBufferArena::Maintenance() {
@@ -663,12 +705,6 @@ int64_t BufferPool::FreeBufferArena::GetNumCleanPages() {
   });
 }
 
-int64_t BufferPool::FreeBufferArena::GetCleanPageBytes() {
-  return SumOverSizes([](PerSizeLists* lists, int64_t buffer_size) {
-    return lists->num_clean_pages.Load() * buffer_size;
-  });
-}
-
 string BufferPool::FreeBufferArena::DebugString() {
   lock_guard<SpinLock> al(lock_);
   stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index b360f38..ab4feb6 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -79,7 +79,8 @@ namespace impala {
 ///
 class BufferPool::BufferAllocator {
  public:
-  BufferAllocator(BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit);
+  BufferAllocator(BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit,
+      int64_t clean_page_bytes_limit);
   ~BufferAllocator();
 
   /// Allocate a buffer with a power-of-two length 'len'. This function may acquire
@@ -135,6 +136,9 @@ class BufferPool::BufferAllocator {
   /// Return the total bytes of free buffers in the allocator.
   int64_t GetFreeBufferBytes() const;
 
+  /// Return the limit on bytes of clean pages in the allocator.
+  int64_t GetCleanPageBytesLimit() const;
+
   /// Return the total number of clean pages in the allocator.
   int64_t GetNumCleanPages() const;
 
@@ -169,11 +173,6 @@ class BufferPool::BufferAllocator {
   Status AllocateInternal(
       int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
 
-  /// Decrease 'system_bytes_remaining_' by up to 'max_decrease', down to a minimum of 0.
-  /// If 'require_full_decrease' is true, only decrease if we can decrease it
-  /// 'max_decrease'. Returns the amount it was decreased by.
-  int64_t DecreaseSystemBytesRemaining(int64_t max_decrease, bool require_full_decrease);
-
   /// Tries to reclaim enough memory from various sources so that the caller can allocate
   /// a buffer of 'target_bytes' from the system allocator. Scavenges buffers from the
   /// free buffer and clean page lists of all cores and frees them with
@@ -219,6 +218,16 @@ class BufferPool::BufferAllocator {
   /// allocated or after an existing buffer is freed with the system allocator.
   AtomicInt64 system_bytes_remaining_;
 
+  /// The maximum bytes of clean pages that can accumulate across all arenas before
+  /// they will be evicted.
+  const int64_t clean_page_bytes_limit_;
+
+  /// The number of bytes of 'clean_page_bytes_limit_' not used by clean pages. I.e.
+  /// (clean_page_bytes_limit_ - bytes of clean pages in the BufferAllocator).
+  /// 'clean_page_bytes_limit_' is enforced by decreasing this value before a clean
+  /// page is added and increasing it after a clean page is reclaimed or evicted.
+  AtomicInt64 clean_page_bytes_remaining_;
+
   /// Free and clean pages. One arena per core.
   std::vector<std::unique_ptr<FreeBufferArena>> per_core_arenas_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index b2f8695..61d7a39 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -367,6 +367,7 @@ class BufferPoolTest : public ::testing::Test {
   /// Parameterised test implementations.
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
   void TestEvictionPolicy(int64_t page_size);
+  void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
   void TestQueryTeardown(bool write_error);
   void TestWriteError(int write_delay_ms);
   void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins);
@@ -461,7 +462,7 @@ TEST_F(BufferPoolTest, BasicRegistration) {
   int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
 
   RegisterQueriesAndClients(&pool, 0, num_concurrent_queries, sum_initial_reservations,
       reservation_limit, &rng_);
@@ -484,7 +485,7 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   vector<mt19937> thread_rngs = RandTestUtil::CreateThreadLocalRngs(num_threads, &rng_);
   // Launch threads, each with a different set of query IDs.
   thread_group workers;
@@ -508,7 +509,7 @@ TEST_F(BufferPoolTest, PageCreation) {
   int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
   int64_t total_mem = 2 * 2 * max_page_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
       total_mem, NewProfile(), &client));
@@ -553,7 +554,7 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
   int64_t total_mem = 2 * 2 * max_buffer_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
       total_mem, NewProfile(), &client));
@@ -604,7 +605,7 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   const int MAX_NUM_BUFFERS = 4;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
 
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -647,13 +648,77 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   global_reservations_.Close();
 }
 
+/// Test that the buffer pool respects the clean page limit with all pages in
+/// the same arena.
+TEST_F(BufferPoolTest, CleanPageLimitOneArena) {
+  TestCleanPageLimit(0, false);
+  TestCleanPageLimit(2, false);
+  TestCleanPageLimit(4, false);
+}
+
+/// Test that the buffer pool respects the clean page limit with pages spread across
+/// different arenas.
+TEST_F(BufferPoolTest, CleanPageLimitRandomArenas) {
+  TestCleanPageLimit(0, true);
+  TestCleanPageLimit(2, true);
+  TestCleanPageLimit(4, true);
+}
+
+void BufferPoolTest::TestCleanPageLimit(int max_clean_pages, bool randomize_core) {
+  const int MAX_NUM_BUFFERS = 4;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  const int64_t max_clean_page_bytes = max_clean_pages * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, max_clean_page_bytes);
+
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+  if (!randomize_core) CpuTestUtil::PinToCore(0);
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages, randomize_core);
+  WriteData(pages, 0);
+
+  // Unpin pages and wait until they're written out and therefore clean.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  EXPECT_EQ(max_clean_pages, pool.GetNumCleanPages());
+  EXPECT_EQ(max_clean_page_bytes, pool.GetCleanPageBytes());
+
+  // Do an allocation to force a buffer to be reclaimed from somewhere.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  if (randomize_core) {
+    // We will either evict a clean page or reclaim a free buffer, depending on the
+    // arena that we pick.
+    EXPECT_LE(pool.GetNumCleanPages(), max_clean_pages);
+    EXPECT_LE(pool.GetCleanPageBytes(), max_clean_page_bytes);
+  } else {
+    // Reclaims a free buffer in arena 0 if any, otherwise evicts a clean page.
+    EXPECT_EQ(min(MAX_NUM_BUFFERS - 1, max_clean_pages), pool.GetNumCleanPages());
+    const int64_t expected_clean_page_bytes =
+        min<int64_t>((MAX_NUM_BUFFERS - 1) * TEST_BUFFER_LEN, max_clean_page_bytes);
+    EXPECT_EQ(expected_clean_page_bytes, pool.GetCleanPageBytes());
+  }
+
+  // Re-pin all the pages - none will be clean afterwards.
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  VerifyData(pages, 0);
+  EXPECT_EQ(0, pool.GetNumCleanPages());
+  EXPECT_EQ(0, pool.GetCleanPageBytes());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
+
 /// Test transfer of buffer handles between clients.
 TEST_F(BufferPoolTest, BufferTransfer) {
   // Each client needs to have enough reservation for a buffer.
   const int num_clients = 5;
   int64_t total_mem = num_clients * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   BufferPool::ClientHandle clients[num_clients];
   BufferPool::BufferHandle handles[num_clients];
   for (int i = 0; i < num_clients; ++i) {
@@ -691,7 +756,7 @@ TEST_F(BufferPoolTest, Pin) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
   // Set up client with enough reservation to pin twice.
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -743,7 +808,7 @@ TEST_F(BufferPoolTest, AsyncPin) {
   const int DATA_SEED = 1234;
   // Set up pool with enough reservation to keep two buffers in memory.
   const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -813,7 +878,7 @@ TEST_F(BufferPoolTest, AsyncPin) {
 /// Creating a page or pinning without sufficient reservation should DCHECK.
 TEST_F(BufferPoolTest, PinWithoutReservation) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
@@ -838,7 +903,7 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
   // Set up client with enough reservation for two buffers/pins.
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -897,7 +962,7 @@ TEST_F(BufferPoolTest, ConcurrentPageCreation) {
   int total_mem = num_threads * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
 
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   // Share a file group between the threads.
   TmpFileMgr::FileGroup* file_group = NewFileGroup();
 
@@ -938,7 +1003,7 @@ void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* fil
 /// Test that DCHECK fires when trying to unpin a page with spilling disabled.
 TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
   global_reservations_.InitRootTracker(NULL, 2 * TEST_BUFFER_LEN);
-  BufferPool pool(TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
+  BufferPool pool(TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
   BufferPool::PageHandle handle;
 
   BufferPool::ClientHandle client;
@@ -961,7 +1026,7 @@ TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
 /// Test simple case where pool must evict a page from the same client to fit another.
 TEST_F(BufferPoolTest, EvictPageSameClient) {
   global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
-  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN);
+  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN, TEST_BUFFER_LEN);
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
@@ -987,7 +1052,7 @@ TEST_F(BufferPoolTest, EvictPageSameClient) {
 TEST_F(BufferPoolTest, EvictPageDifferentSizes) {
   const int64_t TOTAL_BYTES = 2 * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
@@ -1017,7 +1082,7 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   const int NUM_CLIENTS = 2;
   const int64_t TOTAL_BYTES = NUM_CLIENTS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
 
   BufferPool::ClientHandle clients[NUM_CLIENTS];
   for (int i = 0; i < NUM_CLIENTS; ++i) {
@@ -1063,7 +1128,7 @@ TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
   const int NUM_BUFFERS = 3;
   const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
 
   BufferPool::ClientHandle client;
   RuntimeProfile* profile = NewProfile();
@@ -1102,7 +1167,8 @@ const int64_t MEM_RECLAMATION_TOTAL_BYTES =
 // Test that we can reclaim buffers and pages from the same arena and from other arenas.
 TEST_F(BufferPoolTest, MemoryReclamation) {
   global_reservations_.InitRootTracker(NULL, MEM_RECLAMATION_TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES,
+      MEM_RECLAMATION_TOTAL_BYTES);
   // Assume that all cores are online. Test various combinations of cores to validate
   // that it can reclaim from any other other core.
   for (int src = 0; src < CpuInfo::num_cores(); ++src) {
@@ -1202,7 +1268,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
   const int MAX_NUM_BUFFERS = 5;
   int64_t total_mem = MAX_NUM_BUFFERS * page_size;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
 
   ClientHandle client;
   RuntimeProfile* profile = NewProfile();
@@ -1274,7 +1340,7 @@ TEST_F(BufferPoolTest, DestroyDuringWrite) {
   const int TRIALS = 20;
   const int MAX_NUM_BUFFERS = 5;
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   for (int trial = 0; trial < TRIALS; ++trial) {
@@ -1366,7 +1432,7 @@ TEST_F(BufferPoolTest, QueryTeardownWriteError) {
 void BufferPoolTest::TestWriteError(int write_delay_ms) {
   int MAX_NUM_BUFFERS = 2;
   int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1422,7 +1488,7 @@ TEST_F(BufferPoolTest, WriteErrorWriteDelay) {
 TEST_F(BufferPoolTest, TmpFileAllocateError) {
   const int MAX_NUM_BUFFERS = 2;
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1462,7 +1528,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
   const int PAGES_PER_QUERY = MAX_NUM_PAGES / TOTAL_QUERIES;
   const int64_t TOTAL_MEM = MAX_NUM_PAGES * TEST_BUFFER_LEN;
   const int64_t MEM_PER_QUERY = PAGES_PER_QUERY * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   vector<FileGroup*> file_groups;
   vector<ClientHandle> clients(TOTAL_QUERIES);
@@ -1575,7 +1641,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
 TEST_F(BufferPoolTest, ScratchReadError) {
   // Only allow one buffer in memory.
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
 
   // Simulate different types of error.
@@ -1647,7 +1713,7 @@ TEST_F(BufferPoolTest, NoDirsAllocationError) {
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   int MAX_NUM_BUFFERS = 2;
   int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1679,7 +1745,7 @@ TEST_F(BufferPoolTest, NoTmpDirs) {
   InitMultipleTmpDirs(0);
   const int MAX_NUM_BUFFERS = 3;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1767,7 +1833,7 @@ void BufferPoolTest::TestRandomInternalSingle(
     int64_t min_buffer_len, bool multiple_pins) {
   const int MAX_NUM_BUFFERS = 200;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * min_buffer_len;
-  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  BufferPool pool(min_buffer_len, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   MemTracker global_tracker(TOTAL_MEM);
   TestRandomInternalImpl(
@@ -1780,7 +1846,7 @@ void BufferPoolTest::TestRandomInternalMulti(
     int num_threads, int64_t min_buffer_len, bool multiple_pins) {
   const int MAX_NUM_BUFFERS_PER_THREAD = 200;
   const int64_t TOTAL_MEM = num_threads * MAX_NUM_BUFFERS_PER_THREAD * min_buffer_len;
-  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  BufferPool pool(min_buffer_len, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   MemTracker global_tracker(TOTAL_MEM);
   FileGroup* shared_file_group = NewFileGroup();
@@ -1910,7 +1976,7 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr
 TEST_F(BufferPoolTest, SubReservation) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 10;
   global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
       TOTAL_MEM, NewProfile(), &client));
@@ -1944,7 +2010,7 @@ TEST_F(BufferPoolTest, DecreaseReservation) {
   const int MAX_NUM_BUFFERS = 4;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
 
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 2e9ba3d..6a111ff 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -104,8 +104,10 @@ Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const {
   return Status::OK();
 }
 
-BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : allocator_(new BufferAllocator(this, min_buffer_len, buffer_bytes_limit)),
+BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit,
+      int64_t clean_page_bytes_limit)
+  : allocator_(new BufferAllocator(
+        this, min_buffer_len, buffer_bytes_limit, clean_page_bytes_limit)),
     min_buffer_len_(min_buffer_len) {
   DCHECK_GT(min_buffer_len, 0);
   DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
@@ -272,6 +274,10 @@ int64_t BufferPool::GetSystemBytesAllocated() const {
   return allocator_->GetSystemBytesAllocated();
 }
 
+int64_t BufferPool::GetCleanPageBytesLimit() const {
+  return allocator_->GetCleanPageBytesLimit();
+}
+
 int64_t BufferPool::GetNumCleanPages() const {
   return allocator_->GetNumCleanPages();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 7b11551..3d6145c 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -158,7 +158,10 @@ class BufferPool : public CacheLineAligned {
   /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be used by the
   ///     buffer pool. If 'buffer_bytes_limit' is not a multiple of 'min_buffer_len', the
   ///     remainder will not be usable.
-  BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit);
+  /// 'clean_page_bytes_limit': the maximum bytes of clean pages that will be retained by the
+  ///     buffer pool.
+  BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit,
+      int64_t clean_page_bytes_limit);
   ~BufferPool();
 
   /// Register a client. Returns an error status and does not register the client if the
@@ -264,6 +267,9 @@ class BufferPool : public CacheLineAligned {
   int64_t GetSystemBytesLimit() const;
   int64_t GetSystemBytesAllocated() const;
 
+  /// Return the limit on bytes of clean pages in the pool.
+  int64_t GetCleanPageBytesLimit() const;
+
   /// Return the total number of clean pages in the pool.
   int64_t GetNumCleanPages() const;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index ea515fe..32acfaf 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -68,7 +68,7 @@ class SuballocatorTest : public ::testing::Test {
   /// bytes of buffers of minimum length 'min_buffer_len'.
   void InitPool(int64_t min_buffer_len, int total_mem) {
     global_reservation_.InitRootTracker(nullptr, total_mem);
-    buffer_pool_.reset(new BufferPool(min_buffer_len, total_mem));
+    buffer_pool_.reset(new BufferPool(min_buffer_len, total_mem, 0));
   }
 
   /// Register a client with 'buffer_pool_'. The client is automatically deregistered

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 7f62c96..0b96e2a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -86,6 +86,7 @@ DECLARE_int32(num_cores);
 DECLARE_int32(be_port);
 DECLARE_string(mem_limit);
 DECLARE_string(buffer_pool_limit);
+DECLARE_string(buffer_pool_clean_pages_limit);
 DECLARE_int64(min_buffer_size);
 DECLARE_bool(is_coordinator);
 DECLARE_int32(webserver_port);
@@ -263,7 +264,14 @@ Status ExecEnv::StartServices() {
           "positive bytes value: $0", FLAGS_buffer_pool_limit));
   }
   buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size);
-  InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit);
+
+  int64_t clean_pages_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit,
+      &is_percent, buffer_pool_limit);
+  if (clean_pages_limit <= 0) {
+    return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a percentage or "
+          "positive bytes value: $0", FLAGS_buffer_pool_clean_pages_limit));
+  }
+  InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
@@ -344,8 +352,9 @@ Status ExecEnv::StartServices() {
   return Status::OK();
 }
 
-void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity) {
-  buffer_pool_.reset(new BufferPool(min_buffer_size, capacity));
+void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
+    int64_t clean_pages_limit) {
+  buffer_pool_.reset(new BufferPool(min_buffer_size, capacity, clean_pages_limit));
   buffer_reservation_.reset(new ReservationTracker());
   buffer_reservation_->InitRootTracker(nullptr, capacity);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 5d7a3d0..9e925a1 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -206,7 +206,7 @@ class ExecEnv {
   KuduClientMap kudu_client_map_;
 
   /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity.
-  void InitBufferPool(int64_t min_page_len, int64_t capacity);
+  void InitBufferPool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 23dfa4c..69bbc50 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -58,7 +58,8 @@ Status TestEnv::Init() {
   } else {
     RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
   }
-  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_,
+      static_cast<int64_t>(0.1 * buffer_pool_capacity_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index f2ddcb9..7d0d065 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -54,6 +54,7 @@ BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
 BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
 BufferPoolMetric* BufferPoolMetric::NUM_FREE_BUFFERS = nullptr;
 BufferPoolMetric* BufferPoolMetric::FREE_BUFFER_BYTES = nullptr;
+BufferPoolMetric* BufferPoolMetric::CLEAN_PAGES_LIMIT = nullptr;
 BufferPoolMetric* BufferPoolMetric::NUM_CLEAN_PAGES = nullptr;
 BufferPoolMetric* BufferPoolMetric::CLEAN_PAGE_BYTES = nullptr;
 
@@ -238,6 +239,9 @@ Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
   FREE_BUFFER_BYTES = metrics->RegisterMetric(
       new BufferPoolMetric(MetricDefs::Get("buffer-pool.free-buffer-bytes"),
           BufferPoolMetricType::FREE_BUFFER_BYTES, global_reservations, buffer_pool));
+  CLEAN_PAGES_LIMIT = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.clean-pages-limit"),
+          BufferPoolMetricType::CLEAN_PAGES_LIMIT, global_reservations, buffer_pool));
   NUM_CLEAN_PAGES = metrics->RegisterMetric(
       new BufferPoolMetric(MetricDefs::Get("buffer-pool.clean-pages"),
           BufferPoolMetricType::NUM_CLEAN_PAGES, global_reservations, buffer_pool));
@@ -271,6 +275,9 @@ void BufferPoolMetric::CalculateValue() {
     case BufferPoolMetricType::FREE_BUFFER_BYTES:
       value_ = buffer_pool_->GetFreeBufferBytes();
       break;
+    case BufferPoolMetricType::CLEAN_PAGES_LIMIT:
+      value_ = buffer_pool_->GetCleanPageBytesLimit();
+      break;
     case BufferPoolMetricType::NUM_CLEAN_PAGES:
       value_ = buffer_pool_->GetNumCleanPages();
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 6f306f7..4f65933 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -201,6 +201,7 @@ class BufferPoolMetric : public UIntGauge {
   static BufferPoolMetric* RESERVED;
   static BufferPoolMetric* NUM_FREE_BUFFERS;
   static BufferPoolMetric* FREE_BUFFER_BYTES;
+  static BufferPoolMetric* CLEAN_PAGES_LIMIT;
   static BufferPoolMetric* NUM_CLEAN_PAGES;
   static BufferPoolMetric* CLEAN_PAGE_BYTES;
 
@@ -218,6 +219,7 @@ class BufferPoolMetric : public UIntGauge {
     RESERVED,
     NUM_FREE_BUFFERS, // Total number of free buffers in BufferPool.
     FREE_BUFFER_BYTES, // Total bytes of free buffers in BufferPool.
+    CLEAN_PAGES_LIMIT, // Limit on bytes of clean pages in BufferPool.
     NUM_CLEAN_PAGES, // Total number of clean pages in BufferPool.
     CLEAN_PAGE_BYTES, // Total bytes of clean pages in BufferPool.
   };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 2fbb6fc..6c4f07d 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1176,6 +1176,16 @@
     "key": "buffer-pool.free-buffer-bytes"
   },
   {
+    "description": "Limit on bytes of clean pages cached in the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Clean Pages Limit.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.clean-pages-limit"
+  },
+  {
     "description": "Total number of clean pages cached in the buffer pool.",
     "contexts": [
       "IMPALAD"