You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 22:51:32 UTC

[05/15] impala git commit: IMPALA-4835: Part 3: switch I/O buffers to buffer pool

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index ba1ad92..3fd33de 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -19,6 +19,8 @@
 
 #include "runtime/io/disk-io-mgr-stress.h"
 
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/request-context.h"
 #include "util/time.h"
 
@@ -27,21 +29,20 @@
 using namespace impala;
 using namespace impala::io;
 
-static const float ABORT_CHANCE = .10f;
-static const int MIN_READ_LEN = 1;
-static const int MAX_READ_LEN = 20;
+constexpr float DiskIoMgrStress::ABORT_CHANCE;
+const int DiskIoMgrStress::MIN_READ_LEN;
+const int DiskIoMgrStress::MAX_READ_LEN;
 
-static const int MIN_FILE_LEN = 10;
-static const int MAX_FILE_LEN = 1024;
+const int DiskIoMgrStress::MIN_FILE_LEN;
+const int DiskIoMgrStress::MAX_FILE_LEN;
 
 // Make sure this is between MIN/MAX FILE_LEN to test more cases
-static const int MIN_READ_BUFFER_SIZE = 64;
-static const int MAX_READ_BUFFER_SIZE = 128;
+const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE;
+const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE;
 
-// Maximum bytes to allocate per scan range.
-static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE;
 
