You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/08/06 00:36:00 UTC

incubator-impala git commit: IMPALA-3611: track unused Disk IO buffer memory

Repository: incubator-impala
Updated Branches:
  refs/heads/master 45d059855 -> 17bf14417


IMPALA-3611: track unused Disk IO buffer memory

Track I/O buffers against separate MemTrackers. This gives us better
visibility into memory consumption from the debug webpage and from
MemTracker consumption dumps. The immediate motivation was trying to
determine whether idle memory consumption of an impalad was caused by a
memory leak.

We add two trackers: one for buffers cached in DiskIoMgr's free list,
and another for clients that don't provide a MemTracker (the only such
client is BufferedBlockMgr, which will be removed at some point).

The previous code "tracked" the buffers against the process-wide
tracker, but this tracking was a no-op outside of ASAN builds, since the
process-wide tracker took its value from TCMalloc.

The test code required fixing because it assumed that buffers were
always credited against the DiskIoMgr's tracker. This only made sense
when the DiskIoMgr's tracker was the root process-wide tracker.

Fix backend test logging for disk-io-mgr-test.

Testing:
Ran exhaustive tests.

Change-Id: I8777cf76f04d34a46f53d53005412e0f1d63b5b7
Reviewed-on: http://gerrit.cloudera.org:8080/3799
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/17bf1441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/17bf1441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/17bf1441

Branch: refs/heads/master
Commit: 17bf14417e3438d772b19111431453bdd537742a
Parents: 45d0598
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon May 23 14:09:39 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Aug 6 00:31:23 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc |   4 +-
 be/src/runtime/disk-io-mgr-test.cc       |  88 ++++++----
 be/src/runtime/disk-io-mgr.cc            | 241 +++++++++++++++-----------
 be/src/runtime/disk-io-mgr.h             |  77 ++++----
 be/src/runtime/row-batch.cc              |   2 +-
 5 files changed, 243 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 9a7e39a..0de9622 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -442,8 +442,10 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   DCHECK_EQ(bytes_read, len());
 
   // Create a single buffer desc for the entire scan range and enqueue that.
+  // 'mem_tracker' is NULL because the memory is owned by the HDFS java client,
+  // not the Impala backend.
   BufferDescriptor* desc = io_mgr_->GetBufferDesc(
-      reader_, this, reinterpret_cast<char*>(buffer), 0);
+      reader_, NULL, this, reinterpret_cast<char*>(buffer), 0);
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 46149b5..243b2e1 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -19,6 +19,7 @@
 
 #include "testutil/gtest-util.h"
 #include "codegen/llvm-codegen.h"
+#include "common/init.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/disk-io-mgr-stress.h"
 #include "runtime/mem-tracker.h"
