Posted to commits@impala.apache.org by jb...@apache.org on 2017/01/27 00:54:12 UTC

[4/6] incubator-impala git commit: IMPALA-3202: implement spill-to-disk in new buffer pool

IMPALA-3202: implement spill-to-disk in new buffer pool

See https://goo.gl/0zuy97 for a high-level summary of the design.

Unpinned pages can now be written to disk to free up memory. After
unpinning, pages enter a "dirty" state. Each client initiates
asynchronous writes of dirty pages to free up memory for allocating
more buffers. After a write completes, the page is "clean" and can
be evicted from memory by any client that needs the buffer. This
is implemented by moving pages between lists in ClientImpl (a new
internal class that stores the client's state) and BufferPool.
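
For illustration only, the lifecycle corresponds roughly to the state
machine below. The patch itself encodes these states implicitly via list
membership rather than an explicit enum, so the enum here is hypothetical:

  enum class PageState {
    PINNED,           // pin_count > 0, buffer in memory
    DIRTY_UNPINNED,   // unpinned, no write started yet
    WRITE_IN_FLIGHT,  // unpinned, write to a scratch file in progress
    CLEAN,            // write completed, buffer still in memory, evictable
    EVICTED           // buffer reclaimed; contents live only on disk
  };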

I/O:
----
The mechanics of I/O to scratch files are handled by the TmpFileMgr
mechanisms introduced in earlier IMPALA-3202 patches.

The decision to start a write I/O is based on two factors. First,
each client maintains the invariant that the total bytes of its dirty
pages do not exceed its unused reservation. This ensures that any
client's reservation can always be fulfilled by evicting clean pages.
Second, additional writes are initiated to proactively
write data to disk, so that clean pages are available when needed.
The buffer pool tries to keep enough writes in flight to keep all
disks busy.
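
As a rough sketch of the first factor (a hypothetical helper, not the
patch's actual API), the write requirement implied by the invariant is:

  #include <algorithm>
  #include <cstdint>

  // Minimum bytes of dirty-page writes that must complete before consuming
  // 'bytes_needed' of reservation, so that dirty bytes still fit within the
  // unused reservation afterwards.
  int64_t MinBytesToWrite(
      int64_t unused_reservation, int64_t dirty_unpinned_bytes, int64_t bytes_needed) {
    int64_t unused_after = unused_reservation - bytes_needed;
    return std::max<int64_t>(0, dirty_unpinned_bytes - unused_after);
  }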

The buffer pool avoids read I/O whenever possible: if an unpinned
page is pinned again and its buffer is still in memory, no I/O
is required to read back the data.
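
In outline, the repin path looks something like this. This is a sketch of
the control flow only, not the patch's exact code; ReadBackFromScratch is a
hypothetical stand-in for the TmpFileMgr read path used by
Client::MoveToPinned():

  Status MoveToPinned(Page* page) {
    if (page->buffer.is_open()) {
      // Buffer still in memory (dirty, write in flight, or clean): move the
      // page back to the pinned list and reuse the buffer. No read I/O needed.
      return Status::OK();
    }
    // Evicted: allocate a fresh buffer and read the data back from disk.
    return ReadBackFromScratch(page);
  }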

Locking:
--------
Concurrency is managed using client, page, and clean page list locks.
The design uses intrusive doubly-linked lists to track pages. This
patch adds a LockType parameter to InternalQueue and a FakeLock type
to support a non-concurrent InternalList type that is more convenient
for the buffer pool's locking scheme.
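
The FakeLock idea is a no-op type that satisfies the Lockable interface, so
the same queue template can be instantiated with or without real
synchronization. A minimal sketch (see be/src/util/fake-lock.h in the diff
for the committed version):

  // No-op lock usable with boost::lock_guard, so InternalQueue<T, FakeLock>
  // compiles into an unsynchronized InternalList<T>.
  class FakeLock {
   public:
    void lock() {}
    void unlock() {}
    bool try_lock() { return true; }
  };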

Testing:
--------
Added some basic unit tests to BufferPoolTest that exercise the new
code paths. More test coverage will be added later with system tests
and porting of tests from BufferedBlockMgrTest - see the draft
test plan for IMPALA-3200.

Change-Id: I8c6119a4557da853fefa02e17c49d8be0e9fbe01
Reviewed-on: http://gerrit.cloudera.org:8080/5584
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/253ea712
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/253ea712
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/253ea712

Branch: refs/heads/master
Commit: 253ea71281d259a62f75527c429096a74a5e5075
Parents: cca1eaa
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sat Aug 20 16:35:55 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 26 21:25:35 2017 +0000

----------------------------------------------------------------------
 be/src/common/compiler-util.h                   |   6 +
 .../runtime/bufferpool/buffer-pool-counters.h   |  14 +-
 .../runtime/bufferpool/buffer-pool-internal.h   | 268 ++++++++
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 264 ++++++--
 be/src/runtime/bufferpool/buffer-pool.cc        | 632 +++++++++++++------
 be/src/runtime/bufferpool/buffer-pool.h         | 173 ++---
 be/src/runtime/bufferpool/suballocator-test.cc  |  25 +-
 be/src/runtime/bufferpool/suballocator.cc       |   2 +-
 be/src/runtime/bufferpool/suballocator.h        |   5 +-
 be/src/runtime/disk-io-mgr.cc                   |   1 -
 be/src/runtime/tmp-file-mgr.cc                  |  13 +-
 be/src/runtime/tmp-file-mgr.h                   |   2 +
 be/src/testutil/death-test-util.h               |   2 +-
 be/src/util/aligned-new.h                       |   3 +-
 be/src/util/fake-lock.h                         |  36 ++
 be/src/util/internal-queue.h                    |  72 ++-
 16 files changed, 1132 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/common/compiler-util.h
----------------------------------------------------------------------
diff --git a/be/src/common/compiler-util.h b/be/src/common/compiler-util.h
index 80d1112..7c0379b 100644
--- a/be/src/common/compiler-util.h
+++ b/be/src/common/compiler-util.h
@@ -43,4 +43,10 @@
 /// decision, e.g. not inlining a small function on a hot path.
 #define ALWAYS_INLINE __attribute__((always_inline))
 
+namespace impala {
+
+/// The size of an L1 cache line in bytes on x86-64.
+constexpr int CACHE_LINE_SIZE = 64;
+
+}
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/buffer-pool-counters.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-counters.h b/be/src/runtime/bufferpool/buffer-pool-counters.h
index e4b1c36..183742f 100644
--- a/be/src/runtime/bufferpool/buffer-pool-counters.h
+++ b/be/src/runtime/bufferpool/buffer-pool-counters.h
@@ -31,10 +31,22 @@ struct BufferPoolClientCounters {
   /// Amount of time spent waiting for reads from disk to complete.
   RuntimeProfile::Counter* read_wait_time;
 
+  /// Total number of read I/O operations issued.
+  RuntimeProfile::Counter* read_io_ops;
+
+  /// Total bytes read from disk.
+  RuntimeProfile::Counter* bytes_read;
+
   /// Amount of time spent waiting for writes to disk to complete.
   RuntimeProfile::Counter* write_wait_time;
 
-  /// The peak total size of unpinned buffers.
+  /// Total number of write I/O operations issued.
+  RuntimeProfile::Counter* write_io_ops;
+
+  /// Total bytes written to disk.
+  RuntimeProfile::Counter* bytes_written;
+
+  /// The peak total size of unpinned pages.
   RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes;
 
   /// The total bytes of data unpinned. Every time a page's pin count goes from 1 to 0,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
new file mode 100644
index 0000000..abc8930
--- /dev/null
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file includes definitions of classes used internally in the buffer pool.
+//
+/// +========================+
+/// | IMPLEMENTATION NOTES   |
+/// +========================+
+///
+/// Lock Ordering
+/// =============
+/// The lock acquisition order is:
+/// 1. Client::lock_
+/// 2. BufferPool::clean_pages_lock_
+/// 3. Page::lock
+///
+/// If a reference to a Page is acquired through a page list, the Page* reference only
+/// remains valid so long as the list's lock is held.
+///
+/// Page States
+/// ===========
+/// Each Page object is owned by at most one InternalList<Page> at any given point.
+/// Each page is either pinned or unpinned. The unpinned state has a number of
+/// sub-states, determined by which list in Client/BufferPool contains the page.
+/// * Pinned: Always in this state when 'pin_count' > 0. The page is in
+///     Client::pinned_pages_.
+/// * Unpinned - Dirty: When no write has been started for an unpinned page. The page is
+///     in Client::dirty_unpinned_pages_.
+/// * Unpinned - Write in flight: When the write has been started but not completed for
+///     a dirty unpinned page. The page is in Client::write_in_flight_pages_. For
+///     accounting purposes this is considered a dirty page.
+/// * Unpinned - Clean: When the write has completed but the page was not evicted. The
+///     page is in BufferPool::clean_pages_.
+/// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is
+///     not in any list.
+///
+/// Page Eviction Policy
+/// ====================
+/// The page eviction policy is designed so that clients that run only in-memory (i.e.
+/// don't unpin pages) never block on I/O. To achieve this, we must be able to
+/// fulfil reservations by either allocating buffers or evicting clean pages. Assuming
+/// reservations are not overcommitted (they shouldn't be), this global invariant can be
+/// maintained by enforcing a local invariant for every client:
+///
+///   unused reservation >= dirty unpinned pages
+///
+/// The local invariant is maintained by writing pages to disk as the first step of any
+/// operation that uses reservation. I.e. the R.H.S. of the invariant must be decreased
+/// before the L.H.S. can be decreased. These operations block waiting for enough writes
+/// to complete to satisfy the invariant.
+/// TODO: this invariant can be broken if a client calls DecreaseReservation() on the
+/// ReservationTracker. We should refactor so that DecreaseReservation() goes through
+/// the client before closing IMPALA-3202.
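+///
+/// Worked example (illustrative numbers): a client with 100MB of reservation,
+/// 40MB of pinned pages and 30MB of dirty unpinned pages has 60MB of unused
+/// reservation, so the invariant holds (30MB <= 60MB). Pinning a further 50MB
+/// page would leave only 10MB unused, so at least 20MB of dirty-page writes
+/// must complete before the pin can proceed.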
+
+#ifndef IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H
+#define IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H
+
+#include <memory>
+#include <sstream>
+
+#include <boost/thread/mutex.hpp>
+
+#include "runtime/bufferpool/buffer-pool-counters.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "util/condition-variable.h"
+
+namespace impala {
+
+/// The internal state for the client.
+class BufferPool::Client {
+ public:
+  Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name,
+      RuntimeProfile* profile);
+
+  ~Client() {
+    DCHECK_EQ(0, num_pages_);
+    DCHECK_EQ(0, pinned_pages_.size());
+    DCHECK_EQ(0, dirty_unpinned_pages_.size());
+    DCHECK_EQ(0, in_flight_write_pages_.size());
+  }
+
+  /// Add a new pinned page 'page' to the pinned pages list. 'page' must not be in any
+  /// other lists. Neither the client's lock nor page->buffer_lock should be held by the
+  /// caller.
+  void AddNewPinnedPage(Page* page);
+
+  /// Reset 'handle', clean up references to handle->page and release any resources
+  /// associated with handle->page. If the page is pinned, 'out_buffer' can be passed in
+  /// and the page's buffer will be returned.
+  /// Neither the client's lock nor handle->page_->buffer_lock should be held by the
+  /// caller.
+  void DestroyPageInternal(PageHandle* handle, BufferHandle* out_buffer = NULL);
+
+  /// Updates client state to reflect that 'page' is now a dirty unpinned page. May
+  /// initiate writes for this or other dirty unpinned pages.
+  /// Neither the client's lock nor page->buffer_lock should be held by the caller.
+  void MoveToDirtyUnpinned(int64_t unused_reservation, Page* page);
+
+  /// Move an unpinned page to the pinned state, moving between data structures and
+  /// reading from disk if necessary. Returns once the page's buffer is allocated
+  /// and contains the page's data. Neither the client's lock nor
+  /// handle->page_->buffer_lock should be held by the caller.
+  Status MoveToPinned(ClientHandle* client, PageHandle* handle);
+
+  /// Must be called before allocating a buffer to ensure that the client can allocate
+  /// 'allocation_len' bytes without pinned bytes plus dirty unpinned bytes exceeding the
+  /// client's reservation. No page or client locks should be held by the caller.
+  Status CleanPagesBeforeAllocation(
+      ReservationTracker* reservation, int64_t allocation_len);
+
+  /// Same as CleanPagesBeforeAllocation(), except 'lock_' must be held by 'client_lock'.
+  /// 'client_lock' may be released temporarily while waiting for writes to complete.
+  Status CleanPagesBeforeAllocationLocked(boost::unique_lock<boost::mutex>* client_lock,
+      ReservationTracker* reservation, int64_t allocation_len);
+
+  /// Initiates asynchronous writes of dirty unpinned pages to disk. Ensures that at
+  /// least 'min_bytes_to_write' bytes of writes will be written asynchronously. May
+  /// start writes more aggressively so that I/O and compute can be overlapped. If
+  /// any errors are encountered, 'write_status_' is set. 'write_status_' must therefore
+  /// be checked before reading back any pages. 'lock_' must be held by the caller.
+  void WriteDirtyPagesAsync(int64_t min_bytes_to_write = 0);
+
+  /// Wait for the in-flight write for 'page' to complete.
+  /// 'lock_' must be held by the caller via 'client_lock'. page->buffer_lock should
+  /// not be held.
+  void WaitForWrite(boost::unique_lock<boost::mutex>* client_lock, Page* page);
+
+  /// Asserts that 'client_lock' is holding 'lock_'.
+  void DCheckHoldsLock(const boost::unique_lock<boost::mutex>& client_lock) {
+    DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock());
+  }
+
+  const BufferPoolClientCounters& counters() const { return counters_; }
+  bool spilling_enabled() const { return file_group_ != NULL; }
+
+  std::string DebugString();
+
+ private:
+  // Check consistency of the client's state; DCHECKs if inconsistent. 'lock_' must be held.
+  void DCheckConsistency() {
+    DCHECK_GE(in_flight_write_bytes_, 0);
+    DCHECK_LE(in_flight_write_bytes_, dirty_unpinned_bytes_);
+    DCHECK_LE(pinned_pages_.size() + dirty_unpinned_pages_.size()
+            + in_flight_write_pages_.size(),
+        num_pages_);
+    if (in_flight_write_pages_.empty()) DCHECK_EQ(0, in_flight_write_bytes_);
+    if (in_flight_write_pages_.empty() && dirty_unpinned_pages_.empty()) {
+      DCHECK_EQ(0, dirty_unpinned_bytes_);
+    }
+  }
+
+  /// Called when a write for 'page' completes.
+  void WriteCompleteCallback(Page* page, const Status& write_status);
+
+  /// Move an evicted page to the pinned state by allocating a new buffer, reading data
+  /// from disk and moving the page to 'pinned_pages_'. client->impl must be locked by
+  /// the caller via 'client_lock' and handle->page must be unlocked. 'client_lock' is
+  /// released then reacquired.
+  Status MoveEvictedToPinned(boost::unique_lock<boost::mutex>* client_lock,
+      ClientHandle* client, PageHandle* handle);
+
+  /// The buffer pool that owns the client.
+  BufferPool* const pool_;
+
+  /// The file group that should be used for allocating scratch space. If NULL, spilling
+  /// is disabled.
+  TmpFileMgr::FileGroup* const file_group_;
+
+  /// A name identifying the client.
+  const std::string name_;
+
+  /// The RuntimeProfile counters for this client, owned by the client's RuntimeProfile.
+  /// All non-NULL.
+  BufferPoolClientCounters counters_;
+
+  /// Lock to protect the below member variables.
+  boost::mutex lock_;
+
+  /// Condition variable signalled when a write for this client completes.
+  ConditionVariable write_complete_cv_;
+
+  /// All non-OK statuses returned by write operations are merged into this status.
+  /// All operations that depend on pages being written to disk successfully (e.g.
+  /// reading pages back from disk) must check 'write_status_' before proceeding, so
+  /// that write errors that occurred asynchronously are correctly propagated. The
+  /// write error is global to the client so can be propagated to any Status-returning
+  /// operation for the client (even for operations on different Pages or Buffers).
+  /// Write errors are not recoverable so it is best to propagate them as quickly
+  /// as possible, instead of waiting to propagate them in a specific way.
+  Status write_status_;
+
+  /// Total number of pages for this client. Used for debugging and enforcing that all
+  /// pages are destroyed before the client.
+  int64_t num_pages_;
+
+  /// All pinned pages for this client. Only used for debugging.
+  InternalList<Page> pinned_pages_;
+
+  /// Dirty unpinned pages for this client for which writes are not in flight. Page
+  /// writes are started in LIFO order, because operators typically have sequential access
+  /// patterns where the most recently evicted page will be last to be read.
+  InternalList<Page> dirty_unpinned_pages_;
+
+  /// Dirty unpinned pages for this client for which writes are in flight.
+  InternalList<Page> in_flight_write_pages_;
+
+  /// Total bytes of dirty unpinned pages for this client.
+  int64_t dirty_unpinned_bytes_;
+
+  /// Total bytes of in-flight writes for dirty unpinned pages. Bytes accounted here
+  /// are also accounted in 'dirty_unpinned_bytes_'.
+  int64_t in_flight_write_bytes_;
+};
+
+/// The internal representation of a page, which can be pinned or unpinned. See the
+/// class comment for explanation of the different page states.
+///
+/// Code manipulating the page is responsible for acquiring 'lock' when reading or
+/// modifying the page.
+struct BufferPool::Page : public InternalList<Page>::Node {
+  Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
+
+  std::string DebugString();
+
+  // Helper for BufferPool::DebugString().
+  static bool DebugStringCallback(std::stringstream* ss, BufferPool::Page* page);
+
+  /// The client that the page belongs to.
+  Client* const client;
+
+  /// The length of the page in bytes.
+  const int64_t len;
+
+  /// The pin count of the page. Only accessed in contexts that are passed the associated
+  /// PageHandle, so it cannot be accessed by multiple threads concurrently.
+  int pin_count;
+
+  /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
+  std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
+
+  /// Condition variable signalled when a write for this page completes. Protected by
+  /// client->lock_.
+  ConditionVariable write_complete_cv_;
+
+  /// This lock must be held when accessing 'buffer' if the page is unpinned and not
+  /// evicted (i.e. it is safe to access 'buffer' without the lock if the page is
+  /// pinned or evicted).
+  SpinLock buffer_lock;
+
+  /// Buffer with the page's contents. Closed iff the page is evicted, open otherwise.
+  BufferHandle buffer;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index af84bbe..b3dbfa9 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -15,35 +15,44 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstdlib>
+#include <string>
+#include <vector>
 #include <boost/bind.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/unordered_map.hpp>
-#include <cstdlib>
-#include <string>
-#include <vector>
 
-#include "runtime/bufferpool/buffer-pool.h"
-#include "runtime/bufferpool/reservation-tracker.h"
+#include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool-internal.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
+#include "util/metrics.h"
 
 #include "common/names.h"
 
+DECLARE_bool(disk_spill_encryption);
+
 namespace impala {
 
 class BufferPoolTest : public ::testing::Test {
  public:
-  virtual void SetUp() {}
+  virtual void SetUp() { test_env_ = obj_pool_.Add(new TestEnv); }
 
   virtual void TearDown() {
     for (auto entry : query_reservations_) {
       ReservationTracker* tracker = entry.second;
       tracker->Close();
     }
-
+    for (TmpFileMgr::FileGroup* file_group : file_groups_) {
+      file_group->Close();
+    }
     global_reservations_.Close();
     obj_pool_.Clear();
   }
@@ -57,7 +66,8 @@ class BufferPoolTest : public ::testing::Test {
       int64_t initial_query_reservation, int64_t query_reservation_limit);
 
   /// Create and destroy a page multiple times.
-  void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops);
+  void CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
+      ReservationTracker* parent_tracker, int num_ops);
 
  protected:
   static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
@@ -76,10 +86,29 @@ class BufferPoolTest : public ::testing::Test {
     return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
   }
 
-  ObjectPool obj_pool_;
+  /// Create a new file group with the default configs.
+  TmpFileMgr::FileGroup* NewFileGroup() {
+    TmpFileMgr::FileGroup* file_group =
+        obj_pool_.Add(new TmpFileMgr::FileGroup(test_env_->tmp_file_mgr(),
+            test_env_->exec_env()->disk_io_mgr(), NewProfile(), TUniqueId()));
+    file_groups_.push_back(file_group);
+    return file_group;
+  }
 
+  // Helper to check if the page is evicted.
+  bool IsEvicted(BufferPool::PageHandle* page) {
+    lock_guard<SpinLock> pl(page->page_->buffer_lock);
+    return !page->page_->buffer.is_open();
+  }
+
+  ObjectPool obj_pool_;
   ReservationTracker global_reservations_;
 
+  TestEnv* test_env_; // Owned by 'obj_pool_'.
+
+  // The file groups created - closed at end of each test.
+  vector<TmpFileMgr::FileGroup*> file_groups_;
+
   // Map from query_id to the reservation tracker for that query. Reads and modifications
   // of the map are protected by query_reservations_lock_.
   unordered_map<int64_t, ReservationTracker*> query_reservations_;
@@ -93,7 +122,7 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
   Status status;
 
   int clients_per_query = 32;
-  BufferPool::Client* clients[num_queries];
+  BufferPool::ClientHandle* clients[num_queries];
   ReservationTracker* client_reservations[num_queries];
 
   for (int i = 0; i < num_queries; ++i) {
@@ -110,7 +139,7 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
         NULL, &global_reservations_, NULL, query_reservation_limit);
     EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
 
-    clients[i] = new BufferPool::Client[clients_per_query];
+    clients[i] = new BufferPool::ClientHandle[clients_per_query];
     client_reservations[i] = new ReservationTracker[clients_per_query];
 
     for (int j = 0; j < clients_per_query; ++j) {
@@ -125,7 +154,7 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
           client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
       string name = Substitute("Client $0 for query $1", j, query_id);
       EXPECT_OK(pool->RegisterClient(
-          name, &client_reservations[i][j], NewProfile(), &clients[i][j]));
+          name, &client_reservations[i][j], NULL, NewProfile(), &clients[i][j]));
     }
 
     for (int j = 0; j < clients_per_query; ++j) {
@@ -163,9 +192,9 @@ TEST_F(BufferPoolTest, BasicRegistration) {
   RegisterQueriesAndClients(
       &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
 
-  DCHECK_EQ(global_reservations_.GetUsedReservation(), 0);
-  DCHECK_EQ(global_reservations_.GetChildReservations(), 0);
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetUsedReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetChildReservations(), 0);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
 
@@ -192,8 +221,8 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   workers.join_all();
 
   // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetUsedReservation(), 0);
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetUsedReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
 
@@ -208,8 +237,9 @@ TEST_F(BufferPoolTest, PageCreation) {
   ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
   client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
   ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+  BufferPool::ClientHandle client;
+  ASSERT_OK(
+      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client));
 
   vector<BufferPool::PageHandle> handles(num_pages);
 
@@ -226,7 +256,7 @@ TEST_F(BufferPoolTest, PageCreation) {
     ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
     ASSERT_EQ(handles[i].len(), page_len);
     ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
   }
 
   // Close the handles and check memory consumption.
@@ -234,14 +264,14 @@ TEST_F(BufferPoolTest, PageCreation) {
     int64_t used_before = client_tracker->GetUsedReservation();
     int page_len = handles[i].len();
     pool.DestroyPage(&client, &handles[i]);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
   }
 
   pool.DeregisterClient(&client);
   client_tracker->Close();
 
   // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
 
@@ -255,8 +285,9 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
   client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
   ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+  BufferPool::ClientHandle client;
+  ASSERT_OK(
+      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client));
 
   vector<BufferPool::BufferHandle> handles(num_buffers);
 
@@ -269,7 +300,7 @@ TEST_F(BufferPoolTest, BufferAllocation) {
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
     ASSERT_EQ(handles[i].len(), buffer_len);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
   }
 
   // Close the handles and check memory consumption.
@@ -277,14 +308,14 @@ TEST_F(BufferPoolTest, BufferAllocation) {
     int64_t used_before = client_tracker->GetUsedReservation();
     int buffer_len = handles[i].len();
     pool.FreeBuffer(&client, &handles[i]);
-    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
   }
 
   pool.DeregisterClient(&client);
   client_tracker->Close();
 
   // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
 
@@ -296,14 +327,14 @@ TEST_F(BufferPoolTest, BufferTransfer) {
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   ReservationTracker client_trackers[num_clients];
-  BufferPool::Client clients[num_clients];
+  BufferPool::ClientHandle clients[num_clients];
   BufferPool::BufferHandle handles[num_clients];
   for (int i = 0; i < num_clients; ++i) {
     client_trackers[i].InitChildTracker(
         NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
     ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
     ASSERT_OK(pool.RegisterClient(
-        "test client", &client_trackers[i], NewProfile(), &clients[i]));
+        "test client", &client_trackers[i], NULL, NewProfile(), &clients[i]));
   }
 
   // Transfer the page around between the clients repeatedly in a circle.
@@ -329,7 +360,7 @@ TEST_F(BufferPoolTest, BufferTransfer) {
     pool.DeregisterClient(&clients[i]);
     client_trackers[i].Close();
   }
-  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
 
@@ -344,8 +375,9 @@ TEST_F(BufferPoolTest, Pin) {
   client_tracker->InitChildTracker(
       NewProfile(), &global_reservations_, NULL, child_reservation);
   ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient(
+      "test client", client_tracker, NewFileGroup(), NewProfile(), &client));
 
   BufferPool::PageHandle handle1, handle2;
 
@@ -395,8 +427,9 @@ TEST_F(BufferPoolTest, PinWithoutReservation) {
   ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
   client_tracker->InitChildTracker(
       NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+  BufferPool::ClientHandle client;
+  ASSERT_OK(
+      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client));
 
   BufferPool::PageHandle handle;
   IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
@@ -423,8 +456,9 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   client_tracker->InitChildTracker(
       NewProfile(), &global_reservations_, NULL, child_reservation);
   ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
-  BufferPool::Client client;
-  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient(
+      "test client", client_tracker, NewFileGroup(), NewProfile(), &client));
 
   BufferPool::PageHandle page;
   BufferPool::BufferHandle buffer;
@@ -470,18 +504,21 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
 // Test concurrent creation and destruction of pages.
 TEST_F(BufferPoolTest, ConcurrentPageCreation) {
   int ops_per_thread = 1024;
   int num_threads = 64;
   // Need enough buffers for all initial reservations.
   int total_mem = num_threads * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
 
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  // Share a file group between the threads.
+  TmpFileMgr::FileGroup* file_group = NewFileGroup();
 
   // Launch threads, each with a different set of query IDs.
   thread_group workers;
   for (int i = 0; i < num_threads; ++i) {
     workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
-        &global_reservations_, ops_per_thread)));
+        file_group, &global_reservations_, ops_per_thread)));
   }
 
   // Build debug string to test concurrent iteration over pages_ list.
@@ -491,16 +528,17 @@ TEST_F(BufferPoolTest, ConcurrentPageCreation) {
   workers.join_all();
 
   // All the reservations should be released at this point.
-  DCHECK_EQ(global_reservations_.GetChildReservations(), 0);
+  ASSERT_EQ(global_reservations_.GetChildReservations(), 0);
   global_reservations_.Close();
 }
 
-void BufferPoolTest::CreatePageLoop(
-    BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
+void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
+    ReservationTracker* parent_tracker, int num_ops) {
   ReservationTracker client_tracker;
   client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);
-  BufferPool::Client client;
-  ASSERT_OK(pool->RegisterClient("test client", &client_tracker, NewProfile(), &client));
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool->RegisterClient(
+      "test client", &client_tracker, file_group, NewProfile(), &client));
   for (int i = 0; i < num_ops; ++i) {
     BufferPool::PageHandle handle;
     ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
@@ -514,41 +552,141 @@ void BufferPoolTest::CreatePageLoop(
   client_tracker.Close();
 }
 
-/// Test error path where pool is unable to fulfill a reservation because it cannot evict
-/// unpinned pages.
-TEST_F(BufferPoolTest, CapacityExhausted) {
-  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
-  // TODO: once we enable spilling, set up buffer pool so that spilling is disabled.
-  // Set up pool with one more buffer than reservations (so that we hit the reservation
-  // limit instead of the buffer limit).
-  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);
+/// Test that DCHECK fires when trying to unpin a page with spilling disabled.
+TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
+  global_reservations_.InitRootTracker(NULL, 2 * TEST_BUFFER_LEN);
+  BufferPool pool(TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
+  BufferPool::PageHandle handle;
 
-  BufferPool::PageHandle handle1, handle2, handle3;
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient(
+      "test client", &global_reservations_, NULL, NewProfile(), &client));
+  ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
 
-  BufferPool::Client client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", &global_reservations_, NewProfile(), &client));
+  ASSERT_OK(pool.Pin(&client, &handle));
+  // It's ok to Unpin() if the pin count remains positive.
+  pool.Unpin(&client, &handle);
+  // We didn't pass in a FileGroup, so spilling is disabled and we can't bring the
+  // pin count to 0.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.Unpin(&client, &handle), "");
+
+  pool.DestroyPage(&client, &handle);
+  pool.DeregisterClient(&client);
+}
+
+/// Test simple case where pool must evict a page from the same client to fit another.
+TEST_F(BufferPoolTest, EvictPageSameClient) {
+  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
+  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN);
+  BufferPool::PageHandle handle1, handle2;
+
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient(
+      "test client", &global_reservations_, NewFileGroup(), NewProfile(), &client));
   ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
 
   // Do not have enough reservations because we pinned the page.
   IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), "");
 
-  // Even with reservations, we can only create one more unpinned page because we don't
-  // support eviction yet.
+  // We should be able to create a new page after unpinning and evicting the first one.
   pool.Unpin(&client, &handle1);
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
-  pool.Unpin(&client, &handle2);
-  ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok());
 
-  // After destroying a page, we should have a free buffer that we can use.
   pool.DestroyPage(&client, &handle1);
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3));
+  pool.DestroyPage(&client, &handle2);
+  pool.DeregisterClient(&client);
+}
+
+/// Test simple case where pool must evict pages of different sizes.
+TEST_F(BufferPoolTest, EvictPageDifferentSizes) {
+  const int64_t TOTAL_BYTES = 2 * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool::PageHandle handle1, handle2;
+
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient(
+      "test client", &global_reservations_, NewFileGroup(), NewProfile(), &client));
+  ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  pool.Unpin(&client, &handle1);
 
+  // We must evict the small page to fit the large one.
+  ASSERT_OK(pool.CreatePage(&client, 2 * TEST_BUFFER_LEN, &handle2));
+  ASSERT_TRUE(IsEvicted(&handle1));
+
+  // We must evict the large page to fit the small one.
+  pool.Unpin(&client, &handle2);
+  ASSERT_OK(pool.Pin(&client, &handle1));
+  ASSERT_TRUE(IsEvicted(&handle2));
+
+  pool.DestroyPage(&client, &handle1);
   pool.DestroyPage(&client, &handle2);
-  pool.DestroyPage(&client, &handle3);
   pool.DeregisterClient(&client);
 }
+
+/// Test simple case where the pool must evict a page from one client to fit another one in
+/// memory.
+TEST_F(BufferPoolTest, EvictPageDifferentClient) {
+  const int NUM_CLIENTS = 2;
+  const int64_t TOTAL_BYTES = NUM_CLIENTS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+
+  ReservationTracker client_reservations[NUM_CLIENTS];
+  BufferPool::ClientHandle clients[NUM_CLIENTS];
+  for (int i = 0; i < NUM_CLIENTS; ++i) {
+    client_reservations[i].InitChildTracker(
+        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+    ASSERT_TRUE(client_reservations[i].IncreaseReservation(TEST_BUFFER_LEN));
+    ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i),
+        &client_reservations[i], NewFileGroup(), NewProfile(), &clients[i]));
+  }
+
+  // Create a pinned and unpinned page for the first client.
+  BufferPool::PageHandle handle1, handle2;
+  ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1));
+  const uint8_t TEST_VAL = 123;
+  memset(handle1.data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value.
+  pool.Unpin(&clients[0], &handle1);
+  ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle2));
+
+  // Allocating a buffer for the second client requires evicting the unpinned page.
+  BufferPool::BufferHandle buffer;
+  ASSERT_OK(pool.AllocateBuffer(&clients[1], TEST_BUFFER_LEN, &buffer));
+  ASSERT_TRUE(IsEvicted(&handle1));
+
+  // Test reading back the first page, which requires swapping buffers again.
+  pool.Unpin(&clients[0], &handle2);
+  ASSERT_OK(pool.Pin(&clients[0], &handle1));
+  ASSERT_TRUE(IsEvicted(&handle2));
+  for (int i = 0; i < handle1.len(); ++i) EXPECT_EQ(TEST_VAL, handle1.data()[i]) << i;
+
+  // Clean up everything.
+  pool.DestroyPage(&clients[0], &handle1);
+  pool.DestroyPage(&clients[0], &handle2);
+  pool.FreeBuffer(&clients[1], &buffer);
+  for (int i = 0; i < NUM_CLIENTS; ++i) {
+    pool.DeregisterClient(&clients[i]);
+    client_reservations[i].Close();
+  }
+}
 }
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  impala::LlvmCodeGen::InitializeLlvm();
+  int result = 0;
+  for (bool encryption : {false, true}) {
+    FLAGS_disk_spill_encryption = encryption;
+    std::cerr << "+==================================================" << std::endl
+              << "| Running tests with encryption=" << encryption << std::endl
+              << "+==================================================" << std::endl;
+    if (RUN_ALL_TESTS() != 0) result = 1;
+  }
+  return result;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/253ea712/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 1960bb0..863effc 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -15,80 +15,33 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/buffer-pool-internal.h"
 
-#include <boost/bind.hpp>
 #include <limits>
 #include <sstream>
+#include <boost/bind.hpp>
 
-#include "runtime/bufferpool/reservation-tracker.h"
 #include "common/names.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/bufferpool/buffer-allocator.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "util/bit-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
 
-namespace impala {
-
-/// The internal representation of a page, which can be pinned or unpinned. If the
-/// page is pinned, a buffer is associated with the page.
-///
-/// Code manipulating the page is responsible for acquiring 'lock' when reading or
-/// modifying the page.
-struct BufferPool::Page : public BufferPool::PageList::Node {
-  Page(int64_t len) : len(len), pin_count(0), dirty(false) {}
-
-  /// Increment the pin count. Caller must hold 'lock'.
-  void IncrementPinCount(PageHandle* handle) {
-    lock.DCheckLocked();
-    ++pin_count;
-    // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we
-    // have to assume they are dirty.
-    dirty = true;
-  }
-
-  /// Decrement the pin count. Caller must hold 'lock'.
-  void DecrementPinCount(PageHandle* handle) {
-    lock.DCheckLocked();
-    DCHECK(pin_count > 0);
-    --pin_count;
-  }
-
-  string DebugString() {
-    return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this,
-        len, pin_count, buffer.DebugString(), dirty);
-  }
-
-  // Helper for BufferPool::DebugString().
-  static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
-    lock_guard<SpinLock> pl(page->lock);
-    (*ss) << page->DebugString() << "\n";
-    return true;
-  }
-
-  /// The length of the page in bytes.
-  const int64_t len;
-
-  /// Lock to protect the below members of Page. The lock must be held when modifying any
-  /// of the below members and when reading any of the below members of an unpinned page.
-  SpinLock lock;
-
-  /// The pin count of the page.
-  int pin_count;
-
-  /// Buffer with the page's contents, Always open if pinned. Closed if page is unpinned
-  /// and was evicted from memory.
-  BufferHandle buffer;
+DEFINE_int32(concurrent_scratch_ios_per_device, 2,
+    "Set this to influence the number of concurrent write I/Os issues to write data to "
+    "scratch files. This is multiplied by the number of active scratch directories to "
+    "obtain the target number of scratch write I/Os per query.");
 
-  /// True if the buffer's contents need to be saved before evicting it from memory.
-  bool dirty;
-};
+namespace impala {
 
 BufferPool::BufferHandle::BufferHandle() {
   Reset();
 }
 
 BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
+  Reset();
   *this = std::move(src);
 }
 
@@ -102,7 +55,8 @@ BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src
   return *this;
 }
 
-void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) {
+void BufferPool::BufferHandle::Open(
+    const ClientHandle* client, uint8_t* data, int64_t len) {
   client_ = client;
   data_ = data;
   len_ = len;
@@ -119,6 +73,7 @@ BufferPool::PageHandle::PageHandle() {
 }
 
 BufferPool::PageHandle::PageHandle(PageHandle&& src) {
+  Reset();
   *this = std::move(src);
 }
 
@@ -131,9 +86,8 @@ BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
   return *this;
 }
 
-void BufferPool::PageHandle::Open(Page* page, Client* client) {
+void BufferPool::PageHandle::Open(Page* page, ClientHandle* client) {
   DCHECK(!is_open());
-  page->lock.DCheckLocked();
   page_ = page;
   client_ = client;
 }
@@ -152,8 +106,7 @@ int BufferPool::PageHandle::pin_count() const {
 
 int64_t BufferPool::PageHandle::len() const {
   DCHECK(is_open());
-  // The length of the page cannot change, so it is safe to access without locking.
-  return page_->len;
+  return page_->len; // Does not require locking.
 }
 
 const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
@@ -173,191 +126,136 @@ BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
 }
 
 BufferPool::~BufferPool() {
-  DCHECK(pages_.empty());
+  DCHECK_EQ(0, clean_pages_.size());
 }
 
-Status BufferPool::RegisterClient(
-    const string& name, ReservationTracker* reservation, RuntimeProfile* profile,
-    Client* client) {
+Status BufferPool::RegisterClient(const string& name, ReservationTracker* reservation,
+    TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile, ClientHandle* client) {
   DCHECK(!client->is_registered());
   DCHECK(reservation != NULL);
-  client->InitCounters(profile);
   client->reservation_ = reservation;
-  client->name_ = name;
+  client->impl_ = new Client(this, file_group, name, profile);
   return Status::OK();
 }
 
-void BufferPool::Client::InitCounters(RuntimeProfile* profile) {
-  counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
-  counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadWaitTime");
-  counters_.write_wait_time = ADD_TIMER(profile, "BufferPoolWriteWaitTime");
-  counters_.peak_unpinned_bytes =
-      profile->AddHighWaterMarkCounter("BufferPoolPeakUnpinnedBytes", TUnit::BYTES);
-  counters_.total_unpinned_bytes =
-      ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
-}
-
-void BufferPool::DeregisterClient(Client* client) {
+void BufferPool::DeregisterClient(ClientHandle* client) {
   if (!client->is_registered()) return;
-  client->reservation_->Close();
-  client->name_.clear();
+  client->reservation_->Close(); // Will DCHECK if any remaining buffers or pinned pages.
   client->reservation_ = NULL;
+  delete client->impl_; // Will DCHECK if there are any remaining pages.
+  client->impl_ = NULL;
 }
 
-Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) {
+Status BufferPool::CreatePage(ClientHandle* client, int64_t len, PageHandle* handle) {
   DCHECK(!handle->is_open());
   DCHECK_GE(len, min_buffer_len_);
   DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
 
   BufferHandle buffer;
   // No changes have been made to state yet, so we can cleanly return on error.
-  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer));
-
-  Page* page = new Page(len);
-  {
-    lock_guard<SpinLock> pl(page->lock);
-    page->buffer = std::move(buffer);
-    handle->Open(page, client);
-    page->IncrementPinCount(handle);
-  }
+  RETURN_IF_ERROR(AllocateBuffer(client, len, &buffer));
 
-  // Only add to globally-visible list after page is initialized. The page lock also
-  // needs to be released before enqueueing to respect the lock ordering.
-  pages_.Enqueue(page);
-
-  client->reservation_->AllocateFrom(len);
+  Page* page = new Page(client->impl_, len);
+  page->buffer = std::move(buffer);
+  handle->Open(page, client);
+  page->pin_count++;
+  client->impl_->AddNewPinnedPage(page);
   return Status::OK();
 }
 
-void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
+void BufferPool::DestroyPage(ClientHandle* client, PageHandle* handle) {
   if (!handle->is_open()) return; // DestroyPage() should be idempotent.
 
-  Page* page = handle->page_;
   if (handle->is_pinned()) {
     // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work
-    // of cleaning up the page and freeing the buffer.
+    // of cleaning up the page, freeing the buffer and updating reservations correctly.
     BufferHandle buffer;
     ExtractBuffer(client, handle, &buffer);
     FreeBuffer(client, &buffer);
-    return;
-  }
-
-  {
-    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state.
-    // In the unpinned case, no reservation is consumed, so just free the buffer.
-    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
-    if (page->buffer.is_open()) FreeBufferInternal(&page->buffer);
+  } else {
+    // In the unpinned case, no reservations are used so we just clean up the page.
+    client->impl_->DestroyPageInternal(handle);
   }
-  CleanUpPage(handle);
 }
 
-void BufferPool::CleanUpPage(PageHandle* handle) {
-  // Remove the destroyed page from data structures in a way that ensures no other
-  // threads have a remaining reference. Threads that access pages via the 'pages_'
-  // list hold 'pages_.lock_', so Remove() will not return until those threads are done
-  // and it is safe to delete page.
-  pages_.Remove(handle->page_);
-  delete handle->page_;
-  handle->Reset();
-}
-
-Status BufferPool::Pin(Client* client, PageHandle* handle) {
+Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) {
   DCHECK(client->is_registered());
   DCHECK(handle->is_open());
   DCHECK_EQ(handle->client_, client);
 
   Page* page = handle->page_;
-  {
-    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state.
-    if (page->pin_count == 0)  {
-      if (!page->buffer.is_open()) {
-        // No changes have been made to state yet, so we can cleanly return on error.
-        RETURN_IF_ERROR(AllocateBufferInternal(client, page->len, &page->buffer));
-
-        // TODO: will need to initiate/wait for read if the page is not in-memory.
-      }
-      COUNTER_ADD(client->counters_.peak_unpinned_bytes, -handle->len());
-    }
-    page->IncrementPinCount(handle);
+  if (page->pin_count == 0) {
+    RETURN_IF_ERROR(client->impl_->MoveToPinned(client, handle));
+    COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, -page->len);
   }
-
+  // Update accounting last to avoid complicating the error return path above.
+  ++page->pin_count;
   client->reservation_->AllocateFrom(page->len);
   return Status::OK();
 }
 
-void BufferPool::Unpin(Client* client, PageHandle* handle) {
+void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
   DCHECK(handle->is_open());
-  lock_guard<SpinLock> pl(handle->page_->lock);
-  UnpinLocked(client, handle);
-}
-
-void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
   DCHECK(client->is_registered());
   DCHECK_EQ(handle->client_, client);
   // If handle is pinned, we can assume that the page itself is pinned.
   DCHECK(handle->is_pinned());
   Page* page = handle->page_;
-  page->lock.DCheckLocked();
-
-  page->DecrementPinCount(handle);
   client->reservation_->ReleaseTo(page->len);
 
-  COUNTER_ADD(client->counters_.total_unpinned_bytes, handle->len());
-  COUNTER_ADD(client->counters_.peak_unpinned_bytes, handle->len());
-
-  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true.
+  if (--page->pin_count > 0) return;
+  client->impl_->MoveToDirtyUnpinned(client->reservation_->GetUnusedReservation(), page);
+  COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
+  COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
 }
 
 void BufferPool::ExtractBuffer(
-    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
+    ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
   DCHECK(page_handle->is_pinned());
-
+  DCHECK(!buffer_handle->is_open());
   DCHECK_EQ(page_handle->client_, client);
 
-  Page* page = page_handle->page_;
-  {
-    lock_guard<SpinLock> pl(page->lock); // Lock page while we work on its state.
-    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+  // Bring the pin count to 1 so that we're not using surplus reservations.
+  while (page_handle->pin_count() > 1) Unpin(client, page_handle);
 
-    // Bring the pin count to 1 so that we're not using surplus reservations.
-    while (page->pin_count > 1) UnpinLocked(client, page_handle);
-    *buffer_handle = std::move(page->buffer);
-  }
-  CleanUpPage(page_handle);
+  // Destroy the page and extract the buffer.
+  client->impl_->DestroyPageInternal(page_handle, buffer_handle);
+  DCHECK(buffer_handle->is_open());
 }
 
-Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) {
+Status BufferPool::AllocateBuffer(
+    ClientHandle* client, int64_t len, BufferHandle* handle) {
+  RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(client->reservation_, len));
   client->reservation_->AllocateFrom(len);
   return AllocateBufferInternal(client, len, handle);
 }
 
 Status BufferPool::AllocateBufferInternal(
-    Client* client, int64_t len, BufferHandle* buffer) {
+    ClientHandle* client, int64_t len, BufferHandle* buffer) {
   DCHECK(!buffer->is_open());
   DCHECK_GE(len, min_buffer_len_);
   DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
-  SCOPED_TIMER(client->counters_.get_buffer_time);
+  SCOPED_TIMER(client->impl_->counters().get_buffer_time);
 
   // If there is headroom in 'buffer_bytes_remaining_', we can just allocate a new buffer.
-  if (TryDecreaseBufferBytesRemaining(len)) {
-    uint8_t* data;
-    Status status = allocator_->Allocate(len, &data);
-    if (!status.ok()) {
-      buffer_bytes_remaining_.Add(len);
-      return status;
-    }
-    DCHECK(data != NULL);
-    buffer->Open(client, data, len);
-    return Status::OK();
+  int64_t delta = DecreaseBufferBytesRemaining(len);
+  if (delta < len) {
+    // We must evict some pages to free memory before allocating.
+    int64_t to_evict = len - delta;
+    RETURN_IF_ERROR(EvictCleanPages(to_evict));
   }
-
-  // If there is no remaining capacity, we must evict another page.
-  return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
-      Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is "
-                 "not implemented yet!", buffer_bytes_limit_));
+  uint8_t* data;
+  Status status = allocator_->Allocate(len, &data);
+  if (!status.ok()) {
+    buffer_bytes_remaining_.Add(len);
+    return status;
+  }
+  DCHECK(data != NULL);
+  buffer->Open(client, data, len);
+  return Status::OK();
 }
 
-void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
+void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
   client->reservation_->ReleaseTo(handle->len_);
@@ -371,8 +269,8 @@ void BufferPool::FreeBufferInternal(BufferHandle* handle) {
   handle->Reset();
 }
 
-Status BufferPool::TransferBuffer(
-    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) {
+Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
+    ClientHandle* dst_client, BufferHandle* dst) {
   DCHECK(src->is_open());
   DCHECK(!dst->is_open());
   DCHECK_EQ(src_client, src->client_);
@@ -386,43 +284,387 @@ Status BufferPool::TransferBuffer(
   return Status::OK();
 }
 
-bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
+int64_t BufferPool::DecreaseBufferBytesRemaining(int64_t max_decrease) {
   // TODO: we may want to change this policy so that we don't always use up to the limit
   // for buffers, since this may starve other operators using non-buffer-pool memory.
   while (true) {
     int64_t old_value = buffer_bytes_remaining_.Load();
-    if (old_value < len) return false;
-    int64_t new_value = old_value - len;
+    int64_t decrease = min(old_value, max_decrease);
+    int64_t new_value = old_value - decrease;
     if (buffer_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
-      return true;
+      return decrease;
+    }
+  }
+}
+
+void BufferPool::AddCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
+  page->client->DCheckHoldsLock(client_lock);
+  lock_guard<SpinLock> cpl(clean_pages_lock_);
+  clean_pages_.Enqueue(page);
+}
+
+bool BufferPool::RemoveCleanPage(const unique_lock<mutex>& client_lock, Page* page) {
+  page->client->DCheckHoldsLock(client_lock);
+  lock_guard<SpinLock> cpl(clean_pages_lock_);
+  bool found = clean_pages_.Contains(page);
+  if (found) clean_pages_.Remove(page);
+  return found;
+}
+
+Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) {
+  DCHECK_GE(bytes_to_evict, 0);
+  vector<BufferHandle> buffers;
+  int64_t bytes_found = 0;
+  {
+    lock_guard<SpinLock> cpl(clean_pages_lock_);
+    while (bytes_found < bytes_to_evict) {
+      Page* page = clean_pages_.Dequeue();
+      if (page == NULL) break;
+      lock_guard<SpinLock> pl(page->buffer_lock);
+      bytes_found += page->len;
+      buffers.emplace_back(move(page->buffer));
+    }
+  }
+
+  // Free buffers after releasing all the locks. Do this regardless of success to avoid
+  // leaking buffers.
+  for (BufferHandle& buffer : buffers) {
+    allocator_->Free(buffer.data(), buffer.len());
+    buffer.Reset();
+  }
+  if (bytes_found < bytes_to_evict) {
+    // The buffer pool should not be overcommitted, so this should only happen if there
+    // is an accounting error. Add any freed buffers back to 'buffer_bytes_remaining_'
+    // to restore consistency.
+    buffer_bytes_remaining_.Add(bytes_found);
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Tried to evict $0 bytes but only $1 bytes of clean pages:\n$2",
+                      bytes_to_evict, bytes_found, DebugString()));
+  }
+  // Update 'buffer_bytes_remaining_' with any excess.
+  if (bytes_found > bytes_to_evict) {
+    buffer_bytes_remaining_.Add(bytes_found - bytes_to_evict);
+  }
+  return Status::OK();
+}
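
EvictCleanPages illustrates a collect-under-lock, free-outside-lock discipline:
buffers are only detached from pages while 'clean_pages_lock_' and each page's
spinlock are held, but the expensive allocator_->Free() calls run with no locks
held. A generic sketch of the same discipline, assuming a hypothetical Resource
type and std primitives rather than the pool's own intrusive lists and
SpinLock:

    #include <cstddef>
    #include <mutex>
    #include <vector>

    struct Resource { int id; };

    std::mutex list_lock;
    std::vector<Resource*> reclaimable;

    void ReclaimSome(std::size_t n) {
      std::vector<Resource*> victims;
      {
        // Short critical section: only detach entries under the lock.
        std::lock_guard<std::mutex> l(list_lock);
        while (victims.size() < n && !reclaimable.empty()) {
          victims.push_back(reclaimable.back());
          reclaimable.pop_back();
        }
      }
      // Expensive cleanup happens after the lock is released.
      for (Resource* r : victims) delete r;
    }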
+
+BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
+    const string& name, RuntimeProfile* profile)
+  : pool_(pool),
+    file_group_(file_group),
+    name_(name),
+    num_pages_(0),
+    dirty_unpinned_bytes_(0),
+    in_flight_write_bytes_(0) {
+  counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
+  counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadIoWaitTime");
+  counters_.read_io_ops = ADD_COUNTER(profile, "BufferPoolReadIoOps", TUnit::UNIT);
+  counters_.bytes_read = ADD_COUNTER(profile, "BufferPoolReadIoBytes", TUnit::BYTES);
+  counters_.write_wait_time = ADD_TIMER(profile, "BufferPoolWriteIoWaitTime");
+  counters_.write_io_ops = ADD_COUNTER(profile, "BufferPoolWriteIoOps", TUnit::UNIT);
+  counters_.bytes_written = ADD_COUNTER(profile, "BufferPoolWriteIoBytes", TUnit::BYTES);
+  counters_.peak_unpinned_bytes =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakUnpinnedBytes", TUnit::BYTES);
+  counters_.total_unpinned_bytes =
+      ADD_COUNTER(profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
+}
+
+void BufferPool::Client::AddNewPinnedPage(Page* page) {
+  DCHECK_GT(page->pin_count, 0);
+  boost::lock_guard<boost::mutex> lock(lock_);
+  pinned_pages_.Enqueue(page);
+  ++num_pages_;
+}
+
+void BufferPool::Client::DestroyPageInternal(
+    PageHandle* handle, BufferHandle* out_buffer) {
+  DCHECK(handle->is_pinned() || out_buffer == NULL);
+  Page* page = handle->page_;
+  // Remove the page from the list that it is currently present in (if any).
+  {
+    unique_lock<mutex> cl(lock_);
+    if (pinned_pages_.Contains(page)) {
+      pinned_pages_.Remove(page);
+    } else if (dirty_unpinned_pages_.Contains(page)) {
+      dirty_unpinned_pages_.Remove(page);
+      dirty_unpinned_bytes_ -= page->len;
+    } else {
+      // The page either has a write in flight, is clean, or is evicted.
+      // If a write is in flight, let it complete first.
+      WaitForWrite(&cl, page);
+      // If clean, remove it from the clean pages list. If evicted, this is a no-op.
+      pool_->RemoveCleanPage(cl, page);
+    }
+    DCHECK(!page->in_queue());
+    --num_pages_;
+  }
+
+  if (page->write_handle != NULL) {
+    // Discard any on-disk data.
+    file_group_->DestroyWriteHandle(move(page->write_handle));
+  }
+  if (out_buffer != NULL) {
+    DCHECK(page->buffer.is_open());
+    *out_buffer = std::move(page->buffer);
+  } else if (page->buffer.is_open()) {
+    pool_->FreeBufferInternal(&page->buffer);
+  }
+  delete page;
+  handle->Reset();
+}
+
+void BufferPool::Client::MoveToDirtyUnpinned(int64_t unused_reservation, Page* page) {
+  // Only valid to unpin pages if spilling is enabled.
+  DCHECK(spilling_enabled());
+  DCHECK_EQ(0, page->pin_count);
+  unique_lock<mutex> lock(lock_);
+  DCHECK(pinned_pages_.Contains(page));
+  pinned_pages_.Remove(page);
+  dirty_unpinned_pages_.Enqueue(page);
+  dirty_unpinned_bytes_ += page->len;
+
+  // Check if we should initiate writes for this (or another) dirty page.
+  WriteDirtyPagesAsync();
+}
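
The lists touched here encode a per-page lifecycle. The enum below is an
illustrative summary only; the patch represents each state implicitly as
membership in one of these intrusive lists rather than as an explicit field:

    // Hypothetical state labels for a BufferPool::Page.
    enum class PageState {
      PINNED,           // in Client::pinned_pages_; buffer in memory, in use
      DIRTY_UNPINNED,   // in Client::dirty_unpinned_pages_; not yet on disk
      WRITE_IN_FLIGHT,  // in Client::in_flight_write_pages_; async write running
      CLEAN,            // in BufferPool::clean_pages_; on disk, buffer evictable
      EVICTED,          // in no list; buffer reclaimed, data only on disk
    };
    // Transitions: PINNED -> DIRTY_UNPINNED on unpin; DIRTY_UNPINNED ->
    // WRITE_IN_FLIGHT in WriteDirtyPagesAsync(); WRITE_IN_FLIGHT -> CLEAN in
    // WriteCompleteCallback(); CLEAN -> EVICTED in EvictCleanPages(); and any
    // unpinned state returns to PINNED via MoveToPinned().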
+
+Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle) {
+  Page* page = handle->page_;
+  unique_lock<mutex> cl(lock_);
+  // Propagate any write errors that occurred for this client.
+  RETURN_IF_ERROR(write_status_);
+
+  // Check if the page is evicted first. This is not necessary for correctness, since
+  // we re-check this later, but by doing it upfront we avoid grabbing the global
+  // 'clean_pages_lock_' in the common case.
+  bool evicted;
+  {
+    lock_guard<SpinLock> pl(page->buffer_lock);
+    evicted = !page->buffer.is_open();
+  }
+  if (evicted) return MoveEvictedToPinned(&cl, client, handle);
+
+  if (dirty_unpinned_pages_.Contains(page)) {
+    // No writes were initiated for the page - just move it back to the pinned state.
+    dirty_unpinned_pages_.Remove(page);
+    pinned_pages_.Enqueue(page);
+    dirty_unpinned_bytes_ -= page->len;
+    return Status::OK();
+  }
+  if (in_flight_write_pages_.Contains(page)) {
+    // A write is in flight. Wait for it to complete - then we only have to
+    // handle the clean and evicted cases.
+    WaitForWrite(&cl, page);
+    RETURN_IF_ERROR(write_status_); // The write may have set 'write_status_'.
+  }
+  if (pool_->RemoveCleanPage(cl, page)) {
+    // The clean page still has an associated buffer. Just clean up the write, restore
+    // the data, and move the page back to the pinned state.
+    pinned_pages_.Enqueue(page);
+    DCHECK(page->buffer.is_open());
+    DCHECK(page->write_handle != NULL);
+    // Don't need on-disk data.
+    cl.unlock(); // Don't block progress for other threads operating on other pages.
+    return file_group_->CancelWriteAndRestoreData(
+        move(page->write_handle), page->buffer.mem_range());
+  }
+  // If the page wasn't in the global clean pages list, it must have been evicted after
+  // the earlier 'evicted' check.
+  return MoveEvictedToPinned(&cl, client, handle);
+}
+
+Status BufferPool::Client::MoveEvictedToPinned(
+    unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) {
+  Page* page = handle->page_;
+  DCHECK(!page->buffer.is_open());
+  RETURN_IF_ERROR(
+      CleanPagesBeforeAllocationLocked(client_lock, client->reservation_, page->len));
+
+  // Don't hold any locks while allocating or reading back the data. It is safe to modify
+  // the page's buffer handle without holding any locks because no concurrent operations
+  // can modify evicted pages.
+  client_lock->unlock();
+  RETURN_IF_ERROR(pool_->AllocateBufferInternal(client, page->len, &page->buffer));
+  COUNTER_ADD(counters().bytes_read, page->len);
+  COUNTER_ADD(counters().read_io_ops, 1);
+  {
+    SCOPED_TIMER(counters().read_wait_time);
+    RETURN_IF_ERROR(
+        file_group_->Read(page->write_handle.get(), page->buffer.mem_range()));
+  }
+  file_group_->DestroyWriteHandle(move(page->write_handle));
+  client_lock->lock();
+  pinned_pages_.Enqueue(page);
+  return Status::OK();
+}
+
+Status BufferPool::Client::CleanPagesBeforeAllocation(
+    ReservationTracker* reservation, int64_t allocation_len) {
+  unique_lock<mutex> lock(lock_);
+  return CleanPagesBeforeAllocationLocked(&lock, reservation, allocation_len);
+}
+
+Status BufferPool::Client::CleanPagesBeforeAllocationLocked(
+    unique_lock<mutex>* client_lock, ReservationTracker* reservation,
+    int64_t allocation_len) {
+  DCheckHoldsLock(*client_lock);
+  int64_t unused_reservation = reservation->GetUnusedReservation();
+  DCHECK_LE(allocation_len, unused_reservation);
+  int64_t unused_reservation_after_alloc = unused_reservation - allocation_len;
+  // Start enough writes to ensure that the loop condition below will eventually become
+  // false (or a write error will be encountered).
+  int64_t min_in_flight_bytes = dirty_unpinned_bytes_ - unused_reservation_after_alloc;
+  WriteDirtyPagesAsync(max<int64_t>(0, min_in_flight_bytes - in_flight_write_bytes_));
+
+  // One of the writes we initiated, or an earlier in-flight write may have hit an error.
+  RETURN_IF_ERROR(write_status_);
+
+  // Wait until enough writes have finished that the allocation plus dirty pages won't
+  // exceed our reservation, i.e. so that other clients can immediately get the
+  // allocated memory they're entitled to without waiting for this client's writes.
+  DCHECK_GE(in_flight_write_bytes_, min_in_flight_bytes);
+  while (dirty_unpinned_bytes_ > unused_reservation_after_alloc) {
+    SCOPED_TIMER(counters().write_wait_time);
+    write_complete_cv_.Wait(*client_lock);
+    RETURN_IF_ERROR(write_status_); // Check if error occurred while waiting.
+  }
+  return Status::OK();
+}
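
The invariant maintained here is that dirty unpinned bytes never exceed the
unused reservation remaining after the allocation. A worked example with
illustrative numbers: with 96MB of unused reservation, 80MB of dirty unpinned
pages, and a 32MB allocation, unused_reservation_after_alloc is 64MB and
min_in_flight_bytes is 80MB - 64MB = 16MB, so at least 16MB of writes are
started and the caller blocks on 'write_complete_cv_' until
dirty_unpinned_bytes_ drops to 64MB or below (or a write error surfaces via
'write_status_').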
+
+void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
+  DCHECK_GE(min_bytes_to_write, 0);
+  DCheckConsistency();
+  if (file_group_ == NULL) {
+    // Spilling disabled - there should be no unpinned pages to write.
+    DCHECK_EQ(0, min_bytes_to_write);
+    DCHECK_EQ(0, dirty_unpinned_bytes_);
+    return;
+  }
+  // No point in starting writes if an error occurred because future operations for the
+  // client will fail regardless.
+  if (!write_status_.ok()) return;
+
+  const int64_t writeable_bytes = dirty_unpinned_bytes_ - in_flight_write_bytes_;
+  DCHECK_LE(min_bytes_to_write, writeable_bytes);
+  // Compute the ideal number of writes to start. We use a simple heuristic based on
+  // the total number of writes. The FileGroup's allocation should spread the writes
+  // across disks somewhat, but doesn't guarantee we're fully using all available
+  // disks. In the future we could track the # of writes per disk.
+  const int64_t target_writes = FLAGS_concurrent_scratch_ios_per_device
+      * file_group_->tmp_file_mgr()->NumActiveTmpDevices();
+
+  int64_t bytes_written = 0;
+  while (bytes_written < writeable_bytes
+      && (bytes_written < min_bytes_to_write
+             || in_flight_write_pages_.size() < target_writes)) {
+    Page* page = dirty_unpinned_pages_.tail(); // LIFO.
+    DCHECK(page != NULL) << "Should have been enough dirty unpinned pages";
+    {
+      lock_guard<SpinLock> pl(page->buffer_lock);
+      DCHECK(file_group_ != NULL);
+      DCHECK(page->buffer.is_open());
+      COUNTER_ADD(counters().bytes_written, page->len);
+      COUNTER_ADD(counters().write_io_ops, 1);
+      Status status = file_group_->Write(page->buffer.mem_range(),
+          [this, page](const Status& write_status) {
+            WriteCompleteCallback(page, write_status);
+          },
+          &page->write_handle);
+      // Exit early on error: there is no point in starting more writes because future
+      // operations for this client will fail regardless.
+      if (!status.ok()) {
+        write_status_.MergeStatus(status);
+        return;
+      }
     }
+    // Now that the write is in flight, update the client's accounting and lists.
+    Page* tmp = dirty_unpinned_pages_.PopBack();
+    DCHECK_EQ(tmp, page);
+    in_flight_write_pages_.Enqueue(page);
+    bytes_written += page->len;
+    in_flight_write_bytes_ += page->len;
   }
 }
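
For a sense of scale of the heuristic (the values are illustrative, not
defaults asserted by this patch): with FLAGS_concurrent_scratch_ios_per_device
set to 2 and four active scratch devices, target_writes is 8, so the loop keeps
issuing writes until eight are in flight, and it always writes at least
'min_bytes_to_write' even if that exceeds the target.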
 
-string BufferPool::Client::DebugString() const {
+void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_status) {
+  {
+    unique_lock<mutex> cl(lock_);
+    DCHECK(in_flight_write_pages_.Contains(page));
+    // The status should always be propagated.
+    // TODO: if we add cancellation support to TmpFileMgr, consider cancellation path.
+    if (!write_status.ok()) write_status_.MergeStatus(write_status);
+    in_flight_write_pages_.Remove(page);
+    // Move to clean pages list even if an error was encountered - the buffer can be
+    // repurposed by other clients and 'write_status_' must be checked by this client
+    // before reading back the bad data.
+    pool_->AddCleanPage(cl, page);
+    dirty_unpinned_bytes_ -= page->len;
+    in_flight_write_bytes_ -= page->len;
+    WriteDirtyPagesAsync(); // Start another asynchronous write if needed.
+  }
+  write_complete_cv_.NotifyAll();
+  page->write_complete_cv_.NotifyAll();
+}
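
Note that both condition variables are notified only after 'lock_' has been
released. A minimal standalone sketch of that notify-outside-the-lock pattern,
using plain std::condition_variable in place of the ConditionVariable wrapper
the patch uses:

    #include <condition_variable>
    #include <mutex>

    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    void Complete() {
      {
        std::lock_guard<std::mutex> l(m);
        done = true;  // The state change happens under the lock...
      }
      cv.notify_all();  // ...but the wakeup is sent after releasing it, so a
                        // woken waiter can reacquire 'm' without blocking.
    }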
+
+void BufferPool::Client::WaitForWrite(unique_lock<mutex>* client_lock, Page* page) {
+  DCheckHoldsLock(*client_lock);
+  while (in_flight_write_pages_.Contains(page)) {
+    SCOPED_TIMER(counters().write_wait_time);
+    page->write_complete_cv_.Wait(*client_lock);
+  }
+}
+
+string BufferPool::Client::DebugString() {
+  lock_guard<mutex> lock(lock_);
+  stringstream ss;
+  ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 num_pages: $3 "
+                   "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5",
+      this, name_, write_status_.GetDetail(), num_pages_, dirty_unpinned_bytes_,
+      in_flight_write_bytes_);
+  ss << "\n  " << pinned_pages_.size() << " pinned pages: ";
+  pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+  ss << "\n  " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";
+  dirty_unpinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+  ss << "\n  " << in_flight_write_pages_.size() << " in flight write pages: ";
+  in_flight_write_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+  return ss.str();
+}
+
+string BufferPool::ClientHandle::DebugString() const {
   if (is_registered()) {
-    return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_,
-        reservation_->DebugString());
+    return Substitute("<BufferPool::Client> $0 reservation: {$1} "
+                      "internal state: {$2}",
+        this, reservation_->DebugString(), impl_->DebugString());
   } else {
-    return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
+    return Substitute("<BufferPool::ClientHandle> $0 UNREGISTERED", this);
   }
 }
 
 string BufferPool::PageHandle::DebugString() const {
   if (is_open()) {
-    lock_guard<SpinLock> pl(page_->lock);
-    return Substitute(
-        "<BufferPool::PageHandle> $0 client: {$1} page: {$2}",
-        this, client_->DebugString(), page_->DebugString());
+    lock_guard<SpinLock> pl(page_->buffer_lock);
+    return Substitute("<BufferPool::PageHandle> $0 client: $1/$2 page: {$3}", this,
+        client_, client_->impl_, page_->DebugString());
   } else {
     return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
   }
 }
 
+string BufferPool::Page::DebugString() {
+  return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3", this, len,
+      pin_count, buffer.DebugString());
+}
+
+bool BufferPool::Page::DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
+  lock_guard<SpinLock> pl(page->buffer_lock);
+  (*ss) << page->DebugString() << "\n";
+  return true;
+}
+
 string BufferPool::BufferHandle::DebugString() const {
   if (is_open()) {
-    return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this,
-        client_->DebugString(), data_, len_);
+    return Substitute("<BufferPool::BufferHandle> $0 client: $1/$2 data: $3 len: $4",
+        this, client_, client_->impl_, data_, len_);
   } else {
     return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
   }
@@ -432,8 +674,12 @@ string BufferPool::DebugString() {
   stringstream ss;
   ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
      << " buffer_bytes_limit: " << buffer_bytes_limit_
-     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
-  pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n"
+     << "  Clean pages: ";
+  {
+    lock_guard<SpinLock> cpl(clean_pages_lock_);
+    clean_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+  }
   return ss.str();
 }
 }