-static const int CANCEL_READER_PERIOD_MS = 20;  // in ms
+const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS;
 
 static void CreateTempFile(const char* filename, const char* data) {
   FILE* file = fopen(filename, "w");
@@ -50,7 +51,7 @@ static void CreateTempFile(const char* filename, const char* data) {
   fclose(file);
 }
 
-string GenerateRandomData() {
+string DiskIoMgrStress::GenerateRandomData() {
   int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
   stringstream ss;
   for (int i = 0; i < rand_len; ++i) {
@@ -62,6 +63,8 @@ string GenerateRandomData() {
 
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
+  /// Pool for objects that is cleared when the client is (re-)initialized in NewClient().
+  ObjectPool obj_pool;
   unique_ptr<RequestContext> reader;
   int file_idx;
   vector<ScanRange*> scan_ranges;
@@ -95,6 +98,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   clients_ = new Client[num_clients_];
   client_mem_trackers_.resize(num_clients_);
+  buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
   for (int i = 0; i < num_clients_; ++i) {
     NewClient(i);
   }
@@ -119,8 +123,8 @@ void DiskIoMgrStress::ClientThread(int client_id) {
       CHECK(status.ok() || status.IsCancelled());
       if (range == NULL) break;
       if (needs_buffers) {
-        status = io_mgr_->AllocateBuffersForRange(
-            client->reader.get(), range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
+        status = io_mgr_->AllocateBuffersForRange(client->reader.get(),
+            &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
         CHECK(status.ok()) << status.GetDetail();
       }
 
@@ -212,7 +216,13 @@ void DiskIoMgrStress::Run(int sec) {
   }
   readers_.join_all();
 
-  for (unique_ptr<MemTracker>& mem_tracker : client_mem_trackers_) mem_tracker->Close();
+  for (int i = 0; i < num_clients_; ++i) {
+    if (clients_[i].reader != nullptr) {
+      io_mgr_->UnregisterContext(clients_[i].reader.get());
+    }
+    ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+    client_mem_trackers_[i]->Close();
+  }
   mem_tracker_.Close();
 }
 
@@ -234,26 +244,41 @@ void DiskIoMgrStress::NewClient(int i) {
     }
   }
 
-  for (int i = 0; i < client.scan_ranges.size(); ++i) {
-    delete client.scan_ranges[i];
-  }
+  // Clean up leftover state from the previous client (if any).
   client.scan_ranges.clear();
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+  exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+  if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
+  client.obj_pool.Clear();
 
   int assigned_len = 0;
   while (assigned_len < file_len) {
     int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
     range_len = min(range_len, file_len - assigned_len);
 
-    ScanRange* range = new ScanRange();
+    ScanRange* range = client.obj_pool.Add(new ScanRange);
     range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
         0, false, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
 
-  if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
-  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
-  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
-  Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+  string client_name = Substitute("Client $0", i);
+  client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_));
+  Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr,
+      exec_env->buffer_reservation(), client_mem_trackers_[i].get(),
+      numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name),
+      &buffer_pool_clients_[i]);
+  CHECK(status.ok());
+  // Reserve enough memory for 3 buffers per range, which should be enough to guarantee
+  // progress.
+  CHECK(buffer_pool_clients_[i].IncreaseReservationToFit(
+      MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size()))
+      << buffer_pool_clients_[i].DebugString() << "\n"
+      << exec_env->buffer_pool()->DebugString() << "\n"
+      << exec_env->buffer_reservation()->DebugString();
+
+  client.reader = io_mgr_->RegisterContext();
+  status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
   CHECK(status.ok());
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index b872694..574b58c 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -22,8 +22,11 @@
 #include <memory>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/thread.hpp>
 
+#include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
@@ -43,15 +46,29 @@ class DiskIoMgrStress {
   /// Run the test for 'sec'.  If 0, run forever
   void Run(int sec);
 
+  static constexpr float ABORT_CHANCE = .10f;
+  static const int MIN_READ_LEN = 1;
+  static const int MAX_READ_LEN = 20;
+
+  static const int MIN_FILE_LEN = 10;
+  static const int MAX_FILE_LEN = 1024;
+
+  // Make sure this is between MIN/MAX FILE_LEN to test more cases
+  static const int MIN_READ_BUFFER_SIZE = 64;
+  static const int MAX_READ_BUFFER_SIZE = 128;
+
+  // Maximum bytes to allocate per scan range.
+  static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+
+  static const int CANCEL_READER_PERIOD_MS = 20;
  private:
   struct Client;
 
   struct File {
     std::string filename;
-    std::string data;  // the data in the file, used to validate
+    std::string data; // the data in the file, used to validate
   };
 
-
   /// Files used for testing.  These are created at startup and recycled
   /// during the test
   std::vector<File> files_;
@@ -72,6 +89,9 @@ class DiskIoMgrStress {
   /// Client MemTrackers, one per client.
   std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
 
+  /// Buffer pool clients, one per client.
+  std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
+
   /// If true, tests cancelling readers
   bool includes_cancellation_;
 
@@ -88,6 +108,8 @@ class DiskIoMgrStress {
 
   /// Possibly cancels a random reader.
   void CancelRandomReader();
+
+  static std::string GenerateRandomData();
 };
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 95ea184..3a0f727 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,14 +22,16 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
-#include "runtime/mem-tracker.h"
 #include "runtime/test-env.h"
 #include "runtime/thread-resource-mgr.h"
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
@@ -37,13 +39,20 @@
 
 #include "common/names.h"
 
+using std::mt19937;
+using std::uniform_int_distribution;
+using std::uniform_real_distribution;
+
+DECLARE_int64(min_buffer_size);
 DECLARE_int32(num_remote_hdfs_io_threads);
 DECLARE_int32(num_s3_io_threads);
 DECLARE_int32(num_adls_io_threads);
 
 const int MIN_BUFFER_SIZE = 128;
 const int MAX_BUFFER_SIZE = 1024;
-const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
+const int64_t LARGE_RESERVATION_LIMIT = 4L * 1024L * 1024L * 1024L;
+const int64_t LARGE_INITIAL_RESERVATION = 128L * 1024L * 1024L;
+const int64_t BUFFER_POOL_CAPACITY = LARGE_RESERVATION_LIMIT;
 
 namespace impala {
 namespace io {
@@ -53,15 +62,39 @@ class DiskIoMgrTest : public testing::Test {
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv);
+    // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+    test_env_->SetBufferPoolArgs(1, BUFFER_POOL_CAPACITY);
     ASSERT_OK(test_env_->Init());
+    RandTestUtil::SeedRng("DISK_IO_MGR_TEST_SEED", &rng_);
   }
 
   virtual void TearDown() {
+    root_reservation_.Close();
     pool_.Clear();
+    test_env_.reset();
+  }
+
+  /// Initialises 'root_reservation_'. The reservation is automatically closed in
+  /// TearDown().
+  void InitRootReservation(int64_t reservation_limit) {
+    root_reservation_.InitRootTracker(
+        RuntimeProfile::Create(&pool_, "root"), reservation_limit);
+  }
+
+  /// Initialise 'client' with the given reservation limit. The client reservation is a
+  /// child of 'root_reservation_'.
+  void RegisterBufferPoolClient(int64_t reservation_limit, int64_t initial_reservation,
+      BufferPool::ClientHandle* client) {
+    ASSERT_OK(buffer_pool()->RegisterClient("", nullptr, &root_reservation_, nullptr,
+        reservation_limit, RuntimeProfile::Create(&pool_, ""), client));
+    if (initial_reservation > 0) {
+      ASSERT_TRUE(client->IncreaseReservation(initial_reservation));
+    }
   }
+
   void WriteValidateCallback(int num_writes, WriteRange** written_range,
-      DiskIoMgr* io_mgr, RequestContext* reader, int32_t* data,
-      Status expected_status, const Status& status) {
+      DiskIoMgr* io_mgr, RequestContext* reader, BufferPool::ClientHandle* client,
+      int32_t* data, Status expected_status, const Status& status) {
     if (expected_status.code() == TErrorCode::CANCELLED) {
       EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
     } else {
@@ -71,8 +104,8 @@ class DiskIoMgrTest : public testing::Test {
       ScanRange* scan_range = pool_.Add(new ScanRange());
       scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
           (*written_range)->offset(), 0, false, BufferOpts::Uncached());
-      ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
-          sizeof(int32_t));
+      ValidateSyncRead(io_mgr, reader, client, scan_range,
+          reinterpret_cast<const char*>(data), sizeof(int32_t));
     }
 
     {
@@ -93,9 +126,13 @@ class DiskIoMgrTest : public testing::Test {
 
  protected:
   void CreateTempFile(const char* filename, const char* data) {
+    CreateTempFile(filename, data, strlen(data));
+  }
+
+  void CreateTempFile(const char* filename, const char* data, int64_t data_bytes) {
     FILE* file = fopen(filename, "w");
     EXPECT_TRUE(file != nullptr);
-    fwrite(data, 1, strlen(data), file);
+    fwrite(data, 1, data_bytes, file);
     fclose(file);
   }
 
@@ -120,13 +157,14 @@ class DiskIoMgrTest : public testing::Test {
   }
 
   static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
-      ScanRange* range, const char* expected, int expected_len = -1) {
+      BufferPool::ClientHandle* client, ScanRange* range, const char* expected,
+      int expected_len = -1) {
     unique_ptr<BufferDescriptor> buffer;
     bool needs_buffers;
     ASSERT_OK(io_mgr->StartScanRange(reader, range, &needs_buffers));
     if (needs_buffers) {
       ASSERT_OK(io_mgr->AllocateBuffersForRange(
-          reader, range, io_mgr->max_buffer_size()));
+          reader, client, range, io_mgr->max_buffer_size()));
     }
     ASSERT_OK(range->GetNext(&buffer));
     ASSERT_TRUE(buffer != nullptr);
@@ -161,8 +199,8 @@ class DiskIoMgrTest : public testing::Test {
   // Continues pulling scan ranges from the io mgr until they are all done.
   // Updates num_ranges_processed with the number of ranges seen by this thread.
   static void ScanRangeThread(DiskIoMgr* io_mgr, RequestContext* reader,
-      const char* expected_result, int expected_len, const Status& expected_status,
-      int max_ranges, AtomicInt32* num_ranges_processed) {
+      BufferPool::ClientHandle* client, const char* expected_result, int expected_len,
+      const Status& expected_status, int max_ranges, AtomicInt32* num_ranges_processed) {
     int num_ranges = 0;
     while (max_ranges == 0 || num_ranges < max_ranges) {
       ScanRange* range;
@@ -172,7 +210,7 @@ class DiskIoMgrTest : public testing::Test {
       if (range == nullptr) break;
       if (needs_buffers) {
         ASSERT_OK(io_mgr->AllocateBuffersForRange(
-            reader, range, io_mgr->max_buffer_size() * 3));
+            reader, client, range, io_mgr->max_buffer_size() * 3));
       }
       ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status);
       num_ranges_processed->Add(1);
@@ -180,23 +218,27 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
-  ScanRange* AllocateRange() {
-    return pool_.Add(new ScanRange);
-  }
-
-  ScanRange* InitRange(const char* file_path, int offset, int len,
+  ScanRange* InitRange(ObjectPool* pool, const char* file_path, int offset, int len,
       int disk_id, int64_t mtime, void* meta_data = nullptr, bool is_cached = false) {
-    ScanRange* range = AllocateRange();
+    ScanRange* range = pool->Add(new ScanRange);
     range->Reset(nullptr, file_path, len, offset, disk_id, true,
         BufferOpts(is_cached, mtime), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
 
+  /// Convenience function to get a reference to the buffer pool.
+  BufferPool* buffer_pool() const { return ExecEnv::GetInstance()->buffer_pool(); }
+
   boost::scoped_ptr<TestEnv> test_env_;
 
+  /// Per-test random number generator. Seeded before every test.
+  mt19937 rng_;
+
   ObjectPool pool_;
 
+  ReservationTracker root_reservation_;
+
   mutex written_mutex_;
   ConditionVariable writes_done_;
   int num_ranges_written_;
@@ -207,7 +249,7 @@ class DiskIoMgrTest : public testing::Test {
 // by reading the data back via a separate IoMgr instance. All writes are expected to
 // complete successfully.
 TEST_F(DiskIoMgrTest, SingleWriter) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
   string tmp_file = "/tmp/disk_io_mgr_test.txt";
   int num_ranges = 100;
@@ -221,24 +263,27 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   }
 
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
-  MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
+  BufferPool::ClientHandle read_client;
+  RegisterBufferPoolClient(
+      LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
   ASSERT_OK(read_io_mgr->Init());
-  unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker);
+  unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext();
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      // Pool for temporary objects from this iteration only.
+      ObjectPool tmp_pool;
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init());
-      unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+      unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
       for (int i = 0; i < num_ranges; ++i) {
-        int32_t* data = pool_.Add(new int32_t);
+        int32_t* data = tmp_pool.Add(new int32_t);
         *data = rand();
-        WriteRange** new_range = pool_.Add(new WriteRange*);
-        WriteRange::WriteDoneCallback callback =
-            bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
-                new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1);
-        *new_range = pool_.Add(new WriteRange(
-            tmp_file, cur_offset, num_ranges % num_disks, callback));
+        WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+        WriteRange::WriteDoneCallback callback = bind(
+            mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, new_range,
+            read_io_mgr.get(), reader.get(), &read_client, data, Status::OK(), _1);
+        *new_range = tmp_pool.Add(
+            new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback));
         (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
         EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range));
         cur_offset += sizeof(int32_t);
@@ -254,27 +299,26 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   }
 
   read_io_mgr->UnregisterContext(reader.get());
-  read_io_mgr.reset();
+  buffer_pool()->DeregisterClient(&read_client);
 }
 
 // Perform invalid writes (e.g. file in non-existent directory, negative offset) and
 // validate that an error status is returned via the write callback.
 TEST_F(DiskIoMgrTest, InvalidWrite) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
   string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init());
-  unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr);
+  unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
   int32_t* data = pool_.Add(new int32_t);
   *data = rand();
 
   // Write to file in non-existent directory.
   WriteRange** new_range = pool_.Add(new WriteRange*);
   WriteRange::WriteDoneCallback callback =
-      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
-          (DiskIoMgr*)nullptr, (RequestContext*)nullptr, data,
-          Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, nullptr,
+          nullptr, nullptr, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
   *new_range = pool_.Add(new WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -289,9 +333,9 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   }
 
   new_range = pool_.Add(new WriteRange*);
-  callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-      new_range, (DiskIoMgr*)nullptr, (RequestContext*)nullptr,
-      data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+  callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
+      nullptr, nullptr, nullptr, data,
+      Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
 
   *new_range = pool_.Add(new WriteRange(tmp_file, -1, 0, callback));
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -309,7 +353,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
 // AddWriteRange() is expected to succeed before the cancel and fail after it.
 // The writes themselves may finish with status cancelled or ok.
 TEST_F(DiskIoMgrTest, SingleWriterCancel) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   num_ranges_written_ = 0;
   string tmp_file = "/tmp/disk_io_mgr_test.txt";
   int num_ranges = 100;
@@ -324,29 +368,33 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   }
 
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
-  MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
+  BufferPool::ClientHandle read_client;
+  RegisterBufferPoolClient(
+      LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
   ASSERT_OK(read_io_mgr->Init());
-  unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker);
+  unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext();
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      // Pool for temporary objects from this iteration only.
+      ObjectPool tmp_pool;
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init());
-      unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
+      unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
         if (i == num_ranges_before_cancel) {
           writer->Cancel();
           validate_status = Status::CANCELLED;
         }
-        int32_t* data = pool_.Add(new int32_t);
+        int32_t* data = tmp_pool.Add(new int32_t);
         *data = rand();
-        WriteRange** new_range = pool_.Add(new WriteRange*);
-        WriteRange::WriteDoneCallback callback = bind(
-            mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel,
-            new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1);
-        *new_range = pool_.Add(new WriteRange(
-            tmp_file, cur_offset, num_ranges % num_disks, callback));
+        WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+        WriteRange::WriteDoneCallback callback =
+            bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
+                num_ranges_before_cancel, new_range, read_io_mgr.get(), reader.get(),
+                &read_client, data, Status::CANCELLED, _1);
+        *new_range = tmp_pool.Add(
+            new WriteRange(tmp_file, cur_offset, num_ranges % num_disks, callback));
         (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
         cur_offset += sizeof(int32_t);
         Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range);