@@ -39,6 +40,14 @@ namespace impala {
 
 class DiskIoMgrTest : public testing::Test {
  public:
+
+  virtual void SetUp() {
+    pool_.reset(new ObjectPool);
+  }
+
+  virtual void TearDown() {
+    pool_.reset();
+  }
   void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
       DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
       Status expected_status, const Status& status) {
@@ -220,6 +229,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   read_io_mgr->UnregisterContext(reader);
   read_io_mgr.reset();
 }
+
 // Perform invalid writes (e.g. non-existent file, negative offset) and validate
 // that an error status is returned via the write callback.
 TEST_F(DiskIoMgrTest, InvalidWrite) {
@@ -230,7 +240,6 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
   ASSERT_OK(io_mgr.RegisterContext(&writer));
-  pool_.reset(new ObjectPool);
   int32_t* data = pool_->Add(new int32_t);
   *data = rand();
 
@@ -619,11 +628,11 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     pool_.reset(new ObjectPool);
     if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
-    MemTracker mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
+    MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
     DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-    ASSERT_OK(io_mgr.Init(&mem_tracker));
-    MemTracker reader_mem_tracker;
+    ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+    MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
     DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
@@ -932,54 +941,66 @@ TEST_F(DiskIoMgrTest, Buffers) {
   // Test default min/max buffer size
   int min_buffer_size = 1024;
   int max_buffer_size = 8 * 1024 * 1024; // 8 MB
-  MemTracker mem_tracker(max_buffer_size * 2);
+  MemTracker root_mem_tracker(max_buffer_size * 2);
 
   DiskIoMgr io_mgr(1, 1, min_buffer_size, max_buffer_size);
-  ASSERT_OK(io_mgr.Init(&mem_tracker));
-  ASSERT_EQ(mem_tracker.consumption(), 0);
+  ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+  ASSERT_EQ(root_mem_tracker.consumption(), 0);
+
+  MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
+  DiskIoRequestContext* reader;
+  ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
+
+  DiskIoMgr::ScanRange* dummy_range = InitRange(1, "dummy", 0, 0, 0, 0);
 
   // buffer length should be rounded up to min buffer size
   int64_t buffer_len = 1;
-  char* buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, min_buffer_size);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
+  DiskIoMgr::BufferDescriptor* buffer_desc;
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
+  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
+  io_mgr.FreeBufferMemory(buffer_desc);
+  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
 
   // reuse buffer
   buffer_len = min_buffer_size;
-  buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, min_buffer_size);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
+  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
+  io_mgr.FreeBufferMemory(buffer_desc);
+  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
 
   // bump up to next buffer size
   buffer_len = min_buffer_size + 1;
-  buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, min_buffer_size * 2);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 2);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 3);
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
+  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
+  EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());
 
   // gc unused buffer
   io_mgr.GcIoBuffers();
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2);
+  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
+  EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());
 
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
+  io_mgr.FreeBufferMemory(buffer_desc);
 
   // max buffer size
   buffer_len = max_buffer_size;
-  buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, max_buffer_size);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 2);
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2 + max_buffer_size);
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
+  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
+  io_mgr.FreeBufferMemory(buffer_desc);
+  EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, root_mem_tracker.consumption());
 
   // gc buffers
   io_mgr.GcIoBuffers();
   EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_mem_tracker.consumption(), 0);
+  io_mgr.UnregisterContext(reader);
 }
 
 // IMPALA-2366: handle partial read where range goes past end of file.
