You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/26 02:41:53 UTC

[3/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv dependencies

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index f1e71b2..c01f34b 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -18,7 +18,7 @@
 
 using namespace impala;
 
-void DiskIoMgr::RequestContext::Cancel(const Status& status) {
+void DiskIoRequestContext::Cancel(const Status& status) {
   DCHECK(!status.ok());
 
   // Callbacks are collected in this vector and invoked while no lock is held.
@@ -28,18 +28,18 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
     DCHECK(Validate()) << endl << DebugString();
 
     // Already being cancelled
-    if (state_ == RequestContext::Cancelled) return;
+    if (state_ == DiskIoRequestContext::Cancelled) return;
 
     DCHECK(status_.ok());
     status_ = status;
 
     // The reader will be put into a cancelled state until call cleanup is complete.
-    state_ = RequestContext::Cancelled;
+    state_ = DiskIoRequestContext::Cancelled;
 
     // Cancel all scan ranges for this reader. Each range could be one one of
     // four queues.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
+      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
       RequestRange* range = NULL;
       while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
         if (range->request_type() == RequestType::READ) {
@@ -74,7 +74,7 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
     // Schedule reader on all disks. The disks will notice it is cancelled and do any
     // required cleanup
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
+      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
       state.ScheduleContext(this, i);
     }
   }
@@ -88,10 +88,10 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
   ready_to_start_ranges_cv_.notify_all();
 }
 
-void DiskIoMgr::RequestContext::AddRequestRange(
+void DiskIoRequestContext::AddRequestRange(
     DiskIoMgr::RequestRange* range, bool schedule_immediately) {
   // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
   if (state.done()) {
     DCHECK_EQ(state.num_remaining_ranges(), 0);
     state.set_done(false);
@@ -107,7 +107,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
       state.unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
     }
-    // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+    // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will
     // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
     // invoked.
     schedule_context = state.next_scan_range_to_start() == NULL;
@@ -126,7 +126,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
   ++state.num_remaining_ranges();
 }
 
-DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+DiskIoRequestContext::DiskIoRequestContext(DiskIoMgr* parent, int num_disks)
   : parent_(parent),
     bytes_read_counter_(NULL),
     read_timer_(NULL),
@@ -137,7 +137,7 @@ DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
 }
 
 // Resets this object.
-void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
+void DiskIoRequestContext::Reset(MemTracker* tracker) {
   DCHECK_EQ(state_, Inactive);
   status_ = Status::OK();
 
@@ -173,13 +173,13 @@ void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
 }
 
 // Dumps out request context information. Lock should be taken by caller
-string DiskIoMgr::RequestContext::DebugString() const {
+string DiskIoRequestContext::DebugString() const {
   stringstream ss;
-  ss << endl << "  RequestContext: " << (void*)this << " (state=";
-  if (state_ == RequestContext::Inactive) ss << "Inactive";
-  if (state_ == RequestContext::Cancelled) ss << "Cancelled";
-  if (state_ == RequestContext::Active) ss << "Active";
-  if (state_ != RequestContext::Inactive) {
+  ss << endl << "  DiskIoRequestContext: " << (void*)this << " (state=";
+  if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
+  if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
+  if (state_ == DiskIoRequestContext::Active) ss << "Active";
+  if (state_ != DiskIoRequestContext::Inactive) {
     ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
        << " #ready_buffers=" << num_ready_buffers_.Load()
        << " #used_buffers=" << num_used_buffers_.Load()
@@ -203,9 +203,9 @@ string DiskIoMgr::RequestContext::DebugString() const {
   return ss.str();
 }
 
-bool DiskIoMgr::RequestContext::Validate() const {
-  if (state_ == RequestContext::Inactive) {
-    LOG(WARNING) << "state_ == RequestContext::Inactive";
+bool DiskIoRequestContext::Validate() const {
+  if (state_ == DiskIoRequestContext::Inactive) {
+    LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
     return false;
   }
 
@@ -234,7 +234,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
       return false;
     }
 
-    if (state_ != RequestContext::Cancelled) {
+    if (state_ != DiskIoRequestContext::Cancelled) {
       if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
           state.num_remaining_ranges()) {
         LOG(WARNING) << "disk_id=" << i
@@ -285,7 +285,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
     }
   }
 
-  if (state_ != RequestContext::Cancelled) {
+  if (state_ != DiskIoRequestContext::Cancelled) {
     if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
       LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
                    << " sum_in_states=" << num_unstarted_scan_ranges_.Load();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 01ea4b5..25399bc 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -121,7 +121,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
   }
 
   DCHECK(reader_->Validate()) << endl << reader_->DebugString();
-  if (reader_->state_ == RequestContext::Cancelled) {
+  if (reader_->state_ == DiskIoRequestContext::Cancelled) {
     reader_->blocked_ranges_.Remove(this);
     Cancel(reader_->status_);
     (*buffer)->Return();
@@ -230,7 +230,7 @@ void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64
   mtime_ = mtime;
 }
 
-void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
+void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
   DCHECK(hdfs_file_ == NULL);
   io_mgr_ = io_mgr;
   reader_ = reader;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index af6ad27..0a8a628 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -52,7 +52,7 @@ string GenerateRandomData() {
 
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   int file_idx;
   vector<DiskIoMgr::ScanRange*> scan_ranges;
   int abort_at_byte;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index ee89f56..46149b5 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -40,7 +40,7 @@ namespace impala {
 class DiskIoMgrTest : public testing::Test {
  public:
   void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
-      DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader, int32_t* data,
+      DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
       Status expected_status, const Status& status) {
     if (expected_status.code() == TErrorCode::CANCELLED) {
       EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
@@ -99,7 +99,7 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
-  static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+  static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) {
     DiskIoMgr::BufferDescriptor* buffer;
     ASSERT_OK(io_mgr->Read(reader, range, &buffer));
@@ -134,7 +134,7 @@ class DiskIoMgrTest : public testing::Test {
 
   // Continues pulling scan ranges from the io mgr until they are all done.
   // Updates num_ranges_processed with the number of ranges seen by this thread.
-  static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+  static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       const char* expected_result, int expected_len, const Status& expected_status,
       int max_ranges, AtomicInt32* num_ranges_processed) {
     int num_ranges = 0;
@@ -185,14 +185,14 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoMgr::RequestContext* writer;
+      DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
         int32_t* data = pool_->Add(new int32_t);
@@ -228,7 +228,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   string tmp_file = "/tmp/non-existent.txt";
   DiskIoMgr io_mgr(1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
-  DiskIoMgr::RequestContext* writer;
+  DiskIoRequestContext* writer;
   ASSERT_OK(io_mgr.RegisterContext(&writer));
   pool_.reset(new ObjectPool);
   int32_t* data = pool_->Add(new int32_t);
@@ -238,7 +238,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-          new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+          new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
           data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
@@ -255,7 +255,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
 
   new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-      new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+      new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
       data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
 
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
@@ -291,14 +291,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoMgr::RequestContext* writer;
+      DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
@@ -362,7 +362,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
 
           ASSERT_OK(io_mgr.Init(&mem_tracker));
           MemTracker reader_mem_tracker;
-          DiskIoMgr::RequestContext* reader;
+          DiskIoRequestContext* reader;
           ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
           vector<DiskIoMgr::ScanRange*> ranges;
@@ -416,7 +416,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoMgr::RequestContext* reader;
+        DiskIoRequestContext* reader;
         ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
         vector<DiskIoMgr::ScanRange*> ranges_first_half;
@@ -489,7 +489,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoMgr::RequestContext* reader;
+        DiskIoRequestContext* reader;
         ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
         DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
@@ -559,7 +559,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoMgr::RequestContext* reader;
+        DiskIoRequestContext* reader;
         ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
         vector<DiskIoMgr::ScanRange*> ranges;
@@ -624,7 +624,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
-    DiskIoMgr::RequestContext* reader;
+    DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
     vector<DiskIoMgr::ScanRange*> ranges;
@@ -699,7 +699,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
-    DiskIoMgr::RequestContext* reader;
+    DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
     DiskIoMgr::ScanRange* complete_range =
@@ -764,7 +764,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   stat(file_name.c_str(), &stat_val);
 
   int64_t iters = 0;
-  vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
+  vector<DiskIoRequestContext*> contexts(num_contexts);
   Status status;
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
@@ -838,7 +838,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   vector<string> file_names;
   vector<int64_t> mtimes;
   vector<string> data;
-  vector<DiskIoMgr::RequestContext*> readers;
+  vector<DiskIoRequestContext*> readers;
   vector<char*> results;
 
   file_names.resize(NUM_READERS);
@@ -1000,7 +1000,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr->RegisterContext(&reader, &reader_mem_tracker));
 
   // We should not read past the end of file.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 66f0498..448424f 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -104,33 +104,34 @@ DiskIoMgr::HdfsCachedFileHandle::~HdfsCachedFileHandle() {
   hdfs_file_ = NULL;
 }
 
-// This class provides a cache of RequestContext objects.  RequestContexts are recycled.
-// This is good for locality as well as lock contention.  The cache has the property that
-// regardless of how many clients get added/removed, the memory locations for
-// existing clients do not change (not the case with std::vector) minimizing the locks we
-// have to take across all readers.
+// This class provides a cache of DiskIoRequestContext objects.  DiskIoRequestContexts
+// are recycled. This is good for locality as well as lock contention.  The cache has
+// the property that regardless of how many clients get added/removed, the memory
+// locations for existing clients do not change (not the case with std::vector)
+// minimizing the locks we have to take across all readers.
 // All functions on this object are thread safe
 class DiskIoMgr::RequestContextCache {
  public:
   RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
 
   // Returns a context to the cache.  This object can now be reused.
-  void ReturnContext(RequestContext* reader) {
-    DCHECK(reader->state_ != RequestContext::Inactive);
-    reader->state_ = RequestContext::Inactive;
+  void ReturnContext(DiskIoRequestContext* reader) {
+    DCHECK(reader->state_ != DiskIoRequestContext::Inactive);
+    reader->state_ = DiskIoRequestContext::Inactive;
     lock_guard<mutex> l(lock_);
     inactive_contexts_.push_back(reader);
   }
 
-  // Returns a new RequestContext object.  Allocates a new object if necessary.
-  RequestContext* GetNewContext() {
+  // Returns a new DiskIoRequestContext object.  Allocates a new object if necessary.
+  DiskIoRequestContext* GetNewContext() {
     lock_guard<mutex> l(lock_);
     if (!inactive_contexts_.empty()) {
-      RequestContext* reader = inactive_contexts_.front();
+      DiskIoRequestContext* reader = inactive_contexts_.front();
       inactive_contexts_.pop_front();
       return reader;
     } else {
-      RequestContext* reader = new RequestContext(io_mgr_, io_mgr_->num_total_disks());
+      DiskIoRequestContext* reader =
+          new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks());
       all_contexts_.push_back(reader);
       return reader;
     }
@@ -138,7 +139,7 @@ class DiskIoMgr::RequestContextCache {
 
   // This object has the same lifetime as the disk IoMgr.
   ~RequestContextCache() {
-    for (list<RequestContext*>::iterator it = all_contexts_.begin();
+    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
         it != all_contexts_.end(); ++it) {
       delete *it;
     }
@@ -147,9 +148,9 @@ class DiskIoMgr::RequestContextCache {
   // Validates that all readers are cleaned up and in the inactive state.  No locks
   // are taken since this is only called from the disk IoMgr destructor.
   bool ValidateAllInactive() {
-    for (list<RequestContext*>::iterator it = all_contexts_.begin();
+    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
         it != all_contexts_.end(); ++it) {
-      if ((*it)->state_ != RequestContext::Inactive) {
+      if ((*it)->state_ != DiskIoRequestContext::Inactive) {
         return false;
       }
     }
@@ -166,16 +167,16 @@ class DiskIoMgr::RequestContextCache {
   mutex lock_;
 
   // List of all request contexts created.  Used for debugging
-  list<RequestContext*> all_contexts_;
+  list<DiskIoRequestContext*> all_contexts_;
 
   // List of inactive readers.  These objects can be used for a new reader.
-  list<RequestContext*> inactive_contexts_;
+  list<DiskIoRequestContext*> inactive_contexts_;
 };
 
 string DiskIoMgr::RequestContextCache::DebugString() {
   lock_guard<mutex> l(lock_);
   stringstream ss;
-  for (list<RequestContext*>::iterator it = all_contexts_.begin();
+  for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
       it != all_contexts_.end(); ++it) {
     unique_lock<mutex> lock((*it)->lock_);
     ss << (*it)->DebugString() << endl;
@@ -193,7 +194,7 @@ string DiskIoMgr::DebugString() {
     ss << "  " << (void*) disk_queues_[i] << ":" ;
     if (!disk_queues_[i]->request_contexts.empty()) {
       ss << " Readers: ";
-      for (RequestContext* req_context: disk_queues_[i]->request_contexts) {
+      for (DiskIoRequestContext* req_context: disk_queues_[i]->request_contexts) {
         ss << (void*)req_context;
       }
     }
@@ -206,7 +207,7 @@ DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
   io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
 }
 
-void DiskIoMgr::BufferDescriptor::Reset(RequestContext* reader,
+void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
       ScanRange* range, char* buffer, int64_t buffer_len) {
   DCHECK(io_mgr_ != NULL);
   DCHECK(buffer_ == NULL);
@@ -314,7 +315,7 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == NULL) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
+    for (list<DiskIoRequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
         it != disk_queues_[i]->request_contexts.end(); ++it) {
       DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
       DCHECK((*it)->disk_states_[disk_id].done());
@@ -384,7 +385,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   return Status::OK();
 }
 
-Status DiskIoMgr::RegisterContext(RequestContext** request_context,
+Status DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
     MemTracker* mem_tracker) {
   DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
   *request_context = request_context_cache_->GetNewContext();
@@ -392,7 +393,7 @@ Status DiskIoMgr::RegisterContext(RequestContext** request_context,
   return Status::OK();
 }
 
-void DiskIoMgr::UnregisterContext(RequestContext* reader) {
+void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
   // Blocking cancel (waiting for disks completion).
   CancelContext(reader, true);
 
@@ -425,7 +426,7 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
 // outstanding reference to the context decrements the number of disk queues the context
 // is on.
 // If wait_for_disks_completion is true, wait for the number of active disks to become 0.
-void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_completion) {
+void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) {
   context->Cancel(Status::CANCELLED);
 
   if (wait_for_disks_completion) {
@@ -437,50 +438,50 @@ void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_compl
   }
 }
 
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
   r->read_timer_ = c;
 }
 
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_bytes_read_counter(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
   r->bytes_read_counter_ = c;
 }
 
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
+void DiskIoMgr::set_active_read_thread_counter(DiskIoRequestContext* r,
     RuntimeProfile::Counter* c) {
   r->active_read_thread_counter_ = c;
 }
 
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
+void DiskIoMgr::set_disks_access_bitmap(DiskIoRequestContext* r,
     RuntimeProfile::Counter* c) {
   r->disks_accessed_bitmap_ = c;
 }
 
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
+int64_t DiskIoMgr::queue_size(DiskIoRequestContext* reader) const {
   return reader->num_ready_buffers_.Load();
 }
 
-Status DiskIoMgr::context_status(RequestContext* context) const {
+Status DiskIoMgr::context_status(DiskIoRequestContext* context) const {
   unique_lock<mutex> lock(context->lock_);
   return context->status_;
 }
 
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_local(DiskIoRequestContext* reader) const {
   return reader->bytes_read_local_.Load();
 }
 
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_short_circuit(DiskIoRequestContext* reader) const {
   return reader->bytes_read_short_circuit_.Load();
 }
 
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_dn_cache(DiskIoRequestContext* reader) const {
   return reader->bytes_read_dn_cache_.Load();
 }
 
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
+int DiskIoMgr::num_remote_ranges(DiskIoRequestContext* reader) const {
   return reader->num_remote_ranges_.Load();
 }
 
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
+int64_t DiskIoMgr::unexpected_remote_bytes(DiskIoRequestContext* reader) const {
   return reader->unexpected_remote_bytes_.Load();
 }
 
@@ -499,7 +500,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
+Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
     const vector<ScanRange*>& ranges, bool schedule_immediately) {
   if (ranges.empty()) return Status::OK();
 
@@ -513,7 +514,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
-  if (reader->state_ == RequestContext::Cancelled) {
+  if (reader->state_ == DiskIoRequestContext::Cancelled) {
     DCHECK(!reader->status_.ok());
     return reader->status_;
   }
@@ -545,7 +546,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
 // This function returns the next scan range the reader should work on, checking
 // for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) {
   DCHECK(reader != NULL);
   DCHECK(range != NULL);
   *range = NULL;
@@ -555,7 +556,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
   while (true) {
-    if (reader->state_ == RequestContext::Cancelled) {
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
       DCHECK(!reader->status_.ok());
       status = reader->status_;
       break;
@@ -599,7 +600,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   return status;
 }
 
-Status DiskIoMgr::Read(RequestContext* reader,
+Status DiskIoMgr::Read(DiskIoRequestContext* reader,
     ScanRange* range, BufferDescriptor** buffer) {
   DCHECK(range != NULL);
   DCHECK(buffer != NULL);
@@ -623,7 +624,7 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
   DCHECK(buffer_desc != NULL);
   if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);
 
-  RequestContext* reader = buffer_desc->reader_;
+  DiskIoRequestContext* reader = buffer_desc->reader_;
   if (buffer_desc->buffer_ != NULL) {
     if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
       // Not a cached buffer. Return the io buffer and update mem tracking.
@@ -655,7 +656,7 @@ void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
 }
 
 DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
-    RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
+    DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
   BufferDescriptor* buffer_desc;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
@@ -771,18 +772,18 @@ void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
 // b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
 //    immediately if there are no preceding scan ranges in in_flight_ranges_
 // It blocks until work is available or the thread is shut down.
-// Work is available if there is a RequestContext with
+// Work is available if there is a DiskIoRequestContext with
 //  - A ScanRange with a buffer available, or
 //  - A WriteRange in unstarted_write_ranges_.
 bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-    RequestContext** request_context) {
+    DiskIoRequestContext** request_context) {
   int disk_id = disk_queue->disk_id;
   *range = NULL;
 
   // This loops returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
     *request_context = NULL;
-    RequestContext::PerDiskState* request_disk_state = NULL;
+    DiskIoRequestContext::PerDiskState* request_disk_state = NULL;
     {
       unique_lock<mutex> disk_lock(disk_queue->lock);
 
@@ -827,12 +828,12 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
         << (*request_context)->DebugString();
 
     // Check if reader has been cancelled
-    if ((*request_context)->state_ == RequestContext::Cancelled) {
+    if ((*request_context)->state_ == DiskIoRequestContext::Cancelled) {
       request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
       continue;
     }
 
-    DCHECK_EQ((*request_context)->state_, RequestContext::Active)
+    DCHECK_EQ((*request_context)->state_, DiskIoRequestContext::Active)
         << (*request_context)->DebugString();
 
     if (request_disk_state->next_scan_range_to_start() == NULL &&
@@ -889,7 +890,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
   return false;
 }
 
-void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
     const Status& write_status) {
   // Execute the callback before decrementing the thread count. Otherwise CancelContext()
   // that waits for the disk ref count to be 0 will return, creating a race, e.g.
@@ -900,8 +901,8 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
-    RequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
-    if (writer->state_ == RequestContext::Cancelled) {
+    DiskIoRequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
+    if (writer->state_ == DiskIoRequestContext::Cancelled) {
       state.DecrementRequestThreadAndCheckDone(writer);
     } else {
       state.DecrementRequestThread();
@@ -910,16 +911,16 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
   }
 }
 
-void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     BufferDescriptor* buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
-  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
   DCHECK_GT(state.num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != NULL);
 
-  if (reader->state_ == RequestContext::Cancelled) {
+  if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
     ReturnFreeBuffer(buffer);
@@ -930,7 +931,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader
     return;
   }
 
-  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK_EQ(reader->state_, DiskIoRequestContext::Active);
   DCHECK(buffer->buffer_ != NULL);
 
   // Update the reader's scan ranges.  There are a three cases here:
@@ -979,7 +980,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   //   3. Perform the read or write as specified.
   // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    RequestContext* worker_context = NULL;;
+    DiskIoRequestContext* worker_context = NULL;;
     RequestRange* range = NULL;
 
     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
@@ -1000,7 +1001,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
 
 // This function reads the specified scan range associated with the
 // specified reader context and disk queue.
-void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
   char* buffer = NULL;
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -1017,11 +1018,11 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
   }
 
   if (!enough_memory) {
-    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
     unique_lock<mutex> reader_lock(reader->lock_);
 
     // Just grabbed the reader lock, check for cancellation.
-    if (reader->state_ == RequestContext::Cancelled) {
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       state.DecrementRequestThreadAndCheckDone(reader);
       range->Cancel(reader->status_);
@@ -1087,7 +1088,7 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
   HandleReadFinished(disk_queue, reader, buffer_desc);
 }
 
-void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
+void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
   FILE* file_handle = fopen(write_range->file(), "rb+");
   Status ret_status;
   if (file_handle == NULL) {
@@ -1137,11 +1138,11 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
   return idx;
 }
 
-Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
+Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range) {
   DCHECK_LE(write_range->len(), max_buffer_size_);
   unique_lock<mutex> writer_lock(writer->lock_);
 
-  if (writer->state_ == RequestContext::Cancelled) {
+  if (writer->state_ == DiskIoRequestContext::Cancelled) {
     DCHECK(!writer->status_.ok());
     return writer->status_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b130d52..79902c5 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -23,7 +23,6 @@
 #include <boost/unordered_set.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
-#include <boost/thread/thread.hpp>
 
 #include "common/atomic.h"
 #include "common/hdfs.h"
@@ -42,7 +41,7 @@ namespace impala {
 class MemTracker;
 
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
-/// (such as S3). Each query maps to one or more RequestContext objects, each of which
+/// (such as S3). Each query maps to one or more DiskIoRequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
 //
 /// The API splits up requesting scan/write ranges (non-blocking) and reading the data
@@ -185,12 +184,14 @@ class MemTracker;
 ///  - Internal classes are defined in disk-io-mgr-internal.h
 ///  - ScanRange APIs are implemented in disk-io-mgr-scan-range.cc
 ///    This contains the ready buffer queue logic
-///  - RequestContext APIs are implemented in disk-io-mgr-reader-context.cc
+///  - DiskIoRequestContext APIs are implemented in disk-io-mgr-reader-context.cc
 ///    This contains the logic for picking scan ranges for a reader.
 ///  - Disk Thread and general APIs are implemented in disk-io-mgr.cc.
+
+class DiskIoRequestContext;
+
 class DiskIoMgr {
  public:
-  class RequestContext;
   class ScanRange;
 
   /// This class is a small wrapper around the hdfsFile handle and the file system
@@ -247,16 +248,17 @@ class DiskIoMgr {
 
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
     BufferDescriptor(DiskIoMgr* io_mgr);
 
     /// Resets the buffer descriptor state for a new reader, range and data buffer.
-    void Reset(RequestContext* reader, ScanRange* range, char* buffer,
+    void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
         int64_t buffer_len);
 
     DiskIoMgr* io_mgr_;
 
     /// Reader that this buffer is for
-    RequestContext* reader_;
+    DiskIoRequestContext* reader_;
 
     /// The current tracker this buffer is associated with.
     MemTracker* mem_tracker_;
@@ -367,9 +369,10 @@ class DiskIoMgr {
 
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
 
     /// Initialize internal fields
-    void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
+    void InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader);
 
     /// Enqueues a buffer for this range. This does not block.
     /// Returns true if this scan range has hit the queue capacity, false otherwise.
@@ -423,7 +426,7 @@ class DiskIoMgr {
     DiskIoMgr* io_mgr_;
 
     /// Reader/owner of the scan range
-    RequestContext* reader_;
+    DiskIoRequestContext* reader_;
 
     /// File handle either to hdfs or local fs (FILE*)
     ///
@@ -446,7 +449,7 @@ class DiskIoMgr {
     int bytes_read_;
 
     /// Status for this range. This is non-ok if is_cancelled_ is true.
-    /// Note: an individual range can fail without the RequestContext being
+    /// Note: an individual range can fail without the DiskIoRequestContext being
     /// cancelled. This allows us to skip individual ranges.
     Status status_;
 
@@ -509,6 +512,7 @@ class DiskIoMgr {
 
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
 
     /// Data to be written. RequestRange::len_ contains the length of data
     /// to be written.
@@ -540,13 +544,13 @@ class DiskIoMgr {
 
   /// Allocates tracking structure for a request context.
   /// Register a new request context which is returned in *request_context.
-  /// The IoMgr owns the allocated RequestContext object. The caller must call
+  /// The IoMgr owns the allocated DiskIoRequestContext object. The caller must call
   /// UnregisterContext() for each context.
   /// reader_mem_tracker: Is non-null only for readers. IO buffers
   ///    used for this reader will be tracked by this. If the limit is exceeded
   ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
   ///    GetNext().
-  Status RegisterContext(RequestContext** request_context,
+  Status RegisterContext(DiskIoRequestContext** request_context,
       MemTracker* reader_mem_tracker = NULL);
 
   /// Unregisters context from the disk IoMgr. This must be called for every
@@ -555,7 +559,7 @@ class DiskIoMgr {
   /// The 'context' cannot be used after this call.
   /// This call blocks until all the disk threads have finished cleaning up.
   /// UnregisterContext also cancels the reader/writer from the disk IoMgr.
-  void UnregisterContext(RequestContext* context);
+  void UnregisterContext(DiskIoRequestContext* context);
 
   /// This function cancels the context asychronously. All outstanding requests
   /// are aborted and tracking structures cleaned up. This does not need to be
@@ -565,7 +569,7 @@ class DiskIoMgr {
   /// context to reach 0. After calling with wait_for_disks_completion = true, the only
   /// valid API is returning IO buffers that have already been returned.
   /// Takes context->lock_ if wait_for_disks_completion is true.
-  void CancelContext(RequestContext* context, bool wait_for_disks_completion = false);
+  void CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion = false);
 
   /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
   /// not deallocate the scan range pointers before UnregisterContext().
@@ -573,26 +577,26 @@ class DiskIoMgr {
   /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
   /// This can be used to do synchronous reads as well as schedule dependent ranges,
   /// as in the case for columnar formats.
-  Status AddScanRanges(RequestContext* reader, const std::vector<ScanRange*>& ranges,
+  Status AddScanRanges(DiskIoRequestContext* reader, const std::vector<ScanRange*>& ranges,
       bool schedule_immediately = false);
 
   /// Add a WriteRange for the writer. This is non-blocking and schedules the context
   /// on the IoMgr disk queue. Does not create any files.
-  Status AddWriteRange(RequestContext* writer, WriteRange* write_range);
+  Status AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range);
 
   /// Returns the next unstarted scan range for this reader. When the range is returned,
   /// the disk threads in the IoMgr will already have started reading from it. The
   /// caller is expected to call ScanRange::GetNext on the returned range.
   /// If there are no more unstarted ranges, NULL is returned.
   /// This call is blocking.
-  Status GetNextRange(RequestContext* reader, ScanRange** range);
+  Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range);
 
   /// Reads the range and returns the result in buffer.
   /// This behaves like the typical synchronous read() api, blocking until the data
   /// is read. This can be called while there are outstanding ScanRanges and is
   /// thread safe. Multiple threads can be calling Read() per reader at a time.
   /// range *cannot* have already been added via AddScanRanges.
-  Status Read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
+  Status Read(DiskIoRequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
 
   /// Determine which disk queue this file should be assigned to.  Returns an index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that holds the
@@ -600,21 +604,21 @@ class DiskIoMgr {
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);
 
-  /// TODO: The functions below can be moved to RequestContext.
+  /// TODO: The functions below can be moved to DiskIoRequestContext.
   /// Returns the current status of the context.
-  Status context_status(RequestContext* context) const;
+  Status context_status(DiskIoRequestContext* context) const;
 
-  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
-  void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
+  void set_bytes_read_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_read_timer(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_active_read_thread_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_disks_access_bitmap(DiskIoRequestContext*, RuntimeProfile::Counter*);
 
-  int64_t queue_size(RequestContext* reader) const;
-  int64_t bytes_read_local(RequestContext* reader) const;
-  int64_t bytes_read_short_circuit(RequestContext* reader) const;
-  int64_t bytes_read_dn_cache(RequestContext* reader) const;
-  int num_remote_ranges(RequestContext* reader) const;
-  int64_t unexpected_remote_bytes(RequestContext* reader) const;
+  int64_t queue_size(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_local(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_short_circuit(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
+  int num_remote_ranges(DiskIoRequestContext* reader) const;
+  int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;
 
   /// Returns the read throughput across all readers.
   /// TODO: should this be a sliding window?  This should report metrics for the
@@ -671,6 +675,7 @@ class DiskIoMgr {
 
  private:
   friend class BufferDescriptor;
+  friend class DiskIoRequestContext;
   struct DiskQueue;
   class RequestContextCache;
 
@@ -757,7 +762,7 @@ class DiskIoMgr {
   /// should be <= max_buffer_size_. These constraints will be met if buffer was acquired
   /// via GetFreeBuffer() (which it should have been).
   BufferDescriptor* GetBufferDesc(
-      RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
+      DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
 
   /// Returns a buffer desc object which can now be used for another reader.
   void ReturnBufferDesc(BufferDescriptor* desc);
@@ -798,11 +803,11 @@ class DiskIoMgr {
   /// Only returns false if the disk thread should be shut down.
   /// No locks should be taken before this function call and none are left taken after.
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-      RequestContext** request_context);
+      DiskIoRequestContext** request_context);
 
   /// Updates disk queue and reader state after a read is complete. The read result
   /// is captured in the buffer descriptor.
-  void HandleReadFinished(DiskQueue*, RequestContext*, BufferDescriptor*);
+  void HandleReadFinished(DiskQueue*, DiskIoRequestContext*, BufferDescriptor*);
 
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
@@ -810,7 +815,7 @@ class DiskIoMgr {
   /// The write_status does not affect the writer->status_. That is, an write error does
   /// not cancel the writer context - that decision is left to the callback handler.
   /// TODO: On the read path, consider not canceling the reader context on error.
-  void HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+  void HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
       const Status& write_status);
 
   /// Validates that range is correctly initialized
@@ -818,7 +823,7 @@ class DiskIoMgr {
 
   /// Write the specified range to disk and calls HandleWriteFinished when done.
   /// Responsible for opening and closing the file that is written.
-  void Write(RequestContext* writer_context, WriteRange* write_range);
+  void Write(DiskIoRequestContext* writer_context, WriteRange* write_range);
 
   /// Helper method to write a range using the specified FILE handle. Returns Status:OK
   /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
@@ -826,7 +831,7 @@ class DiskIoMgr {
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range);
 
   /// Reads the specified scan range and calls HandleReadFinished when done.
-  void ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+  void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
       ScanRange* range);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 07378e4..7eaa3db 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -22,6 +22,7 @@
 
 #include "common/logging.h"
 #include "resourcebroker/resource-broker.h"
+#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
@@ -38,6 +39,7 @@
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
+#include "util/hdfs-bulk-ops.h"
 #include "util/mem-info.h"
 #include "util/metrics.h"
 #include "util/network-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index e405bf8..c5404dc 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -17,35 +17,35 @@
 #define IMPALA_RUNTIME_EXEC_ENV_H
 
 #include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/thread.hpp>
 
+// NOTE: try not to add more headers here: exec-env.h is included in many many files.
 #include "common/status.h"
-#include "runtime/backend-client.h"
-#include "util/cgroups-mgr.h"
-#include "util/hdfs-bulk-ops.h" // For declaration of HdfsOpThreadPool
-#include "resourcebroker/resource-broker.h"
+#include "runtime/client-cache-types.h"
+#include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool
 
 namespace impala {
 
+class CallableThreadPool;
+class CgroupsMgr;
 class DataStreamMgr;
 class DiskIoMgr;
+class FragmentMgr;
+class Frontend;
 class HBaseTableFactory;
 class HdfsFsCache;
+class ImpalaServer;
 class LibCache;
+class MemTracker;
+class MetricGroup;
+class QueryResourceMgr;
+class RequestPoolService;
+class ResourceBroker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
-class Webserver;
-class MetricGroup;
-class MemTracker;
 class ThreadResourceMgr;
-class CgroupsManager;
-class ImpalaServer;
-class RequestPoolService;
-class FragmentMgr;
-class Frontend;
 class TmpFileMgr;
+class Webserver;
 
 /// Execution environment for queries/plan fragments.
 /// Contains all required global structures, and handles to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index 5c1e423..b3a6f0a 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -29,14 +29,15 @@
 #include "common/names.h"
 
 namespace filesystem = boost::filesystem;
-using namespace impala;
 
 DEFINE_string(local_library_dir, "/tmp",
               "Local directory to copy UDF libraries from HDFS into");
 
+namespace impala {
+
 scoped_ptr<LibCache> LibCache::instance_;
 
-struct LibCache::LibCacheEntry {
+struct LibCacheEntry {
   // Lock protecting all fields in this entry
   boost::mutex lock;
 
@@ -53,7 +54,7 @@ struct LibCache::LibCacheEntry {
   bool check_needs_refresh;
 
   // The type of this file.
-  LibType type;
+  LibCache::LibType type;
 
   // The path on the local file system for this library.
   std::string local_path;
@@ -117,7 +118,7 @@ Status LibCache::InitInternal() {
   return Status::OK();
 }
 
-LibCache::LibCacheEntry::~LibCacheEntry() {
+LibCacheEntry::~LibCacheEntry() {
   if (shared_object_handle != NULL) {
     DCHECK_EQ(use_count, 0);
     DCHECK(should_remove);
@@ -418,3 +419,5 @@ string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir)
       << (num_libs_copied_.Add(1) - 1) << src.extension().native();
   return dst.str();
 }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index b36a859..9201341 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -50,10 +50,10 @@ class RuntimeState;
 /// TODO:
 /// - refresh libraries
 /// - better cached module management.
+struct LibCacheEntry;
+
 class LibCache {
  public:
-  struct LibCacheEntry;
-
   enum LibType {
     TYPE_SO,      // Shared object
     TYPE_IR,      // IR intermediate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 9e22e9f..fdf38e9 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -23,7 +23,6 @@
 
 #include "common/logging.h"
 #include "util/bit-util.h"
-#include "util/runtime-profile.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 9534671..73c9300 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,7 +29,7 @@
 #include "util/debug-util.h"
 #include "util/internal-queue.h"
 #include "util/metrics.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
 #include "gen-cpp/Types_types.h" // for TUniqueId

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 99ed75d..75c5f58 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -29,10 +29,13 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/hbase-table-scanner.h"
 #include "exprs/expr.h"
+#include "resourcebroker/resource-broker.h"
 #include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
 #include "runtime/mem-tracker.h"
+#include "scheduling/query-resource-mgr.h"
 #include "util/cgroups-mgr.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -589,7 +592,7 @@ void PlanFragmentExecutor::Close() {
           runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
     }
     if (plan_ != NULL) plan_->Close(runtime_state_.get());
-    for (DiskIoMgr::RequestContext* context: *runtime_state_->reader_contexts()) {
+    for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
       runtime_state_->io_mgr()->UnregisterContext(context);
     }
     exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 91cece6..29250a3 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "common/object-pool.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
 #include "util/thread.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index cb49bda..1f28440 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -15,6 +15,8 @@
 #include "testutil/gtest-util.h"
 #include "runtime/collection-value.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3f2ba29..4197977 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -27,7 +27,6 @@
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
new file mode 100644
index 0000000..91f12c9
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -0,0 +1,222 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "runtime/runtime-filter-bank.h"
+
+#include "common/names.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/bits.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter.inline.h"
+#include "service/impala-server.h"
+#include "util/bloom-filter.h"
+
+using namespace impala;
+using namespace boost;
+using namespace strings;
+
+DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
+    "positives in a runtime filter before it is disabled.");
+
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+
+RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
+    : state_(state), closed_(false) {
+  memory_allocated_ =
+      state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
+
+  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
+  max_filter_size_ = query_ctx.request.query_options.runtime_filter_max_size;
+  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  max_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  min_filter_size_ = query_ctx.request.query_options.runtime_filter_min_size;
+  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  min_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  // Make sure that min <= max
+  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+  DCHECK_GT(min_filter_size_, 0);
+  DCHECK_GT(max_filter_size_, 0);
+
+  default_filter_size_ = query_ctx.request.query_options.runtime_bloom_filter_size;
+  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+  default_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
+}
+
+RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
+    bool is_producer) {
+  RuntimeFilter* ret = obj_pool_.Add(
+      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (is_producer) {
+    DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
+    produced_filters_[filter_desc.filter_id] = ret;
+  } else {
+    if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+      consumed_filters_[filter_desc.filter_id] = ret;
+    } else {
+      // The filter has already been registered in this filter bank by another
+      // target node.
+      DCHECK_GT(filter_desc.targets.size(), 1);
+      ret = consumed_filters_[filter_desc.filter_id];
+    }
+  }
+  return ret;
+}
+
+namespace {
+
+/// Sends a filter to the coordinator. Executed asynchronously in the context of
+/// ExecEnv::rpc_pool().
+void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
+    ImpalaBackendClientCache* client_cache) {
+  Status status;
+  ImpalaBackendConnection coord(client_cache, address, &status);
+  if (!status.ok()) {
+    // Failing to send a filter is not a query-wide error - the remote fragment will
+    // continue regardless.
+    // TODO: Retry.
+    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
+    return;
+  }
+  TUpdateFilterResult res;
+  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
+}
+
+}
+
+void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
+    BloomFilter* bloom_filter) {
+  DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
+      << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
+  TUpdateFilterParams params;
+  // A runtime filter may have both local and remote targets.
+  bool has_local_target = false;
+  bool has_remote_target = false;
+  {
+    lock_guard<mutex> l(runtime_filter_lock_);
+    RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+    DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
+                                          << filter_id;
+    it->second->SetBloomFilter(bloom_filter);
+    has_local_target = it->second->filter_desc().has_local_targets;
+    has_remote_target = it->second->filter_desc().has_remote_targets;
+  }
+
+  if (has_local_target) {
+    // Do a short circuit publication by pushing the same BloomFilter to the consumer
+    // side.
+    RuntimeFilter* filter;
+    {
+      lock_guard<mutex> l(runtime_filter_lock_);
+      RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+      if (it == consumed_filters_.end()) return;
+      filter = it->second;
+    }
+    filter->SetBloomFilter(bloom_filter);
+    state_->runtime_profile()->AddInfoString(
+        Substitute("Filter $0 arrival", filter_id),
+        PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
+  }
+
+  if (has_remote_target
+      && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+    BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
+    params.filter_id = filter_id;
+    params.query_id = state_->query_id();
+
+    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
+        SendFilterToCoordinator, state_->query_ctx().coord_address, params,
+        ExecEnv::GetInstance()->impalad_client_cache()));
+  }
+}
+
+void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
+    const TBloomFilter& thrift_filter) {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (closed_) return;
+  RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+  DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
+                                        << filter_id;
+  if (thrift_filter.always_true) {
+    it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+  } else {
+    int64_t required_space =
+        BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
+    // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
+    // there's not enough memory for it.
+    if (!state_->query_mem_tracker()->TryConsume(required_space)) {
+      VLOG_QUERY << "No memory for global filter: " << filter_id
+                 << " (fragment instance: " << state_->fragment_instance_id() << ")";
+      it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+    } else {
+      BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
+      DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+      memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+      it->second->SetBloomFilter(bloom_filter);
+    }
+  }
+  state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
+      PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
+}
+
+BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (closed_) return NULL;
+
+  RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+  DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
+
+  // Track required space
+  int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
+  int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
+  if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
+  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
+  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+  memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+  return bloom_filter;
+}
+
+int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
+  if (ndv == -1) return default_filter_size_;
+  int64_t required_space =
+      1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
+  required_space = max<int64_t>(required_space, min_filter_size_);
+  required_space = min<int64_t>(required_space, max_filter_size_);
+  return required_space;
+}
+
+bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
+  double fpp =
+      BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size));
+  return fpp > FLAGS_max_filter_error_rate;
+}
+
+void RuntimeFilterBank::Close() {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  closed_ = true;
+  obj_pool_.Clear();
+  state_->query_mem_tracker()->Release(memory_allocated_->value());
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
new file mode 100644
index 0000000..4703a0f
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -0,0 +1,149 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
+#define IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
+
+#include "common/object-pool.h"
+#include "runtime/types.h"
+#include "util/runtime-profile.h"
+
+#include <boost/thread/lock_guard.hpp>
+#include <boost/unordered_map.hpp>
+
+namespace impala {
+
+class BloomFilter;
+class RuntimeFilter;
+class RuntimeState;
+class TBloomFilter;
+class TRuntimeFilterDesc;
+class TQueryCtx;
+
+/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
+/// predicates across the plan tree dynamically. Each fragment instance manages its
+/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
+/// objects and data structures.
+///
+/// A RuntimeFilterBank manages both production and consumption of filters. In the case
+/// where a given filter is both consumed and produced by the same fragment, the
+/// RuntimeFilterBank treats each filter independently.
+///
+/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
+/// fragments update the bloom filters by calling UpdateFilterFromLocal()
+/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
+/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
+/// AllocateScratchBloomFilter(); this allows RuntimeFilterBank to manage all memory
+/// associated with filters.
+///
+/// Filters are aggregated at the coordinator, and then made available to consumers after
+/// PublishGlobalFilter() has been called.
+///
+/// After PublishGlobalFilter() has been called (and again, it may only be called once per
+/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
+/// bloom_filter, and may be used for filter evaluation. This operation occurs without
+/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
+/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
+class RuntimeFilterBank {
+ public:
+  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
+
+  /// Registers a filter that will either be produced (is_producer == false) or consumed
+  /// (is_producer == true) by fragments that share this RuntimeState. The filter
+  /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
+  RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
+
+  /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
+  /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
+  /// full filter that contains all elements.
+  void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
+
+  /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
+  /// consumption by operators that wish to use it for filtering.
+  void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
+
+  /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
+  /// 'filter_size' would have an expected false-positive rate which would exceed
+  /// FLAGS_max_filter_error_rate.
+  bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
+
+  /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
+  /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
+  /// clients and subsequently accessed without synchronization. Concurrent calls to
+  /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
+  /// need for client synchronization.
+  inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);
+
+  /// Returns a bloom_filter that can be used by an operator to produce a local filter,
+  /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
+  /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
+  /// should not be deleted by the caller. The filter identified by 'filter_id' must have
+  /// been previously registered as a 'producer' by RegisterFilter().
+  ///
+  /// If there is not enough memory, or if Close() has been called first, returns NULL.
+  BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
+
+  /// Default hash seed to use when computing hashed values to insert into filters.
+  static const int32_t DefaultHashSeed() { return 1234; }
+
+  /// Releases all memory allocated for BloomFilters.
+  void Close();
+
+  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
+  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
+
+ private:
+  /// Returns the the space (in bytes) required for a filter to achieve the configured
+  /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
+  /// estimate is known), the default filter size is returned.
+  int64_t GetFilterSizeForNdv(int64_t ndv);
+
+  /// Lock protecting produced_filters_ and consumed_filters_.
+  boost::mutex runtime_filter_lock_;
+
+  /// Map from filter id to a RuntimeFilter.
+  typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;
+
+  /// All filters expected to be produced by the local plan fragment instance.
+  RuntimeFilterMap produced_filters_;
+
+  /// All filters expected to be consumed by the local plan fragment instance.
+  RuntimeFilterMap consumed_filters_;
+
+  /// Fragment instance's runtime state.
+  RuntimeState* state_;
+
+  /// Object pool to track allocated Bloom filters.
+  ObjectPool obj_pool_;
+
+  /// True iff Close() has been called. Used to prevent races between
+  /// AllocateScratchBloomFilter() and Close().
+  bool closed_;
+
+  /// Total amount of memory allocated to Bloom Filters
+  RuntimeProfile::Counter* memory_allocated_;
+
+  /// Precomputed default BloomFilter size.
+  int64_t default_filter_size_;
+
+  /// Maximum filter size, in bytes, rounded up to a power of two.
+  int64_t max_filter_size_;
+
+  /// Minimum filter size, in bytes, rounded up to a power of two.
+  int64_t min_filter_size_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index b0126ec..9ae6cbb 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -14,211 +14,14 @@
 
 #include "runtime/runtime-filter.inline.h"
 
+#include "util/time.h"
+
 #include "common/names.h"
-#include "gutil/bits.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/client-cache.h"
-#include "runtime/exec-env.h"
-#include "runtime/backend-client.h"
-#include "service/impala-server.h"
-#include "util/bloom-filter.h"
 
 using namespace impala;
-using namespace boost;
-using namespace strings;
-
-DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
-    "positives in a runtime filter before it is disabled.");
 
 const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
 
-const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
-const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
-
-RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
-    : query_ctx_(query_ctx), state_(state), closed_(false) {
-  memory_allocated_ =
-      state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
-
-  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size;
-  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  max_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size;
-  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  min_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  // Make sure that min <= max
-  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
-
-  DCHECK_GT(min_filter_size_, 0);
-  DCHECK_GT(max_filter_size_, 0);
-
-  default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size;
-  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
-  default_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
-}
-
-RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
-    bool is_producer) {
-  RuntimeFilter* ret = obj_pool_.Add(
-      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
-  lock_guard<mutex> l(runtime_filter_lock_);
-  if (is_producer) {
-    DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
-    produced_filters_[filter_desc.filter_id] = ret;
-  } else {
-    if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
-      consumed_filters_[filter_desc.filter_id] = ret;
-    } else {
-      // The filter has already been registered in this filter bank by another
-      // target node.
-      DCHECK_GT(filter_desc.targets.size(), 1);
-      ret = consumed_filters_[filter_desc.filter_id];
-    }
-  }
-  return ret;
-}
-
-namespace {
-
-/// Sends a filter to the coordinator. Executed asynchronously in the context of
-/// ExecEnv::rpc_pool().
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
-    ImpalaBackendClientCache* client_cache) {
-  Status status;
-  ImpalaBackendConnection coord(client_cache, address, &status);
-  if (!status.ok()) {
-    // Failing to send a filter is not a query-wide error - the remote fragment will
-    // continue regardless.
-    // TODO: Retry.
-    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
-    return;
-  }
-  TUpdateFilterResult res;
-  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
-}
-
-}
-
-void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
-    BloomFilter* bloom_filter) {
-  DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
-      << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
-  TUpdateFilterParams params;
-  // A runtime filter may have both local and remote targets.
-  bool has_local_target = false;
-  bool has_remote_target = false;
-  {
-    lock_guard<mutex> l(runtime_filter_lock_);
-    RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
-    DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
-                                          << filter_id;
-    it->second->SetBloomFilter(bloom_filter);
-    has_local_target = it->second->filter_desc().has_local_targets;
-    has_remote_target = it->second->filter_desc().has_remote_targets;
-  }
-
-  if (has_local_target) {
-    // Do a short circuit publication by pushing the same BloomFilter to the consumer
-    // side.
-    RuntimeFilter* filter;
-    {
-      lock_guard<mutex> l(runtime_filter_lock_);
-      RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
-      if (it == consumed_filters_.end()) return;
-      filter = it->second;
-    }
-    filter->SetBloomFilter(bloom_filter);
-    state_->runtime_profile()->AddInfoString(
-        Substitute("Filter $0 arrival", filter_id),
-        PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
-  }
-
-  if (has_remote_target
-      && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
-    BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
-    params.filter_id = filter_id;
-    params.query_id = query_ctx_.query_id;
-
-    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
-        SendFilterToCoordinator, query_ctx_.coord_address, params,
-        ExecEnv::GetInstance()->impalad_client_cache()));
-  }
-}
-
-void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
-    const TBloomFilter& thrift_filter) {
-  lock_guard<mutex> l(runtime_filter_lock_);
-  if (closed_) return;
-  RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
-  DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
-                                        << filter_id;
-  if (thrift_filter.always_true) {
-    it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
-  } else {
-    int64_t required_space =
-        BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
-    // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
-    // there's not enough memory for it.
-    if (!state_->query_mem_tracker()->TryConsume(required_space)) {
-      VLOG_QUERY << "No memory for global filter: " << filter_id
-                 << " (fragment instance: " << state_->fragment_instance_id() << ")";
-      it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
-    } else {
-      BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
-      DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-      memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
-      it->second->SetBloomFilter(bloom_filter);
-    }
-  }
-  state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
-      PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
-}
-
-BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
-  lock_guard<mutex> l(runtime_filter_lock_);
-  if (closed_) return NULL;
-
-  RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
-  DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
-
-  // Track required space
-  int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
-  int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
-  if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
-  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
-  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-  memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
-  return bloom_filter;
-}
-
-int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
-  if (ndv == -1) return default_filter_size_;
-  int64_t required_space =
-      1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
-  required_space = max<int64_t>(required_space, min_filter_size_);
-  required_space = min<int64_t>(required_space, max_filter_size_);
-  return required_space;
-}
-
-bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
-  double fpp =
-      BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size));
-  return fpp > FLAGS_max_filter_error_rate;
-}
-
-void RuntimeFilterBank::Close() {
-  lock_guard<mutex> l(runtime_filter_lock_);
-  closed_ = true;
-  obj_pool_.Clear();
-  state_->query_mem_tracker()->Release(memory_allocated_->value());
-}
-
 bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
   do {
     if (HasBloomFilter()) return true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 178c03f..4168704 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -16,135 +16,14 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_FILTER_H
 #define IMPALA_RUNTIME_RUNTIME_FILTER_H
 
-#include <boost/unordered_map.hpp>
-
-#include "common/object-pool.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "runtime/types.h"
-#include "util/runtime-profile.h"
+#include "runtime/raw-value.h"
+#include "runtime/runtime-filter-bank.h"
+#include "util/bloom-filter.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
 class BloomFilter;
-class RuntimeFilter;
-class RuntimeState;
-
-/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
-/// predicates across the plan tree dynamically. Each fragment instance manages its
-/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
-/// objects and data structures.
-///
-/// A RuntimeFilterBank manages both production and consumption of filters. In the case
-/// where a given filter is both consumed and produced by the same fragment, the
-/// RuntimeFilterBank treats each filter independently.
-///
-/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
-/// fragments update the bloom filters by calling UpdateFilterFromLocal()
-/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
-/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
-/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all
-/// memory associated with filters.
-///
-/// Filters are aggregated at the coordinator, and then made available to consumers after
-/// PublishGlobalFilter() has been called.
-///
-/// After PublishGlobalFilter() has been called (and again, it may only be called once per
-/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
-/// bloom_filter, and may be used for filter evaluation. This operation occurs without
-/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
-/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
-class RuntimeFilterBank {
- public:
-  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
-
-  /// Registers a filter that will either be produced (is_producer == false) or consumed
-  /// (is_producer == true) by fragments that share this RuntimeState. The filter
-  /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
-  RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
-
-  /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
-  /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
-  /// full filter that contains all elements.
-  void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
-
-  /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
-  /// consumption by operators that wish to use it for filtering.
-  void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
-
-  /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
-  /// 'filter_size' would have an expected false-positive rate which would exceed
-  /// FLAGS_max_filter_error_rate.
-  bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
-
-  /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
-  /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
-  /// clients and subsequently accessed without synchronization. Concurrent calls to
-  /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
-  /// need for client synchronization.
-  inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);
-
-  /// Returns a bloom_filter that can be used by an operator to produce a local filter,
-  /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
-  /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
-  /// should not be deleted by the caller. The filter identified by 'filter_id' must have
-  /// been previously registered as a 'producer' by RegisterFilter().
-  ///
-  /// If there is not enough memory, or if Close() has been called first, returns NULL.
-  BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
-
-  /// Default hash seed to use when computing hashed values to insert into filters.
-  static const int32_t DefaultHashSeed() { return 1234; }
-
-  /// Releases all memory allocated for BloomFilters.
-  void Close();
-
-  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
-  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
-
- private:
-  /// Returns the the space (in bytes) required for a filter to achieve the configured
-  /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
-  /// estimate is known), the default filter size is returned.
-  int64_t GetFilterSizeForNdv(int64_t ndv);
-
-  const TQueryCtx query_ctx_;
-
-  /// Lock protecting produced_filters_ and consumed_filters_.
-  boost::mutex runtime_filter_lock_;
-
-  /// Map from filter id to a RuntimeFilter.
-  typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;
-
-  /// All filters expected to be produced by the local plan fragment instance.
-  RuntimeFilterMap produced_filters_;
-
-  /// All filters expected to be consumed by the local plan fragment instance.
-  RuntimeFilterMap consumed_filters_;
-
-  /// Fragment instance's runtime state.
-  RuntimeState* state_;
-
-  /// Object pool to track allocated Bloom filters.
-  ObjectPool obj_pool_;
-
-  /// True iff Close() has been called. Used to prevent races between
-  /// AllocateScratchBloomFilter() and Close().
-  bool closed_;
-
-  /// Total amount of memory allocated to Bloom Filters
-  RuntimeProfile::Counter* memory_allocated_;
-
-  /// Precomputed default BloomFilter size.
-  int64_t default_filter_size_;
-
-  /// Maximum filter size, in bytes, rounded up to a power of two.
-  int64_t max_filter_size_;
-
-  /// Minimum filter size, in bytes, rounded up to a power of two.
-  int64_t min_filter_size_;
-};
 
 /// RuntimeFilters represent set-membership predicates (implemented with bloom filters)
 /// that are computed during query execution (rather than during planning). They can then