@@ -363,13 +411,13 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   }
 
   read_io_mgr->UnregisterContext(reader.get());
-  read_io_mgr.reset();
+  buffer_pool()->DeregisterClient(&read_client);
 }
 
 // Basic test with a single reader, testing multiple threads, disks and a different
 // number of buffers.
 TEST_F(DiskIoMgrTest, SingleReader) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "abcdefghijklm";
   int len = strlen(data);
@@ -383,7 +431,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
-        ObjectPool pool;
+        ObjectPool tmp_pool;
         LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                   << " num_disk=" << num_disks
                   << " num_read_threads=" << num_read_threads;
@@ -392,36 +440,39 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
         ASSERT_OK(io_mgr.Init());
-        MemTracker reader_mem_tracker;
-        unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+        BufferPool::ClientHandle read_client;
+        RegisterBufferPoolClient(
+            LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+        unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
         vector<ScanRange*> ranges;
         for (int i = 0; i < len; ++i) {
           int disk_id = i % num_disks;
-          ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+          ranges.push_back(InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
         }
         ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
         AtomicInt32 num_ranges_processed;
         thread_group threads;
         for (int i = 0; i < num_read_threads; ++i) {
-          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len,
-              Status::OK(), 0, &num_ranges_processed));
+          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+              &read_client, data, len, Status::OK(), 0, &num_ranges_processed));
         }
         threads.join_all();
 
         EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
         io_mgr.UnregisterContext(reader.get());
-        EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+        EXPECT_EQ(read_client.GetUsedReservation(), 0);
+        buffer_pool()->DeregisterClient(&read_client);
       }
     }
   }
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 // This test issues adding additional scan ranges while there are some still in flight.
 TEST_F(DiskIoMgrTest, AddScanRangeTest) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "abcdefghijklm";
   int len = strlen(data);
@@ -434,7 +485,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      // Pool for temporary objects from this iteration only.
+      ObjectPool tmp_pool;
       LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                 << " num_disk=" << num_disks;
 
@@ -442,8 +494,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
       ASSERT_OK(io_mgr.Init());
-      MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+      BufferPool::ClientHandle read_client;
+      RegisterBufferPoolClient(
+          LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+      unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
       vector<ScanRange*> ranges_first_half;
       vector<ScanRange*> ranges_second_half;
@@ -451,10 +505,10 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
         int disk_id = i % num_disks;
         if (i > len / 2) {
           ranges_second_half.push_back(
-              InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+              InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime));
         } else {
           ranges_first_half.push_back(
-              InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+              InitRange(&tmp_pool, tmp_file, i, 1, disk_id, stat_val.st_mtime));
         }
       }
       AtomicInt32 num_ranges_processed;
@@ -463,8 +517,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
       ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half));
 
       // Read a couple of them
-      ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2,
-          &num_ranges_processed);
+      ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data),
+          Status::OK(), 2, &num_ranges_processed);
 
       // Issue second half
       ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half));
@@ -472,24 +526,26 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
       // Start up some threads and then cancel
       thread_group threads;
       for (int i = 0; i < 3; ++i) {
-        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
-            strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+        threads.add_thread(
+            new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client, data,
+                strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
       }
 
       threads.join_all();
       EXPECT_EQ(num_ranges_processed.Load(), len);
       io_mgr.UnregisterContext(reader.get());
-      EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+      EXPECT_EQ(read_client.GetUsedReservation(), 0);
+      buffer_pool()->DeregisterClient(&read_client);
     }
   }
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 // Test to make sure that sync reads and async reads work together
 // Note: this test is constructed so the number of buffers is greater than the
 // number of scan ranges.
 TEST_F(DiskIoMgrTest, SyncReadTest) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "abcdefghijklm";
   int len = strlen(data);