@@ -995,7 +1016,6 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  pool_.reset(new ObjectPool);
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
@@ -1022,11 +1042,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 }
 
 int main(int argc, char **argv) {
-  google::InitGoogleLogging(argv[0]);
   ::testing::InitGoogleTest(&argc, argv);
-  impala::CpuInfo::Init();
-  impala::DiskInfo::Init();
-  impala::OsInfo::Init();
-  impala::InitThreading();
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 448424f..769c505 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -203,25 +203,50 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
-DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
-  io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
+DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {
+  Reset();
+}
+
+void DiskIoMgr::BufferDescriptor::Reset() {
+  DCHECK(io_mgr_ != NULL);
+  reader_ = NULL;
+  scan_range_ = NULL;
+  mem_tracker_ = NULL;
+  buffer_ = NULL;
+  buffer_len_ = 0;
+  len_ = 0;
+  eosr_ = false;
+  status_ = Status::OK();
+  scan_range_offset_ = 0;
 }
 
 void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
-      ScanRange* range, char* buffer, int64_t buffer_len) {
+    ScanRange* range, char* buffer, int64_t buffer_len, MemTracker* mem_tracker) {
   DCHECK(io_mgr_ != NULL);
   DCHECK(buffer_ == NULL);
   DCHECK(range != NULL);
   DCHECK(buffer != NULL);
   DCHECK_GE(buffer_len, 0);
+  DCHECK_NE(range->cached_buffer_ == NULL, mem_tracker == NULL);
   reader_ = reader;
   scan_range_ = range;
+  mem_tracker_ = mem_tracker;
   buffer_ = buffer;
   buffer_len_ = buffer_len;
   len_ = 0;
   eosr_ = false;
   status_ = Status::OK();
-  mem_tracker_ = NULL;
+  scan_range_offset_ = 0;
+}
+
+void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
+  DCHECK(dst != NULL);
+  // Memory of cached buffers is not tracked against a tracker.
+  if (is_cached()) return;
+  DCHECK(mem_tracker_ != NULL);
+  dst->Consume(buffer_len_);
+  mem_tracker_->Release(buffer_len_);
+  mem_tracker_ = dst;
 }
 
 void DiskIoMgr::BufferDescriptor::Return() {
@@ -229,15 +254,6 @@ void DiskIoMgr::BufferDescriptor::Return() {
   io_mgr_->ReturnBuffer(this);
 }
 
-void DiskIoMgr::BufferDescriptor::SetMemTracker(MemTracker* tracker) {
-  // Cached buffers don't count towards mem usage.
-  if (scan_range_->cached_buffer_ != NULL) return;
-  if (mem_tracker_ == tracker) return;
-  if (mem_tracker_ != NULL) mem_tracker_->Release(buffer_len_);
-  mem_tracker_ = tracker;
-  if (mem_tracker_ != NULL) mem_tracker_->Consume(buffer_len_);
-}
-
 DiskIoMgr::WriteRange::WriteRange(const string& file, int64_t file_offset, int disk_id,
     WriteDoneCallback callback) {
   file_ = file;
@@ -340,12 +356,20 @@ DiskIoMgr::~DiskIoMgr() {
     delete disk_queues_[i];
   }
 
+  if (free_buffer_mem_tracker_ != NULL) free_buffer_mem_tracker_->UnregisterFromParent();
+  if (unowned_buffer_mem_tracker_ != NULL) {
+    unowned_buffer_mem_tracker_->UnregisterFromParent();
+  }
+
   if (cached_read_options_ != NULL) hadoopRzOptionsFree(cached_read_options_);
 }
 
 Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   DCHECK(process_mem_tracker != NULL);
-  process_mem_tracker_ = process_mem_tracker;
+  free_buffer_mem_tracker_.reset(
+      new MemTracker(-1, -1, "Free Disk IO Buffers", process_mem_tracker, false));
+  unowned_buffer_mem_tracker_.reset(
+      new MemTracker(-1, -1, "Untracked Disk IO Buffers", process_mem_tracker, false));
   // If we hit the process limit, see if we can reclaim some memory by removing
   // previously allocated (but unused) io buffers.
   process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
@@ -626,11 +650,10 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
 
   DiskIoRequestContext* reader = buffer_desc->reader_;
   if (buffer_desc->buffer_ != NULL) {
-    if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
-      // Not a cached buffer. Return the io buffer and update mem tracking.
-      ReturnFreeBuffer(buffer_desc);
+    if (!buffer_desc->is_cached()) {
+      // Cached buffers are not allocated by DiskIoMgr so don't need to be freed.
+      FreeBufferMemory(buffer_desc);
     }
-    buffer_desc->buffer_ = NULL;
     num_buffers_in_readers_.Add(-1);
     reader->num_buffers_in_reader_.Add(-1);
   } else {
@@ -649,14 +672,15 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
 
 void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
   DCHECK(desc != NULL);
+  desc->Reset();
   unique_lock<mutex> lock(free_buffers_lock_);
   DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
          == free_buffer_descs_.end());
   free_buffer_descs_.push_back(desc);
 }
 
-DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
-    DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
+DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(DiskIoRequestContext* reader,
+    MemTracker* mem_tracker, ScanRange* range, char* buffer, int64_t buffer_size) {
   BufferDescriptor* buffer_desc;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
@@ -667,43 +691,57 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
       free_buffer_descs_.pop_front();
     }
   }
-  buffer_desc->Reset(reader, range, buffer, buffer_size);
-  buffer_desc->SetMemTracker(reader->mem_tracker_);
+  buffer_desc->Reset(reader, range, buffer, buffer_size, mem_tracker);
   return buffer_desc;
 }
 