@@ -502,7 +558,8 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      // Pool for temporary objects from this iteration only.
+      ObjectPool tmp_pool;
       LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                 << " num_disk=" << num_disks;
 
@@ -511,50 +568,55 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
           MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
       ASSERT_OK(io_mgr.Init());
-      MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+      BufferPool::ClientHandle read_client;
+      RegisterBufferPoolClient(
+          LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+      unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+
       ScanRange* complete_range =
-          InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
+          InitRange(&tmp_pool, tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
 
       // Issue some reads before the async ones are issued
-      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
-      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
       vector<ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
         int disk_id = i % num_disks;
-        ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+        ranges.push_back(
+            InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
       }
       ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
       AtomicInt32 num_ranges_processed;
       thread_group threads;
       for (int i = 0; i < 5; ++i) {
-        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
-            strlen(data), Status::OK(), 0, &num_ranges_processed));
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+            &read_client, data, strlen(data), Status::OK(), 0, &num_ranges_processed));
       }
 
       // Issue some more sync ranges
       for (int i = 0; i < 5; ++i) {
         sched_yield();
-        ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+        ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
       }
 
       threads.join_all();
 
-      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
-      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
 
       EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
       io_mgr.UnregisterContext(reader.get());
-      EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+      EXPECT_EQ(read_client.GetUsedReservation(), 0);
+      buffer_pool()->DeregisterClient(&read_client);
     }
   }
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 // Tests a single reader cancelling half way through scan ranges.
 TEST_F(DiskIoMgrTest, SingleReaderCancel) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "abcdefghijklm";
   int len = strlen(data);
@@ -567,7 +629,8 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      // Pool for temporary objects from this iteration only.
+      ObjectPool tmp_pool;
       LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                 << " num_disk=" << num_disks;
 
@@ -575,13 +638,16 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
       ASSERT_OK(io_mgr.Init());
-      MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+      BufferPool::ClientHandle read_client;
+      RegisterBufferPoolClient(
+          LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+      unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
       vector<ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
         int disk_id = i % num_disks;
-        ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+        ranges.push_back(
+            InitRange(&tmp_pool, tmp_file, 0, len, disk_id, stat_val.st_mtime));
       }
       ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
@@ -589,16 +655,17 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
       int num_succesful_ranges = ranges.size() / 2;
       // Read half the ranges
       for (int i = 0; i < num_succesful_ranges; ++i) {
-        ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1,
-            &num_ranges_processed);
+        ScanRangeThread(&io_mgr, reader.get(), &read_client, data, strlen(data),
+            Status::OK(), 1, &num_ranges_processed);
       }
       EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
 
       // Start up some threads and then cancel
       thread_group threads;
       for (int i = 0; i < 3; ++i) {
-        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
-            strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(),
+            &read_client, data, strlen(data), Status::CANCELLED, 0,
+            &num_ranges_processed));
       }
 
       reader->Cancel();
@@ -607,87 +674,93 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
       threads.join_all();
       EXPECT_TRUE(reader->IsCancelled());
       io_mgr.UnregisterContext(reader.get());
-      EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+      EXPECT_EQ(read_client.GetUsedReservation(), 0);
+      buffer_pool()->DeregisterClient(&read_client);
     }
   }
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
-// Test when the reader goes over the mem limit
-TEST_F(DiskIoMgrTest, MemLimits) {
+// Test readers running with different amounts of memory and getting blocked on scan
+// ranges that have run out of buffers.
+TEST_F(DiskIoMgrTest, MemScarcity) {
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
-  const char* data = "abcdefghijklm";
-  int len = strlen(data);
-  CreateTempFile(tmp_file, data);
+  // File is 2.5 max-sized buffers so that a range given the largest allocation
+  // below (3 max-sized buffers) can scan the whole file without returning buffers.
+  const int64_t DATA_BYTES = MAX_BUFFER_SIZE * 5 / 2;
+  char data[DATA_BYTES];
+  for (int i = 0; i < DATA_BYTES; ++i) {
+    data[i] = uniform_int_distribution<uint8_t>(0, 255)(rng_);
+  }
+  CreateTempFile(tmp_file, data, DATA_BYTES);
 
   // Get mtime for file
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  const int mem_limit_num_buffers = 2;
-  // Allocate enough ranges so that the total buffers exceeds the mem limit.
+  const int RESERVATION_LIMIT_NUM_BUFFERS = 20;
+  const int64_t RESERVATION_LIMIT = RESERVATION_LIMIT_NUM_BUFFERS * MAX_BUFFER_SIZE;
+  InitRootReservation(RESERVATION_LIMIT);
+
+  thread_group threads;
+  // Allocate enough ranges so that the total buffers exceeds the limit.
   const int num_ranges = 25;
   {
-    MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
     DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init());
-    MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
-    unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+    BufferPool::ClientHandle read_client;
+    RegisterBufferPoolClient(RESERVATION_LIMIT, RESERVATION_LIMIT, &read_client);
+    unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
     vector<ScanRange*> ranges;
     for (int i = 0; i < num_ranges; ++i) {
-      ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+      ranges.push_back(InitRange(&pool_, tmp_file, 0, DATA_BYTES, 0, stat_val.st_mtime));
     }
     ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
-
-    // Don't return buffers to force memory pressure
-    vector<pair<ScanRange*, unique_ptr<BufferDescriptor>>> buffers;
-
-    AtomicInt32 num_ranges_processed;
-    ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(),
-        1, &num_ranges_processed);
-
-    bool hit_mem_limit_exceeded = false;
-    char result[strlen(data) + 1];
-    // Keep starting new ranges without returning buffers. This forces us to go over
-    // the limit eventually.
-    while (true) {
-      memset(result, 0, strlen(data) + 1);
+    // Keep starting new ranges without returning buffers until we run out of
+    // reservation.
+    while (read_client.GetUnusedReservation() >= MIN_BUFFER_SIZE) {
       ScanRange* range = nullptr;
       bool needs_buffers;
-      Status status = io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers);
-      ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
-      hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
+      ASSERT_OK(io_mgr.GetNextUnstartedRange(reader.get(), &range, &needs_buffers));
       if (range == nullptr) break;
-      DCHECK(needs_buffers);
-      status = io_mgr.AllocateBuffersForRange(reader.get(), range, MAX_BUFFER_SIZE * 3);
-      ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
-      if (status.IsMemLimitExceeded()) {
-        hit_mem_limit_exceeded = true;
-        continue;
-      }
-
-      while (true) {
-        unique_ptr<BufferDescriptor> buffer;
-        Status status = range->GetNext(&buffer);
-        ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
-        hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
-        if (buffer == nullptr) break;
-        memcpy(result + range->offset() + buffer->scan_range_offset(),
-            buffer->buffer(), buffer->len());
-        buffers.emplace_back(range, move(buffer));
-      }
-      ValidateEmptyOrCorrect(data, result, strlen(data));
-    }
-
-    for (int i = 0; i < buffers.size(); ++i) {
-      buffers[i].first->ReturnBuffer(move(buffers[i].second));
+      ASSERT_TRUE(needs_buffers);
+      // Pick a random amount of memory to reserve.
+      int64_t max_bytes_to_alloc = uniform_int_distribution<int64_t>(MIN_BUFFER_SIZE,
+          min<int64_t>(read_client.GetUnusedReservation(), MAX_BUFFER_SIZE * 3))(rng_);
+      ASSERT_OK(io_mgr.AllocateBuffersForRange(
+          reader.get(), &read_client, range, max_bytes_to_alloc));
+      // Start a thread fetching from the range. The thread will either finish the
+      // range or be cancelled.
+      threads.add_thread(new thread([&data, DATA_BYTES, range] {
+        // Don't return buffers to force memory pressure.
+        vector<unique_ptr<BufferDescriptor>> buffers;
+        int64_t data_offset = 0;
+        Status status;
+        while (true) {
+          unique_ptr<BufferDescriptor> buffer;
+          status = range->GetNext(&buffer);
+          ASSERT_TRUE(status.ok() || status.IsCancelled()) << status.GetDetail();
+          if (status.IsCancelled() || buffer == nullptr) break;
+          EXPECT_EQ(0, memcmp(data + data_offset, buffer->buffer(), buffer->len()));
+          data_offset += buffer->len();
+          buffers.emplace_back(move(buffer));
+        }
+        if (status.ok()) ASSERT_EQ(DATA_BYTES, data_offset);
+        for (auto& buffer : buffers) range->ReturnBuffer(move(buffer));
+      }));
+      // Let the thread start running before starting the next.
+      SleepForMs(10);
     }
-
-    EXPECT_TRUE(hit_mem_limit_exceeded) << "Should have run out of memory";
+    // Let the threads run for a bit then cancel everything.
+    SleepForMs(500);
+    reader->Cancel();
+    // Wait until the threads have returned their buffers before unregistering.
+    threads.join_all();
     io_mgr.UnregisterContext(reader.get());
-    EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+    EXPECT_EQ(read_client.GetUsedReservation(), 0);
+    buffer_pool()->DeregisterClient(&read_client);
   }
 }
 
@@ -696,7 +769,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
 // only tests the fallback mechanism.
 // TODO: we can fake the cached read path without HDFS
 TEST_F(DiskIoMgrTest, CachedReads) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "abcdefghijklm";
   int len = strlen(data);
@@ -711,51 +784,54 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init());
-    MemTracker reader_mem_tracker;
-    unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
+    BufferPool::ClientHandle read_client;
+    RegisterBufferPoolClient(
+        LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+    unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
     ScanRange* complete_range =
-        InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
+        InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true);
 
     // Issue some reads before the async ones are issued
-    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
-    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
 
     vector<ScanRange*> ranges;
     for (int i = 0; i < len; ++i) {
       int disk_id = i % num_disks;
       ranges.push_back(
-          InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
+          InitRange(&pool_, tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true));
     }
     ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
     AtomicInt32 num_ranges_processed;
     thread_group threads;
     for (int i = 0; i < 5; ++i) {
-      threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data,
-          strlen(data), Status::OK(), 0, &num_ranges_processed));
+      threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), &read_client,
+          data, strlen(data), Status::OK(), 0, &num_ranges_processed));
     }
 
     // Issue some more sync ranges
     for (int i = 0; i < 5; ++i) {
       sched_yield();
-      ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+      ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
     }
 
     threads.join_all();
 
-    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
-    ValidateSyncRead(&io_mgr, reader.get(), complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
+    ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, data);
 
     EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
     io_mgr.UnregisterContext(reader.get());
-    EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+    EXPECT_EQ(read_client.GetUsedReservation(), 0);
+    buffer_pool()->DeregisterClient(&read_client);
   }
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const int ITERATIONS = 1;
   const char* data = "abcdefghijklmnopqrstuvwxyz";
   const int num_contexts = 5;
@@ -777,17 +853,22 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
 
   int64_t iters = 0;
   vector<unique_ptr<RequestContext>> contexts(num_contexts);
+  unique_ptr<BufferPool::ClientHandle[]> clients(
+      new BufferPool::ClientHandle[num_contexts]);
   Status status;
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
+        // Pool for temporary objects from this iteration only.
+        ObjectPool tmp_pool;
         DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
             MAX_BUFFER_SIZE);
         ASSERT_OK(io_mgr.Init());
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
-          contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
+          RegisterBufferPoolClient(
+              LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[file_index]);
+          contexts[file_index] = io_mgr.RegisterContext();
         }
-        pool_.Clear();
         int read_offset = 0;
         int write_offset = 0;
         while (read_offset < file_size) {
@@ -799,11 +880,11 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
             int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
             for (int i = 0; i < num_scan_ranges; ++i) {
               ranges.push_back(InitRange(
-                  file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
-              threads.add_thread(
-                  new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(),
-                      reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
-                      1, Status::OK(), num_scan_ranges, &num_ranges_processed));
+                  &tmp_pool, file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime));
+              threads.add_thread(new thread(ScanRangeThread, &io_mgr,
+                  contexts[context_index].get(), &clients[context_index],
+                  reinterpret_cast<const char*>(data + (read_offset % strlen(data))),
+                  1, Status::OK(), num_scan_ranges, &num_ranges_processed));
               ++read_offset;
             }
 
@@ -813,7 +894,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
               WriteRange::WriteDoneCallback callback =
                   bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
                       this, num_write_ranges, _1);
-              WriteRange* new_range = pool_.Add(new WriteRange(
+              WriteRange* new_range = tmp_pool.Add(new WriteRange(
                   file_name, write_offset, i % num_disks, callback));
               new_range->SetData(
                   reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))),
@@ -832,6 +913,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
         } // while (read_offset < file_size)
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           io_mgr.UnregisterContext(contexts[file_index].get());
+          buffer_pool()->DeregisterClient(&clients[file_index]);
         }
       } // for (int num_disks
     } // for (int threads_per_disk
@@ -840,23 +922,19 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
 
 // This test will test multiple concurrent reads each reading a different file.
 TEST_F(DiskIoMgrTest, MultipleReader) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const int NUM_READERS = 5;
   const int DATA_LEN = 50;
   const int ITERATIONS = 25;
   const int NUM_THREADS_PER_READER = 3;
 
-  vector<string> file_names;
-  vector<int64_t> mtimes;
-  vector<string> data;
-  vector<unique_ptr<RequestContext>> readers;
-  vector<char*> results;
-
-  file_names.resize(NUM_READERS);
-  readers.resize(NUM_READERS);
-  mtimes.resize(NUM_READERS);
-  data.resize(NUM_READERS);
-  results.resize(NUM_READERS);
+  vector<string> file_names(NUM_READERS);
+  vector<int64_t> mtimes(NUM_READERS);
+  vector<string> data(NUM_READERS);
+  unique_ptr<BufferPool::ClientHandle[]> clients(
+      new BufferPool::ClientHandle[NUM_READERS]);
+  vector<unique_ptr<RequestContext>> readers(NUM_READERS);
+  vector<char*> results(NUM_READERS);
 
   // Initialize data for each reader.  The data will be
   // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