-char* DiskIoMgr::GetFreeBuffer(int64_t* buffer_size) {
-  DCHECK_LE(*buffer_size, max_buffer_size_);
-  DCHECK_GT(*buffer_size, 0);
-  *buffer_size = min(static_cast<int64_t>(max_buffer_size_), *buffer_size);
-  int idx = free_buffers_idx(*buffer_size);
+DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* reader,
+    ScanRange* range, int64_t buffer_size) {
+  DCHECK_LE(buffer_size, max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
+  int idx = free_buffers_idx(buffer_size);
   // Quantize buffer size to nearest power of 2 greater than the specified buffer size and
   // convert to bytes
-  *buffer_size = (1 << idx) * min_buffer_size_;
+  buffer_size = (1LL << idx) * min_buffer_size_;
+
+  // Track memory against the reader. This is checked the next time we start
+  // a read for the next reader in DiskIoMgr::GetNextScanRange().
+  // TODO: IMPALA-3200: BufferedBlockMgr does not expect read buffers to be tracked
+  // against its MemTracker. Once BufferedBlockMgr is removed, we can expect that
+  // all readers provide a MemTracker and remove this NULL check.
+  MemTracker* buffer_mem_tracker = reader->mem_tracker_ != NULL ? reader->mem_tracker_ :
+      unowned_buffer_mem_tracker_.get();
+  buffer_mem_tracker->Consume(buffer_size);
 
-  unique_lock<mutex> lock(free_buffers_lock_);
   char* buffer = NULL;
-  if (free_buffers_[idx].empty()) {
-    num_allocated_buffers_.Add(1);
-    if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
-    }
-    if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
-      ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(*buffer_size);
-    }
-    // Update the process mem usage.  This is checked the next time we start
-    // a read for the next reader (DiskIoMgr::GetNextScanRange)
-    process_mem_tracker_->Consume(*buffer_size);
-    buffer = new char[*buffer_size];
-  } else {
-    if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (free_buffers_[idx].empty()) {
+      num_allocated_buffers_.Add(1);
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
+      }
+      // We already tracked this memory against the reader's MemTracker.
+      buffer = new char[buffer_size];
+    } else {
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+      }
+      buffer = free_buffers_[idx].front();
+      free_buffers_[idx].pop_front();
+      free_buffer_mem_tracker_->Release(buffer_size);
     }
-    buffer = free_buffers_[idx].front();
-    free_buffers_[idx].pop_front();
   }
+
+  // Validate more invariants.
+  DCHECK(range != NULL);
+  DCHECK(reader != NULL);
   DCHECK(buffer != NULL);
-  return buffer;
+  return GetBufferDesc(reader, buffer_mem_tracker, range, buffer, buffer_size);
 }
 
 void DiskIoMgr::GcIoBuffers() {
@@ -711,12 +749,11 @@ void DiskIoMgr::GcIoBuffers() {
   int buffers_freed = 0;
   int bytes_freed = 0;
   for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    for (list<char*>::iterator iter = free_buffers_[idx].begin();
-         iter != free_buffers_[idx].end(); ++iter) {
-      int64_t buffer_size = (1 << idx) * min_buffer_size_;
-      process_mem_tracker_->Release(buffer_size);
+    for (char* buffer: free_buffers_[idx]) {
+      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
+      delete[] buffer;
+      free_buffer_mem_tracker_->Release(buffer_size);
       num_allocated_buffers_.Add(-1);
-      delete[] *iter;
 
       ++buffers_freed;
       bytes_freed += buffer_size;
@@ -735,35 +772,44 @@ void DiskIoMgr::GcIoBuffers() {
   }
 }
 
-void DiskIoMgr::ReturnFreeBuffer(BufferDescriptor* desc) {
-  ReturnFreeBuffer(desc->buffer_, desc->buffer_len_);
-  desc->SetMemTracker(NULL);
-  desc->buffer_ = NULL;
-}
-
-void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
-  DCHECK(buffer != NULL);
+void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
+  DCHECK(!desc->is_cached());
+  char* buffer = desc->buffer_;
+  int64_t buffer_size = desc->buffer_len_;
   int idx = free_buffers_idx(buffer_size);
-  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1 << idx), 0)
+  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
       << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
       << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
-  unique_lock<mutex> lock(free_buffers_lock_);
-  if (!FLAGS_disable_mem_pools && free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
-    free_buffers_[idx].push_back(buffer);
-    if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
-    }
-  } else {
-    process_mem_tracker_->Release(buffer_size);
-    num_allocated_buffers_.Add(-1);
-    delete[] buffer;
-    if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
-    }
-    if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
-      ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
+
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (!FLAGS_disable_mem_pools &&
+        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
+      free_buffers_[idx].push_back(buffer);
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
+      }
+      // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
+      // with a Release() call for the same buffer that could make consumption negative.
+      // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
+      // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
+      // pool's free lists, and remove these free lists.
+      free_buffer_mem_tracker_->Consume(buffer_size);
+    } else {
+      num_allocated_buffers_.Add(-1);
+      delete[] buffer;
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
+      }
     }
   }