@@ -887,7 +965,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-        pool_.Clear(); // Destroy scan ranges from previous iterations.
+        // Pool for temporary objects from this iteration only.
+        ObjectPool tmp_pool;
         LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
                   << " num_disk=" << num_disks;
         if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
@@ -897,12 +976,14 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
         ASSERT_OK(io_mgr.Init());
 
         for (int i = 0; i < NUM_READERS; ++i) {
-          readers[i] = io_mgr.RegisterContext(&mem_tracker);
+          RegisterBufferPoolClient(
+              LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &clients[i]);
+          readers[i] = io_mgr.RegisterContext();
 
           vector<ScanRange*> ranges;
           for (int j = 0; j < DATA_LEN; ++j) {
             int disk_id = j % num_disks;
-            ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
+            ranges.push_back(InitRange(&tmp_pool, file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
           }
           ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges));
         }
@@ -912,18 +993,20 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
         for (int i = 0; i < NUM_READERS; ++i) {
           for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
             threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(),
-                data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed));
+                &clients[i], data[i].c_str(), data[i].size(), Status::OK(), 0,
+                &num_ranges_processed));
           }
         }
         threads.join_all();
         EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
         for (int i = 0; i < NUM_READERS; ++i) {
           io_mgr.UnregisterContext(readers[i].get());
+          buffer_pool()->DeregisterClient(&clients[i]);
         }
       }
     }
   }
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 // Stress test for multiple clients with cancellation
@@ -937,7 +1020,7 @@ TEST_F(DiskIoMgrTest, StressTest) {
 
 // IMPALA-2366: handle partial read where range goes past end of file.
 TEST_F(DiskIoMgrTest, PartialRead) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "the quick brown fox jumped over the lazy dog";
   int len = strlen(data);
@@ -956,19 +1039,22 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   for (int64_t max_buffer_size : max_buffer_sizes) {
     DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, max_buffer_size);
-
     ASSERT_OK(io_mgr.Init());
-    MemTracker reader_mem_tracker;
     unique_ptr<RequestContext> reader;
-    reader = io_mgr.RegisterContext(&reader_mem_tracker);
+    reader = io_mgr.RegisterContext();
+
+    BufferPool::ClientHandle read_client;
+    RegisterBufferPoolClient(
+        LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
 
     // We should not read past the end of file.
-    ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
+    ScanRange* range = InitRange(&pool_, tmp_file, 0, read_len, 0, stat_val.st_mtime);
     unique_ptr<BufferDescriptor> buffer;
     bool needs_buffers;
     ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers));
     if (needs_buffers) {
-      ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), range, 3 * max_buffer_size));
+      ASSERT_OK(io_mgr.AllocateBuffersForRange(
+          reader.get(), &read_client, range, 3 * max_buffer_size));
     }
 
     int64_t bytes_read = 0;
@@ -994,15 +1080,13 @@ TEST_F(DiskIoMgrTest, PartialRead) {
     } while (!eosr);
 
     io_mgr.UnregisterContext(reader.get());
-    EXPECT_EQ(reader_mem_tracker.consumption(), 0);
-    EXPECT_EQ(mem_tracker.consumption(), 0);
-    pool_.Clear();
+    EXPECT_EQ(read_client.GetUsedReservation(), 0);
+    buffer_pool()->DeregisterClient(&read_client);
   }
 }
 
 // Test zero-length scan range.
 TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "the quick brown fox jumped over the lazy dog";
   const int64_t MIN_BUFFER_SIZE = 2;
@@ -1016,12 +1100,9 @@ TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
   DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
   ASSERT_OK(io_mgr.Init());
-  MemTracker reader_mem_tracker;
-  unique_ptr<RequestContext> reader;
-  reader = io_mgr.RegisterContext(&reader_mem_tracker);
+  unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
-  // We should not read past the end of file.
-  ScanRange* range = InitRange(tmp_file, 0, 0, 0, stat_val.st_mtime);
+  ScanRange* range = InitRange(&pool_, tmp_file, 0, 0, 0, stat_val.st_mtime);
   bool needs_buffers;
   Status status = io_mgr.StartScanRange(reader.get(), range, &needs_buffers);
   ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
@@ -1035,7 +1116,6 @@ TEST_F(DiskIoMgrTest, ZeroLengthScanRange) {
 // Test what happens if don't call AllocateBuffersForRange() after trying to start a
 // range.
 TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "the quick brown fox jumped over the lazy dog";
   int len = strlen(data);
@@ -1051,13 +1131,12 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
 
   ASSERT_OK(io_mgr.Init());
   MemTracker reader_mem_tracker;
-  unique_ptr<RequestContext> reader;
-  reader = io_mgr.RegisterContext(&reader_mem_tracker);
+  unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
 
   // We should not read past the end of file.
   vector<ScanRange*> ranges;
   for (int i = 0; i < 4; ++i) {
-    ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
+    ranges.push_back(InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime));
   }
   bool needs_buffers;
   // Test StartScanRange().
@@ -1080,7 +1159,7 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
 
 // Test reading into a client-allocated buffer.
 TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   const char* data = "the quick brown fox jumped over the lazy dog";
   int len = strlen(data);
@@ -1090,15 +1169,13 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init());
-  // Reader doesn't need to provide mem tracker if it's providing buffers.
-  MemTracker* reader_mem_tracker = nullptr;
-  unique_ptr<RequestContext> reader;
-  reader = io_mgr->RegisterContext(reader_mem_tracker);
+  // Reader doesn't need to provide client if it's providing buffers.
+  unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
 
   for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
-    ScanRange* range = AllocateRange();
+    ScanRange* range = pool_.Add(new ScanRange);
     range->Reset(nullptr, tmp_file, scan_len, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len));
     bool needs_buffers;
@@ -1113,32 +1190,31 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     ASSERT_EQ(memcmp(io_buffer->buffer(), data, scan_len), 0);
 
     // DiskIoMgr should not have allocated memory.
-    EXPECT_EQ(mem_tracker.consumption(), 0);
+    EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
     range->ReturnBuffer(move(io_buffer));
   }
 
   io_mgr->UnregisterContext(reader.get());
-  pool_.Clear();
-  io_mgr.reset();
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 // Test reading into a client-allocated buffer where the read fails.
 TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* tmp_file = "/file/that/does/not/exist";
   const int SCAN_LEN = 128;
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
 
   ASSERT_OK(io_mgr->Init());
-  // Reader doesn't need to provide mem tracker if it's providing buffers.
-  MemTracker* reader_mem_tracker = nullptr;
-  unique_ptr<RequestContext> reader;
   vector<uint8_t> client_buffer(SCAN_LEN);
   for (int i = 0; i < 1000; ++i) {
-    reader = io_mgr->RegisterContext(reader_mem_tracker);
-    ScanRange* range = AllocateRange();
+    // The reader supplies its own buffer; the client only checks no reservation is used.
+    BufferPool::ClientHandle read_client;
+    RegisterBufferPoolClient(
+        LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+    unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
+    ScanRange* range = pool_.Add(new ScanRange);
     range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
     bool needs_buffers;
@@ -1153,25 +1229,25 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
     ASSERT_FALSE(range->GetNext(&io_buffer).ok());
 
     // DiskIoMgr should not have allocated memory.
-    EXPECT_EQ(mem_tracker.consumption(), 0);
+    EXPECT_EQ(read_client.GetUsedReservation(), 0);
 
     io_mgr->UnregisterContext(reader.get());
+    EXPECT_EQ(read_client.GetUsedReservation(), 0);
+    buffer_pool()->DeregisterClient(&read_client);
   }
 
-  pool_.Clear();
-  io_mgr.reset();
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_reservation_.GetChildReservations(), 0);
 }
 
 // Test to verify configuration parameters for number of I/O threads per disk.
 TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
   const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
       + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
 
   // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
   // Since we do not have control over which disk is used, we check for either type
   // (rotational/solid state)
-  MemTracker mem_tracker(LARGE_MEM_LIMIT);
   const int num_io_threads_per_rotational_or_ssd = 2;
   DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
       num_io_threads_per_rotational_or_ssd, 1, 10);

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 6dda447..6c7b9e6 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "common/global-flags.h"
 #include "runtime/io/disk-io-mgr.h"
+
+#include "common/global-flags.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/handle-cache.inline.h"
 
@@ -52,6 +54,8 @@ DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
 static const int THREADS_PER_ROTATIONAL_DISK = 1;
 static const int THREADS_PER_SOLID_STATE_DISK = 8;
 
+const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE;
+
 // The maximum number of the threads per rotational disk is also the max queue depth per
 // rotational disk.
 static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
@@ -309,9 +313,8 @@ Status DiskIoMgr::Init() {
   return Status::OK();
 }
 
-unique_ptr<RequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) {
-  return unique_ptr<RequestContext>(
-      new RequestContext(this, num_total_disks(), mem_tracker));
+unique_ptr<RequestContext> DiskIoMgr::RegisterContext() {
+  return unique_ptr<RequestContext>(new RequestContext(this, num_total_disks()));
 }
 
 void DiskIoMgr::UnregisterContext(RequestContext* reader) {
@@ -455,28 +458,21 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** rang
   }
 }
 
-Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader, ScanRange* range,
-    int64_t max_bytes) {
+Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader,
+    BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) {
   DCHECK_GE(max_bytes, min_buffer_size_);
   DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
      << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers "
      << "when already reading into an external buffer";
-
+  BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
   Status status;
   vector<unique_ptr<BufferDescriptor>> buffers;
   for (int64_t buffer_size : ChooseBufferSizes(range->len(), max_bytes)) {
-    if (!reader->mem_tracker_->TryConsume(buffer_size)) {
-      status = reader->mem_tracker_->MemLimitExceeded(nullptr,
-          "Failed to allocate I/O buffer", buffer_size);
-      goto error;
-    }
-    uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size));
-    if (buffer == nullptr) {
-      reader->mem_tracker_->Release(buffer_size);
-      status = Status(Substitute("Failed to malloc $0-byte I/O buffer", buffer_size));
-      goto error;
-    }
-    buffers.emplace_back(new BufferDescriptor(this, reader, range, buffer, buffer_size));
+    BufferPool::BufferHandle handle;
+    status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
+    if (!status.ok()) goto error;
+    buffers.emplace_back(new BufferDescriptor(
+        this, reader, range, bp_client, move(handle)));
   }
   range->AddUnusedBuffers(move(buffers), false);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index d429d1d..d246e95 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -30,6 +30,7 @@
 #include "common/hdfs.h"
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/handle-cache.h"
 #include "runtime/io/request-ranges.h"
 #include "runtime/thread-resource-mgr.h"
@@ -42,8 +43,6 @@
 
 namespace impala {
 
-class MemTracker;
-
 namespace io {
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
 /// (such as S3). Each query maps to one or more RequestContext objects, each of which
@@ -163,6 +162,7 @@ namespace io {
 ///    buffer and one buffer is in the disk queue. The additional buffer can absorb
 ///    bursts where the producer runs faster than the consumer or the consumer runs
 ///    faster than the producer without blocking either the producer or consumer.
+/// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE.
 ///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In that
@@ -243,11 +243,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// Allocates tracking structure for a request context.
   /// Register a new request context and return it to the caller. The caller must call
   /// UnregisterContext() for each context.
-  /// reader_mem_tracker: Is non-null only for readers. IO buffers
-  ///    used for this reader will be tracked by this. If the limit is exceeded
-  ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
-  ///    GetNext().
-  std::unique_ptr<RequestContext> RegisterContext(MemTracker* reader_mem_tracker);
+  std::unique_ptr<RequestContext> RegisterContext();
 
   /// Unregisters context from the disk IoMgr by first cancelling it then blocking until
   /// all references to the context are removed from I/O manager internal data structures.
@@ -302,16 +298,15 @@ class DiskIoMgr : public CacheLineAligned {
   /// *needs_buffers=true.
   ///
   /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >=
-  /// min_read_buffer_size() so that at least one buffer can be allocated. Returns ok
-  /// if the buffers were successfully allocated and the range was scheduled. Fails with
-  /// MEM_LIMIT_EXCEEDED if the buffers could not be allocated. On failure, any allocated
-  /// buffers are freed and the state of 'range' is unmodified so that allocation can be
-  /// retried.  Setting 'max_bytes' to 3 * max_buffer_size() will typically maximize I/O
-  /// throughput. See Buffer management" section of the class comment for explanation.
-  /// TODO: error handling contract will change with reservations. The caller needs to
-  /// to guarantee that there is sufficient reservation.
-  Status AllocateBuffersForRange(RequestContext* reader, ScanRange* range,
-      int64_t max_bytes);
+  /// min_read_buffer_size() so that at least one buffer can be allocated. The caller
+  /// must ensure that 'bp_client' has at least 'max_bytes' unused reservation. Returns ok
+  /// if the buffers were successfully allocated and the range was scheduled.
+  ///
+  /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size()
+  /// will typically maximize I/O throughput. See the "Buffer Management" section of
+  /// the class comment for explanation.
+  Status AllocateBuffersForRange(RequestContext* reader,
+      BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes);
 
   /// Determine which disk queue this file should be assigned to.  Returns an index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that holds the
@@ -379,6 +374,10 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_NUM_DISKS
   };
 
+  /// The ideal number of max-sized buffers per scan range to maximize throughput.
+  /// See "Buffer Management" in the class comment for explanation.
+  static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;
+
  private:
   friend class BufferDescriptor;
   friend class RequestContext;
@@ -401,7 +400,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// Maximum read size. This is also the maximum size of each allocated buffer.
   const int64_t max_buffer_size_;
 
-  /// The minimum size of each read buffer.
+  /// The minimum size of each read buffer. Must be >= BufferPool::min_buffer_len().
   const int64_t min_buffer_size_;
 
   /// Thread group containing all the worker threads.

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index b124702..dec6aa6 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -17,6 +17,8 @@
 
 #include "runtime/io/disk-io-mgr-internal.h"
 
+#include "runtime/exec-env.h"
+
 #include "common/names.h"
 
 using namespace impala;
@@ -36,12 +38,28 @@ BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
   DCHECK_GE(buffer_len, 0);
 }
 
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+    ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+    BufferPool::BufferHandle handle) :
+  io_mgr_(io_mgr),
+  reader_(reader),
+  scan_range_(scan_range),
+  buffer_(handle.data()),
+  buffer_len_(handle.len()),
+  bp_client_(bp_client),
+  handle_(move(handle)) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(bp_client_->is_registered());
+  DCHECK(handle_.is_open());
+}
+
 void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
   DCHECK(buffer->buffer_ != nullptr);
   if (!buffer->is_cached() && !buffer->is_client_buffer()) {
-    // Only buffers that were not allocated by DiskIoMgr need to have memory freed.
-    free(buffer->buffer_);
-    mem_tracker_->Release(buffer->buffer_len_);
+    // Only buffers that were allocated by DiskIoMgr need to be freed.
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+        buffer->bp_client_, &buffer->handle_);
   }
   buffer->buffer_ = nullptr;
 }