+
+  // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
+  desc->mem_tracker_->Release(buffer_size);
+  desc->buffer_ = NULL;
 }
 
 // This function gets the next RequestRange to work on for this disk. It checks for
@@ -812,14 +858,19 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     // same reader here (the reader is removed from the queue).  There can be
     // other disk threads operating on this reader in other functions though.
 
-    // We just picked a reader, check the mem limits.
-    // TODO: we can do a lot better here.  The reader can likely make progress
-    // with fewer io buffers.
-    bool process_limit_exceeded = process_mem_tracker_->LimitExceeded();
+    // We just picked a reader, check the mem limits. We need to fail the request if
+    // the reader exceeded its memory limit, or if we're over a global memory limit.
+    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
+    // move these memory limit checks to GetFreeBuffer().
+    // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
+    bool any_io_mgr_limit_exceeded = free_buffer_mem_tracker_->AnyLimitExceeded();
+    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
+    // pushing more buffers to it. Most readers can make progress and operate within
+    // a fixed memory limit.
     bool reader_limit_exceeded = (*request_context)->mem_tracker_ != NULL
         ? (*request_context)->mem_tracker_->AnyLimitExceeded() : false;
 
-    if (process_limit_exceeded || reader_limit_exceeded) {
+    if (any_io_mgr_limit_exceeded || reader_limit_exceeded) {
       (*request_context)->Cancel(Status::MemLimitExceeded());
     }
 
@@ -923,8 +974,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
-    ReturnFreeBuffer(buffer);
-    buffer->buffer_ = NULL;
+    FreeBufferMemory(buffer);
     buffer->scan_range_->Cancel(reader->status_);
     // Enqueue the buffer to use the scan range's buffer cleanup path.
     buffer->scan_range_->EnqueueBuffer(buffer);
@@ -940,7 +990,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   //  3. Middle of scan range
   if (!buffer->status_.ok()) {
     // Error case
-    ReturnFreeBuffer(buffer);
+    FreeBufferMemory(buffer);
     buffer->eosr_ = true;
     --state.num_remaining_ranges();
     buffer->scan_range_->Cancel(buffer->status_);
@@ -952,11 +1002,12 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   // Store the state we need before calling EnqueueBuffer().
   bool eosr = buffer->eosr_;
   ScanRange* scan_range = buffer->scan_range_;
-  bool queue_full = buffer->scan_range_->EnqueueBuffer(buffer);
+  bool is_cached = buffer->is_cached();
+  bool queue_full = scan_range->EnqueueBuffer(buffer);
   if (eosr) {
     // For cached buffers, we can't close the range until the cached buffer is returned.
     // Close() is called from DiskIoMgr::ReturnBuffer().
-    if (scan_range->cached_buffer_ == NULL) scan_range->Close();
+    if (!is_cached) scan_range->Close();
   } else {
     if (queue_full) {
       reader->blocked_ranges_.Enqueue(scan_range);
@@ -1003,7 +1054,6 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
 // specified reader context and disk queue.
 void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
-  char* buffer = NULL;
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
   int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
@@ -1044,17 +1094,9 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     }
   }
 
-  buffer = GetFreeBuffer(&buffer_size);
-  reader->num_used_buffers_.Add(1);
-
-  // Validate more invariants.
-  DCHECK_GT(reader->num_used_buffers_.Load(), 0);
-  DCHECK(range != NULL);
-  DCHECK(reader != NULL);
-  DCHECK(buffer != NULL);
-
-  BufferDescriptor* buffer_desc = GetBufferDesc(reader, range, buffer, buffer_size);
+  BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
   DCHECK(buffer_desc != NULL);
+  reader->num_used_buffers_.Add(1);
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
@@ -1065,13 +1107,14 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
       reader->active_read_thread_counter_->Add(1L);
     }
     if (reader->disks_accessed_bitmap_) {
-      int64_t disk_bit = 1 << disk_queue->disk_id;
+      int64_t disk_bit = 1LL << disk_queue->disk_id;
       reader->disks_accessed_bitmap_->BitOr(disk_bit);
     }
     SCOPED_TIMER(&read_timer_);
     SCOPED_TIMER(reader->read_timer_);
 
-    buffer_desc->status_ = range->Read(buffer, &buffer_desc->len_, &buffer_desc->eosr_);
+    buffer_desc->status_ = range->Read(buffer_desc->buffer_, &buffer_desc->len_,
+        &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
     if (reader->bytes_read_counter_ != NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 9eb663c..aadc85b 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -237,9 +237,12 @@ class DiskIoMgr {
     /// Returns the offset within the scan range that this buffer starts at
     int64_t scan_range_offset() const { return scan_range_offset_; }
 
-    /// Updates this buffer buffer to be owned by the new tracker. Consumption is
-    /// release from the current tracker and added to the new one.
-    void SetMemTracker(MemTracker* tracker);
+    /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and
+    /// set 'mem_tracker_' to 'dst'.  'mem_tracker_' and 'dst' must be non-NULL.
+    /// Does not check memory limits on 'dst': the caller should check the memory limit
+    /// if a different memory limit may apply to 'dst'.
+    /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
+    void TransferOwnership(MemTracker* dst);
 
     /// Returns the buffer to the IoMgr. This must be called for every buffer
     /// returned by GetNext()/Read() that did not return an error. This is non-blocking.
@@ -251,19 +254,26 @@ class DiskIoMgr {
     friend class DiskIoRequestContext;
     BufferDescriptor(DiskIoMgr* io_mgr);
 
+    bool is_cached() { return scan_range_->cached_buffer_ != NULL; }
+
+    /// Reset the buffer descriptor to an uninitialized state.
+    void Reset();
+
     /// Resets the buffer descriptor state for a new reader, range and data buffer.
+    /// The buffer memory should already be accounted against MemTracker
     void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
-        int64_t buffer_len);
+        int64_t buffer_len, MemTracker* mem_tracker);
 
-    DiskIoMgr* io_mgr_;
+    DiskIoMgr* const io_mgr_;
 
-    /// Reader that this buffer is for
+    /// Reader that this buffer is for.
     DiskIoRequestContext* reader_;
 
-    /// The current tracker this buffer is associated with.
+    /// The current tracker this buffer is associated with. After initialisation,
+    /// NULL for cached buffers and non-NULL for all other buffers.
     MemTracker* mem_tracker_;
 
-    /// Scan range that this buffer is for.
+    /// Scan range that this buffer is for. Non-NULL when initialised.
     ScanRange* scan_range_;
 
     /// buffer with the read contents
@@ -684,8 +694,13 @@ class DiskIoMgr {
   /// Pool to allocate BufferDescriptors.
   ObjectPool pool_;
 
-  /// Process memory tracker; needed to account for io buffers.
-  MemTracker* process_mem_tracker_;
+  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
+  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
+
+  /// Memory tracker for I/O buffers where the DiskIoRequestContext has no MemTracker.
+  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't
+  /// provide a MemTracker.
+  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
 
   /// Number of worker(read) threads per disk. Also the max depth of queued
   /// work to the disk.
@@ -757,25 +772,27 @@ class DiskIoMgr {
   /// Returns the index into free_buffers_ for a given buffer size
   int free_buffers_idx(int64_t buffer_size);
 
-  /// Gets a buffer description object, initialized for this reader, allocating one as
-  /// necessary. buffer_size / min_buffer_size_ should be a power of 2, and buffer_size
-  /// should be <= max_buffer_size_. These constraints will be met if buffer was acquired
-  /// via GetFreeBuffer() (which it should have been).
-  BufferDescriptor* GetBufferDesc(
-      DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
-
-  /// Returns a buffer desc object which can now be used for another reader.
-  void ReturnBufferDesc(BufferDescriptor* desc);
+  /// Returns a buffer to read into with size between 'buffer_size' and
+  /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the
+  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
+  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
+  /// The buffer memory is tracked against reader's mem tracker, or
+  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
+  BufferDescriptor* GetFreeBuffer(DiskIoRequestContext* reader, ScanRange* range,
+      int64_t buffer_size);
+
+  /// Gets a BufferDescriptor initialized with the provided parameters. The object may be
+  /// recycled or newly allocated. Does not do anything aside from initialize the
+  /// descriptor's fields.
+  BufferDescriptor* GetBufferDesc(DiskIoRequestContext* reader,
+      MemTracker* mem_tracker, ScanRange* range, char* buffer, int64_t buffer_size);
 
   /// Returns the buffer desc and underlying buffer to the disk IoMgr. This also updates
   /// the reader and disk queue state.
   void ReturnBuffer(BufferDescriptor* buffer);
 
-  /// Returns a buffer to read into with size between *buffer_size and max_buffer_size_,
-  /// and *buffer_size is set to the size of the buffer. If there is an
-  /// appropriately-sized free buffer in the 'free_buffers_', that is returned, otherwise
-  /// a new one is allocated. *buffer_size must be between 0 and max_buffer_size_.
-  char* GetFreeBuffer(int64_t* buffer_size);
+  /// Returns a buffer desc object which can now be used for another reader.
+  void ReturnBufferDesc(BufferDescriptor* desc);
 
   /// Garbage collect all unused io buffers. This is currently only triggered when the
   /// process wide limit is hit. This is not good enough. While it is sufficient for
@@ -783,14 +800,10 @@ class DiskIoMgr {
   /// TODO: make this run periodically?
   void GcIoBuffers();
 
-  /// Returns a buffer to the free list. buffer_size / min_buffer_size_ should be a power
-  /// of 2, and buffer_size should be <= max_buffer_size_. These constraints will be met
-  /// if buffer was acquired via GetFreeBuffer() (which it should have been).
-  void ReturnFreeBuffer(char* buffer, int64_t buffer_size);
-
-  /// Returns the buffer in desc (cannot be NULL), sets buffer to NULL and clears the
-  /// mem tracker.
-  void ReturnFreeBuffer(BufferDescriptor* desc);
+  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be NULL), either
+  /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
+  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
+  void FreeBufferMemory(BufferDescriptor* desc);
 
   /// Disk worker thread loop. This function retrieves the next range to process on
   /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 7e75736..458e137 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -290,7 +290,7 @@ void RowBatch::AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer) {
   DCHECK(buffer != NULL);
   io_buffers_.push_back(buffer);
   auxiliary_mem_usage_ += buffer->buffer_len();
-  buffer->SetMemTracker(mem_tracker_);
+  buffer->TransferOwnership(mem_tracker_);
 }
 
 void RowBatch::AddTupleStream(BufferedTupleStream* stream) {