@@ -200,9 +218,8 @@ void RequestContext::RemoveActiveScanRangeLocked(
   active_scan_ranges_.erase(range);
 }
 
-RequestContext::RequestContext(
-    DiskIoMgr* parent, int num_disks, MemTracker* tracker)
-  : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {}
+RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+  : parent_(parent), disk_states_(num_disks) {}
 
 // Dumps out request context information. Lock should be taken by caller
 string RequestContext::DebugString() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 3aea2bc..24fd0fc 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -158,12 +158,12 @@ class RequestContext {
     Inactive,
   };
 
-  RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
+  RequestContext(DiskIoMgr* parent, int num_disks);
 
-  /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer
-  /// memory and release the consumption to the client MemTracker. Otherwise (e.g. a
-  /// client or HDFS cache buffer), just prepares the descriptor to be destroyed.
-  /// After this is called, buffer->buffer() is NULL. Does not acquire 'lock_'.
+  /// Cleans up a buffer. If the buffer was allocated with AllocateBuffersForRange(),
+  /// frees the buffer. Otherwise (e.g. a client or HDFS cache buffer), just prepares the
+  /// descriptor to be destroyed. After this is called, buffer->buffer() is NULL.
+  /// Does not acquire 'lock_'.
   void FreeBuffer(BufferDescriptor* buffer);
 
   /// Decrements the number of active disks for this reader.  If the disk count
@@ -239,10 +239,6 @@ class RequestContext {
   /// Parent object
   DiskIoMgr* const parent_;
 
-  /// Memory used for this reader.  This is unowned by this object.
-  /// TODO: replace with bp client
-  MemTracker* const mem_tracker_;
-
   /// Total bytes read for this reader
   RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 041cb9d..0b234ac 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -32,8 +32,6 @@
 #include "util/mem-range.h"
 
 namespace impala {
-class MemTracker;
-
 namespace io {
 class DiskIoMgr;
 class RequestContext;
@@ -63,11 +61,15 @@ class BufferDescriptor {
   friend class ScanRange;
   friend class RequestContext;
 
-  /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
-  /// memory should already be accounted against 'mem_tracker'.
+  /// Create a buffer descriptor for a new reader, range and data buffer.
   BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
       ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len);
 
+  /// Create a buffer descriptor allocated from the buffer pool.
+  BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+      ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+      BufferPool::BufferHandle handle);
+
   /// Return true if this is a cached buffer owned by HDFS.
   bool is_cached() const;
 
@@ -97,6 +99,11 @@ class BufferDescriptor {
   bool eosr_ = false;
 
   int64_t scan_range_offset_ = 0;
+
+  // Handle to an allocated buffer and the client used to allocate it. Only used
+  // for non-external buffers.
+  BufferPool::ClientHandle* bp_client_ = nullptr;
+  BufferPool::BufferHandle handle_;
 };
 
 /// The request type, read or write associated with a request range.

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 0663a2b..9c2110c 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -340,9 +340,6 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
 void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   DCHECK(exclusive_hdfs_fh_ == nullptr);
   DCHECK(local_file_ == nullptr);
-  // Reader must provide MemTracker or a buffer.
-  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
-      || reader->mem_tracker_ != nullptr);
   io_mgr_ = io_mgr;
   reader_ = reader;
   local_file_ = nullptr;
@@ -650,8 +647,7 @@ Status ScanRange::ReadFromCache(
   }
 
   // Create a single buffer desc for the entire scan range and enqueue that.
-  // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
-  // not the Impala backend.
+  // The memory is owned by the HDFS java client, not the Impala backend.
   unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
       io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0));
   desc->len_ = bytes_read;

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 3a69c33..e0c58d4 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -243,7 +243,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     next_allocation_index_(0),
     free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
-  io_ctx_ = io_mgr_->RegisterContext(nullptr);
+  io_ctx_ = io_mgr_->RegisterContext();
 }
 
 TmpFileMgr::FileGroup::~FileGroup() {

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c5df1cd..1ab05a0 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -239,6 +239,9 @@ struct THdfsScanNode {
   // The byte offset of the slot for Parquet metadata if Parquet count star optimization
   // is enabled.
   10: optional i32 parquet_count_star_slot_offset
+
+  // The ideal memory reservation in bytes to process an input split.
+  11: optional i64 ideal_scan_range_reservation
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 4f0a0e1..aae3863 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -163,6 +163,25 @@ public class SlotDescriptor {
   }
 
   /**
+   * Checks if this descriptor describes an array "pos" pseudo-column.
+   *
+   * Note: checking whether the column is null distinguishes between top-level columns
+   * and nested types. This check more specifically looks just for a reference to the
+   * "pos" field of an array type.
+   */
+  public boolean isArrayPosRef() {
+    if (parent_ == null) return false;
+    Type parentType = parent_.getType();
+    if (parentType instanceof CollectionStructType) {
+      if (((CollectionStructType)parentType).isArrayStruct() &&
+          label_.equals(Path.ARRAY_POS_FIELD_NAME)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Assembles the absolute materialized path to this slot starting from the schema
    * root. The materialized path points to the first non-struct schema element along the
    * path starting from the parent's tuple path to this slot's path.

http://git-wip-us.apache.org/repos/asf/impala/blob/8c922a6e/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 23f2d88..0a945bd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -153,26 +153,6 @@ public class SlotRef extends Expr {
     return "<slot " + Integer.toString(desc_.getId().asInt()) + ">";
   }
 
-  /**
-   * Checks if this slotRef refers to an array "pos" pseudo-column.
-   *
-   * Note: checking whether the column is null distinguishes between top-level columns
-   * and nested types. This check more specifically looks just for a reference to the
-   * "pos" field of an array type.
-   */
-  public boolean isArrayPosRef() {
-    TupleDescriptor parent = getDesc().getParent();
-    if (parent == null) return false;
-    Type parentType = parent.getType();
-    if (parentType instanceof CollectionStructType) {
-      if (((CollectionStructType)parentType).isArrayStruct() &&
-          getDesc().getLabel().equals(Path.ARRAY_POS_FIELD_NAME)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @Override
   protected void toThrift(TExprNode msg) {
     msg.node_type = TExprNodeType.SLOT_REF;