You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/14 01:09:12 UTC

[2/4] impala git commit: IMPALA-5519: Allocate fragment's runtime filter memory from Buffer pool

IMPALA-5519: Allocate fragment's runtime filter memory from Buffer pool

This patch adds changes to the planner to account for memory used by
bloom filters at the fragment instance level. Also adds changes to
allocate memory for those bloom filters from the buffer pool.

Testing:
- Modified Planner Tests and end to end tests to account for memory
  reservation for the runtime filters.
- Modified backend tests and benchmarks to use the bufferpool for
  bloom filter allocation.
- Added an end to end test.
- Ran rest of the core tests.

Change-Id: Iea2759665fb2e8bef9433014a8d42a7ebf99ce1f
Reviewed-on: http://gerrit.cloudera.org:8080/8971
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8fc1eccc
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8fc1eccc
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8fc1eccc

Branch: refs/heads/master
Commit: 8fc1eccce494c36c5f8d94e5083366b40365a71f
Parents: 5f75996
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Dec 18 11:04:27 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 13 08:29:03 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/bloom-filter-benchmark.cc     |  90 ++++--
 be/src/runtime/fragment-instance-state.cc       |   3 +-
 be/src/runtime/runtime-filter-bank.cc           | 109 +++----
 be/src/runtime/runtime-filter-bank.h            |  42 ++-
 be/src/runtime/runtime-filter.h                 |   2 +-
 be/src/runtime/runtime-state.cc                 |   6 +-
 be/src/runtime/runtime-state.h                  |   5 +-
 be/src/service/fe-support.cc                    |  14 +
 be/src/service/query-options-test.cc            |  14 +-
 be/src/service/query-options.cc                 |  12 +
 be/src/util/backend-gflag-util.cc               |   5 +-
 be/src/util/bloom-filter-test.cc                | 289 ++++++++++++-------
 be/src/util/bloom-filter.cc                     |  73 +++--
 be/src/util/bloom-filter.h                      |  59 ++--
 common/thrift/BackendGflags.thrift              |   4 +
 common/thrift/ImpalaInternalService.thrift      |  10 +-
 common/thrift/PlanNodes.thrift                  |   4 +
 common/thrift/Planner.thrift                    |   4 +
 .../org/apache/impala/planner/PlanFragment.java |  34 ++-
 .../impala/planner/RuntimeFilterGenerator.java  |  91 +++++-
 .../apache/impala/service/BackendConfig.java    |   4 +
 .../org/apache/impala/service/FeSupport.java    |  16 +
 .../queries/PlannerTest/disable-codegen.test    |   8 +-
 .../PlannerTest/fk-pk-join-detection.test       |  20 +-
 .../queries/PlannerTest/max-row-size.test       |  54 ++--
 .../PlannerTest/min-max-runtime-filters.test    |   4 +-
 .../PlannerTest/resource-requirements.test      | 218 +++++++-------
 .../PlannerTest/spillable-buffer-sizing.test    |  80 ++---
 .../queries/PlannerTest/tablesample.test        |   4 +-
 .../admission-reject-min-reservation.test       |   4 +-
 .../queries/QueryTest/bloom_filters.test        |  73 +++--
 .../queries/QueryTest/bloom_filters_wait.test   |  17 +-
 .../queries/QueryTest/explain-level0.test       |   4 +-
 .../queries/QueryTest/explain-level1.test       |   4 +-
 .../queries/QueryTest/explain-level2.test       |   6 +-
 .../queries/QueryTest/explain-level3.test       |   6 +-
 .../queries/QueryTest/runtime_row_filters.test  |  34 +--
 .../queries/QueryTest/spilling.test             |   4 +-
 38 files changed, 891 insertions(+), 539 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/benchmarks/bloom-filter-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index 6728c42..9660bfa 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -21,9 +21,15 @@
 #include <iostream>
 #include <vector>
 
+#include "runtime/bufferpool/buffer-allocator.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "util/benchmark.h"
-#include "util/cpu-info.h"
 #include "util/bloom-filter.h"
+#include "common/init.h"
 
 #include "common/names.h"
 
@@ -172,25 +178,40 @@ uint32_t MakeRand() {
 namespace initialize {
 
 void Benchmark(int batch_size, void* data) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "");
+  ExecEnv* env = ExecEnv::GetInstance();
+  BufferPool::ClientHandle client;
+  CHECK(env->buffer_pool()
+            ->RegisterClient("", nullptr, env->buffer_reservation(), nullptr,
+                std::numeric_limits<int64>::max(), profile, &client).ok());
   int* d = reinterpret_cast<int*>(data);
+  CHECK(client.IncreaseReservation(BloomFilter::GetExpectedMemoryUsed(*d)));
   for (int i = 0; i < batch_size; ++i) {
-    BloomFilter bf(*d);
+    BloomFilter bf(&client);
+    CHECK(bf.Init(*d).ok());
+    bf.Close();
   }
+  env->buffer_pool()->DeregisterClient(&client);
+  pool.Clear();
 }
 
 }  // namespace initialize
 
-
 // Benchmark insert
 namespace insert {
 
 struct TestData {
-  explicit TestData(int log_heap_size) : bf(log_heap_size), data(1ull << 20) {
+  explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client)
+    : bf(client), data(1ull << 20) {
+    CHECK(bf.Init(log_bufferpool_size).ok());
     for (size_t i = 0; i < data.size(); ++i) {
       data[i] = MakeRand();
     }
   }
 
+  ~TestData() { bf.Close(); }
+
   BloomFilter bf;
   vector<uint32_t> data;
 };
@@ -211,12 +232,13 @@ void Benchmark(int batch_size, void* data) {
 namespace find {
 
 struct TestData {
-  TestData(int log_heap_size, size_t size)
-      : bf(log_heap_size),
-        vec_mask((1ull << static_cast<int>(floor(log2(size))))-1),
-        present(size),
-        absent(size),
-        result(0) {
+  TestData(int log_bufferpool_size, BufferPool::ClientHandle* client, size_t size)
+    : bf(client),
+      vec_mask((1ull << static_cast<int>(floor(log2(size)))) - 1),
+      present(size),
+      absent(size),
+      result(0) {
+    CHECK(bf.Init(log_bufferpool_size).ok());
     for (size_t i = 0; i < size; ++i) {
       present[i] = MakeRand();
       absent[i] = MakeRand();
@@ -224,6 +246,8 @@ struct TestData {
     }
   }
 
+  ~TestData() { bf.Close(); }
+
   BloomFilter bf;
   // A mask value such that i & vec_mask < present.size() (and absent.size()). This is
   // used in the benchmark functions to loop through present and absent, because
@@ -254,10 +278,12 @@ void Absent(int batch_size, void* data) {
 namespace either {
 
 struct TestData {
-  explicit TestData(int log_heap_size) {
-    BloomFilter bf(log_heap_size);
+  explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
+    BloomFilter bf(client);
+    CHECK(bf.Init(log_bufferpool_size).ok());
     BloomFilter::ToThrift(&bf, &tbf1);
     BloomFilter::ToThrift(&bf, &tbf2);
+    bf.Close();
   }
 
   TBloomFilter tbf1, tbf2;
@@ -273,7 +299,13 @@ void Benchmark(int batch_size, void* data) {
 } // namespace either
 
 void RunBenchmarks() {
-
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "");
+  ExecEnv* env = ExecEnv::GetInstance();
+  BufferPool::ClientHandle client;
+  CHECK(env->buffer_pool()
+            ->RegisterClient("", nullptr, env->buffer_reservation(), nullptr,
+                std::numeric_limits<int64>::max(), profile, &client).ok());
   char name[120];
 
   {
@@ -282,13 +314,18 @@ void RunBenchmarks() {
     for (int ndv = 10000; ndv <= 100 * 1000 * 1000; ndv *= 100) {
       for (int log10fpp = -1; log10fpp >= -3; --log10fpp) {
         const double fpp = pow(10, log10fpp);
-        testdata.emplace_back(new insert::TestData(BloomFilter::MinLogSpace(ndv, fpp)));
+        int log_required_size = BloomFilter::MinLogSpace(ndv, fpp);
+        CHECK(client.IncreaseReservation(
+            BloomFilter::GetExpectedMemoryUsed(log_required_size)));
+        testdata.emplace_back(
+            new insert::TestData(log_required_size, &client));
         snprintf(name, sizeof(name), "ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, insert::Benchmark, testdata.back().get());
       }
     }
     cout << suite.Measure() << endl;
   }
+  CHECK(client.DecreaseReservationTo(0).ok());
 
   {
     Benchmark suite("find");
@@ -296,8 +333,11 @@ void RunBenchmarks() {
     for (int ndv = 10000; ndv <= 100 * 1000 * 1000; ndv *= 100) {
       for (int log10fpp = -1; log10fpp >= -3; --log10fpp) {
         const double fpp = pow(10, log10fpp);
+        int log_required_size = BloomFilter::MinLogSpace(ndv, fpp);
+        CHECK(client.IncreaseReservation(
+            BloomFilter::GetExpectedMemoryUsed(log_required_size)));
         testdata.emplace_back(
-            new find::TestData(BloomFilter::MinLogSpace(ndv, fpp), ndv));
+            new find::TestData(BloomFilter::MinLogSpace(ndv, fpp), &client , ndv));
         snprintf(name, sizeof(name), "present ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, find::Present, testdata.back().get());
 
@@ -307,6 +347,7 @@ void RunBenchmarks() {
     }
     cout << suite.Measure() << endl;
   }
+  CHECK(client.DecreaseReservationTo(0).ok());
 
   {
     Benchmark suite("union", false /* micro_heuristics */);
@@ -314,18 +355,31 @@ void RunBenchmarks() {
     for (int ndv = 10000; ndv <= 100 * 1000 * 1000; ndv *= 100) {
       for (int log10fpp = -1; log10fpp >= -3; --log10fpp) {
         const double fpp = pow(10, log10fpp);
-        testdata.emplace_back(
-            new either::TestData(BloomFilter::MinLogSpace(ndv, fpp)));
+        int log_required_size = BloomFilter::MinLogSpace(ndv, fpp);
+        CHECK(client.IncreaseReservation(
+            BloomFilter::GetExpectedMemoryUsed(log_required_size)));
+        testdata.emplace_back(new either::TestData(
+            BloomFilter::MinLogSpace(ndv, fpp), &client));
         snprintf(name, sizeof(name), "ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, either::Benchmark, testdata.back().get());
       }
     }
     cout << suite.Measure() << endl;
   }
+
+  CHECK(client.DecreaseReservationTo(0).ok());
+  env->buffer_pool()->DeregisterClient(&client);
+  pool.Clear();
 }
 
 int main(int argc, char **argv) {
-  CpuInfo::Init();
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  TestEnv test_env;
+  int64_t min_page_size = 8;
+  int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024;
+  test_env.SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
+  CHECK(test_env.Init().ok());
 
   cout << endl << Benchmark::GetMachineInfo() << endl << endl;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index ad9e99e..f36c91d 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -135,7 +135,8 @@ Status FragmentInstanceState::Prepare() {
   event_sequence_->Start(query_state_->fragment_events_start_time());
   UpdateState(StateEvent::PREPARE_START);
 
-  runtime_state_->InitFilterBank();
+  RETURN_IF_ERROR(runtime_state_->InitFilterBank(
+      fragment_ctx_.fragment.runtime_filters_reservation_bytes));
 
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 178aef1..239e066 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -22,7 +22,10 @@
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
@@ -41,48 +44,44 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability o
 const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
 const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
 
-RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
+RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state,
+    long total_filter_mem_required)
   : state_(state),
     filter_mem_tracker_(
         new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)),
     mem_pool_(filter_mem_tracker_.get()),
-    closed_(false) {
+    closed_(false),
+    total_bloom_filter_mem_required_(total_filter_mem_required) {
   bloom_memory_allocated_ =
       state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
+}
 
-  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  max_filter_size_ = query_ctx.client_request.query_options.runtime_filter_max_size;
-  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  max_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  min_filter_size_ = query_ctx.client_request.query_options.runtime_filter_min_size;
-  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  min_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  // Make sure that min <= max
-  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
-
-  DCHECK_GT(min_filter_size_, 0);
-  DCHECK_GT(max_filter_size_, 0);
-
-  default_filter_size_ = query_ctx.client_request.query_options.runtime_bloom_filter_size;
-  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
-  default_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
+Status RuntimeFilterBank::ClaimBufferReservation() {
+  DCHECK(!buffer_pool_client_.is_registered());
+  string filter_bank_name = Substitute(
+      "RuntimeFilterBank (Fragment Id: $0)", PrintId(state_->fragment_instance_id()));
+  RETURN_IF_ERROR(state_->exec_env()->buffer_pool()->RegisterClient(filter_bank_name,
+      state_->query_state()->file_group(), state_->instance_buffer_reservation(),
+      filter_mem_tracker_.get(), total_bloom_filter_mem_required_,
+      state_->runtime_profile(), &buffer_pool_client_));
+  VLOG_FILE << filter_bank_name << " claiming reservation "
+            << total_bloom_filter_mem_required_;
+  state_->query_state()->initial_reservations()->Claim(
+      &buffer_pool_client_, total_bloom_filter_mem_required_);
+  return Status::OK();
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
     bool is_producer) {
-  RuntimeFilter* ret = obj_pool_.Add(
-      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
+  RuntimeFilter* ret = nullptr;
   lock_guard<mutex> l(runtime_filter_lock_);
   if (is_producer) {
     DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
+    ret = obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
     produced_filters_[filter_desc.filter_id] = ret;
   } else {
     if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+      ret = obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
       consumed_filters_[filter_desc.filter_id] = ret;
       VLOG_QUERY << "registered consumer filter " << filter_desc.filter_id;
     } else {
@@ -186,17 +185,20 @@ void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params)
       bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
     } else {
       int64_t required_space =
-          BloomFilter::GetExpectedHeapSpaceUsed(params.bloom_filter.log_heap_space);
-      // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
-      // there's not enough memory for it.
-      if (!filter_mem_tracker_->TryConsume(required_space)) {
-        VLOG_QUERY << "No memory for global filter: " << params.filter_id
-                   << " (fragment instance: " << state_->fragment_instance_id() << ")";
+          BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
+      DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
+          << "BufferPool Client should have enough reservation to fulfill bloom filter "
+             "allocation";
+      bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
+      Status status = bloom_filter->Init(params.bloom_filter);
+      if (!status.ok()) {
+        LOG(ERROR) << "Unable to allocate memory for bloom filter: "
+                   << status.GetDetail();
         bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
       } else {
-        bloom_filter = obj_pool_.Add(new BloomFilter(params.bloom_filter));
-        DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-        bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+        bloom_filters_.push_back(bloom_filter);
+        DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+        bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
       }
     }
   } else {
@@ -213,18 +215,26 @@ void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params)
 
 BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
   lock_guard<mutex> l(runtime_filter_lock_);
-  if (closed_) return NULL;
+  if (closed_) return nullptr;
 
   RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
   DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
 
   // Track required space
   int64_t log_filter_size = BitUtil::Log2Ceiling64(it->second->filter_size());
-  int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
-  if (!filter_mem_tracker_->TryConsume(required_space)) return NULL;
-  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
-  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-  bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+  int64_t required_space = BloomFilter::GetExpectedMemoryUsed(log_filter_size);
+  DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
+      << "BufferPool Client should have enough reservation to fulfill bloom filter "
+         "allocation";
+  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
+  Status status = bloom_filter->Init(log_filter_size);
+  if (!status.ok()) {
+    LOG(ERROR) << "Unable to allocate memory for bloom filter: " << status.GetDetail();
+    return nullptr;
+  }
+  bloom_filters_.push_back(bloom_filter);
+  DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+  bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
   return bloom_filter;
 }
 
@@ -239,15 +249,6 @@ MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
   return MinMaxFilter::Create(type, &obj_pool_, &mem_pool_);
 }
 
-int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
-  if (ndv == -1) return default_filter_size_;
-  int64_t required_space =
-      1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
-  required_space = max<int64_t>(required_space, min_filter_size_);
-  required_space = min<int64_t>(required_space, max_filter_size_);
-  return required_space;
-}
-
 bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
   double fpp =
       BloomFilter::FalsePositiveProb(observed_ndv, BitUtil::Log2Ceiling64(filter_size));
@@ -257,8 +258,16 @@ bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv)
 void RuntimeFilterBank::Close() {
   lock_guard<mutex> l(runtime_filter_lock_);
   closed_ = true;
+  for (BloomFilter* filter : bloom_filters_) filter->Close();
   obj_pool_.Clear();
   mem_pool_.FreeAll();
-  filter_mem_tracker_->Release(bloom_memory_allocated_->value());
+  if (buffer_pool_client_.is_registered()) {
+    VLOG_FILE << "RuntimeFilterBank (Fragment Id: " << state_->fragment_instance_id()
+              << ") returning reservation " << total_bloom_filter_mem_required_;
+    state_->query_state()->initial_reservations()->Return(
+        &buffer_pool_client_, total_bloom_filter_mem_required_);
+    state_->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+  }
+  DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
   filter_mem_tracker_->Close();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 8f6bb42..ead7e82 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/mem-pool.h"
 #include "runtime/types.h"
 #include "util/runtime-profile.h"
@@ -67,7 +68,17 @@ class TQueryCtx;
 /// coordinate in any way.
 class RuntimeFilterBank {
  public:
-  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
+  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state,
+      long total_filter_mem_required);
+
+  /// Initialize 'buffer_pool_client_' and claim the initial reservation. The client is
+  /// automatically cleaned up in Close(). Should not be called if the client is already
+  /// open.
+  ///
+  /// Must return the initial reservation to QueryState::initial_reservations(), which is
+  /// done automatically in Close() as long as the initial reservation is not released
+  /// before Close().
+  Status ClaimBufferReservation() WARN_UNUSED_RESULT;
 
   /// Registers a filter that will either be produced (is_producer == true) or consumed
   /// (is_producer == false) by fragments that share this RuntimeState. The filter
@@ -99,11 +110,12 @@ class RuntimeFilterBank {
 
   /// Returns a bloom_filter that can be used by an operator to produce a local filter,
   /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
-  /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
-  /// should not be deleted by the caller. The filter identified by 'filter_id' must have
-  /// been previously registered as a 'producer' by RegisterFilter().
+  /// the RuntimeFilterBank and should not be deleted by the caller. The filter identified
+  /// by 'filter_id' must have been previously registered as a 'producer' by
+  /// RegisterFilter().
   ///
-  /// If there is not enough memory, or if Close() has been called first, returns NULL.
+  /// If memory allocation for the filter fails, or if Close() has been called first,
+  /// returns NULL.
   BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
 
   /// Returns a new MinMaxFilter. Handles memory the same as AllocateScratchBloomFilter().
@@ -119,11 +131,6 @@ class RuntimeFilterBank {
   static const int64_t MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;  // 512MB
 
  private:
-  /// Returns the the space (in bytes) required for a filter to achieve the configured
-  /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
-  /// estimate is known), the default filter size is returned.
-  int64_t GetFilterSizeForNdv(int64_t ndv);
-
   /// Lock protecting produced_filters_ and consumed_filters_.
   boost::mutex runtime_filter_lock_;
 
@@ -155,14 +162,17 @@ class RuntimeFilterBank {
   /// Total amount of memory allocated to Bloom Filters
   RuntimeProfile::Counter* bloom_memory_allocated_;
 
-  /// Precomputed default BloomFilter size.
-  int64_t default_filter_size_;
+  /// Total amount of memory required by the bloom filters as calculated by the planner.
+  long total_bloom_filter_mem_required_;
 
-  /// Maximum filter size, in bytes, rounded up to a power of two.
-  int64_t max_filter_size_;
+  /// Contains references to all the bloom filters generated. Used in Close() to safely
+  /// release all memory allocated for bloom filters.
+  vector<BloomFilter*> bloom_filters_;
 
-  /// Minimum filter size, in bytes, rounded up to a power of two.
-  int64_t min_filter_size_;
+  /// Buffer pool client for the filter bank. Initialized with the required reservation
+  /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
+  /// pool in Close().
+  BufferPool::ClientHandle buffer_pool_client_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 40c5f23..7ab73d7 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -46,7 +46,7 @@ class RuntimeFilter {
       : bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter),
         registration_time_(MonotonicMillis()), arrival_time_(0L),
         filter_size_(filter_size) {
-    DCHECK_GT(filter_size_, 0);
+    DCHECK(filter_desc_.type == TRuntimeFilterType::MIN_MAX || filter_size_ > 0);
   }
 
   /// Returns true if SetFilter() has been called.

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 37219cc..4b39ec8 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -124,8 +124,10 @@ void RuntimeState::Init() {
   }
 }
 
-void RuntimeState::InitFilterBank() {
-  filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
+Status RuntimeState::InitFilterBank(long runtime_filters_reservation_bytes) {
+  filter_bank_.reset(
+      new RuntimeFilterBank(query_ctx(), this, runtime_filters_reservation_bytes));
+  return filter_bank_->ClaimBufferReservation();
 }
 
 Status RuntimeState::CreateCodegen() {

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 66b2099..9de4dfa 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -91,8 +91,9 @@ class RuntimeState {
   /// Empty d'tor to avoid issues with scoped_ptr.
   ~RuntimeState();
 
-  /// Initializes the runtime filter bank.
-  void InitFilterBank();
+  /// Initializes the runtime filter bank and claims the initial buffer reservation
+  /// for it.
+  Status InitFilterBank(long runtime_filters_reservation_bytes);
 
   QueryState* query_state() const { return query_state_; }
   /// Return the query's ObjectPool

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index bc4afde..2b6e2fd 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -44,6 +44,7 @@
 #include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "service/query-options.h"
+#include "util/bloom-filter.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
@@ -537,6 +538,15 @@ Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions(
   return result_bytes;
 }
 
+// Returns the log (base 2) of the minimum number of bytes we need for a Bloom filter
+// with 'ndv' unique elements and a false positive probability of less than 'fpp'.
+extern "C"
+JNIEXPORT jint JNICALL
+Java_org_apache_impala_service_FeSupport_MinLogSpaceForBloomFilter(
+    JNIEnv* env, jclass caller_class, jlong ndv, jdouble fpp) {
+  return BloomFilter::MinLogSpace(ndv, fpp);
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -581,6 +591,10 @@ static JNINativeMethod native_methods[] = {
       (char*)"NativeLibCacheRemoveEntry", (char*)"(Ljava/lang/String;)Z",
       (void*)::Java_org_apache_impala_service_FeSupport_NativeLibCacheRemoveEntry
   },
+  {
+    (char*)"MinLogSpaceForBloomFilter", (char*)"(JD)I",
+    (void*)::Java_org_apache_impala_service_FeSupport_MinLogSpaceForBloomFilter
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 42681d9..c198919 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -147,9 +147,10 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(runtime_filter_min_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
+      // Lower limit for runtime_filter_max_size is FLAGS_min_buffer_size which has a
+      // default value of 64KB.
       {MAKE_OPTIONDEF(runtime_filter_max_size),
-          {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
+          {64 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
       {MAKE_OPTIONDEF(runtime_bloom_filter_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}
@@ -309,6 +310,15 @@ TEST(QueryOptions, SetSpecialOptions) {
     TestError("0");
     TestError(to_string(ROW_SIZE_LIMIT + 1).c_str());
   }
+  // RUNTIME_FILTER_MAX_SIZE should not be less than FLAGS_min_buffer_size
+  {
+    OptionDef<int32_t> key_def = MAKE_OPTIONDEF(runtime_filter_max_size);
+    auto TestOk = MakeTestOkFn(options, key_def);
+    auto TestError = MakeTestErrFn(options, key_def);
+    TestOk("128KB", 128 * 1024);
+    TestError("65535"); // default value of FLAGS_min_buffer_size is 64KB
+    TestOk("64KB", 64 * 1024);
+  }
 }
 
 TEST(QueryOptions, ParseQueryOptions) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3f7bb7c..7c76b10 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -30,6 +30,8 @@
 
 #include "common/names.h"
 
+DECLARE_int64(min_buffer_size);
+
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::token_compress_on;
@@ -364,6 +366,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
                   RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
                   RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
         }
+        if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE
+            && size < FLAGS_min_buffer_size
+            // last condition is to unblock the highly improbable case where the
+            // min_buffer_size is greater than RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE.
+            && FLAGS_min_buffer_size <= RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
+          return Status(Substitute("$0 should not be less than $1 which is the minimum "
+              "buffer size that can be allocated by the buffer pool",
+              PrintTImpalaQueryOptions(static_cast<TImpalaQueryOptions::type>(option)),
+              FLAGS_min_buffer_size));
+        }
         if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) {
           query_options->__set_runtime_bloom_filter_size(size);
         } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f7f49ac..0bbaa89 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -47,6 +47,8 @@ DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(kudu_master_hosts);
 DECLARE_string(reserved_words_version);
 DECLARE_string(sentry_config);
+DECLARE_double(max_filter_error_rate);
+DECLARE_int64(min_buffer_size);
 
 namespace impala {
 
@@ -84,7 +86,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
     DCHECK_EQ(FLAGS_reserved_words_version, "3.0.0");
     cfg.__set_reserved_words_version(TReservedWordsVersion::IMPALA_3_0);
   }
-
+  cfg.__set_max_filter_error_rate(FLAGS_max_filter_error_rate);
+  cfg.__set_min_buffer_size(FLAGS_min_buffer_size);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/util/bloom-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index 20c2655..6fcfc92 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -21,6 +21,11 @@
 #include <unordered_set>
 #include <vector>
 
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
 using namespace std;
@@ -84,49 +89,176 @@ TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success)
 
 namespace impala {
 
+// Test that MaxNdv() and MinLogSpace() are dual
+TEST(BloomFilter, MinSpaceMaxNdv) {
+  for (int log2fpp = -2; log2fpp >= -63; --log2fpp) {
+    const double fpp = pow(2, log2fpp);
+    for (int given_log_space = 8; given_log_space < 30; ++given_log_space) {
+      const size_t derived_ndv = BloomFilter::MaxNdv(given_log_space, fpp);
+      int derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp);
+
+      EXPECT_EQ(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+
+      // If we lower the fpp, we need more space; if we raise it we need less.
+      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp / 2);
+      EXPECT_GE(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp * 2);
+      EXPECT_LE(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+    }
+    for (size_t given_ndv = 1000; given_ndv < 1000 * 1000; given_ndv *= 3) {
+      const int derived_log_space = BloomFilter::MinLogSpace(given_ndv, fpp);
+      const size_t derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp);
+
+      // The max ndv is close to, but larger than, the ndv we asked for
+      EXPECT_LE(given_ndv, derived_ndv) << "fpp: " << fpp
+                                        << " derived_log_space: " << derived_log_space;
+      EXPECT_GE(given_ndv * 2, derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+
+      // Changing the fpp changes the ndv capacity in the expected direction.
+      size_t new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp / 2);
+      EXPECT_GE(derived_ndv, new_derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+      new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp * 2);
+      EXPECT_LE(derived_ndv, new_derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+    }
+  }
+}
+
+TEST(BloomFilter, MinSpaceEdgeCase) {
+  int min_space = BloomFilter::MinLogSpace(1, 0.75);
+  EXPECT_GE(min_space, 0) << "LogSpace should always be >= 0";
+}
+
+// Check that MinLogSpace() and FalsePositiveProb() are dual
+TEST(BloomFilter, MinSpaceForFpp) {
+  for (size_t ndv = 10000; ndv < 100 * 1000 * 1000; ndv *= 1.01) {
+    for (double fpp = 0.1; fpp > pow(2, -20); fpp *= 0.99) { // NOLINT: loop on double
+      // When constructing a Bloom filter, we can request a particular fpp by calling
+      // MinLogSpace().
+      const int min_log_space = BloomFilter::MinLogSpace(ndv, fpp);
+      // However, at the resulting ndv and space, the expected fpp might be lower than
+      // the one that was requested.
+      double expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space);
+      EXPECT_LE(expected_fpp, fpp);
+      // The fpp we get might be much lower than the one we asked for. However, if the
+      // space were just one size smaller, the fpp we get would be larger than the one we
+      // asked for.
+      expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space - 1);
+      EXPECT_GE(expected_fpp, fpp);
+      // Therefore, the return value of MinLogSpace() is actually the minimum
+      // log space at which we can guarantee the requested fpp.
+    }
+  }
+}
+
+class BloomFilterTest : public testing::Test {
+ protected:
+  /// Temporary runtime environment for the BloomFilters.
+  unique_ptr<TestEnv> test_env_;
+  RuntimeState* runtime_state_;
+
+  ObjectPool pool_;
+  unique_ptr<MemTracker> tracker_;
+  unique_ptr<BufferPool::ClientHandle> buffer_pool_client_;
+  vector<BloomFilter*> bloom_filters_;
+
+  virtual void SetUp() {
+    int64_t min_page_size = 64; // Min filter size that we allocate in our tests.
+    int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024;
+    test_env_.reset(new TestEnv());
+    test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
+    ASSERT_OK(test_env_->Init());
+    ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
+    buffer_pool_client_.reset(new BufferPool::ClientHandle);
+    tracker_.reset(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
+    ASSERT_OK(buffer_pool->RegisterClient("", nullptr,
+        runtime_state_->instance_buffer_reservation(), tracker_.get(),
+        std::numeric_limits<int64>::max(), runtime_state_->runtime_profile(),
+        buffer_pool_client_.get()));
+  }
+
+  virtual void TearDown() {
+    for (BloomFilter* filter : bloom_filters_) filter->Close();
+    bloom_filters_.clear();
+    runtime_state_ = nullptr;
+    pool_.Clear();
+    test_env_->exec_env()->buffer_pool()->DeregisterClient(buffer_pool_client_.get());
+    buffer_pool_client_.reset();
+    tracker_.reset();
+    test_env_.reset();
+  }
+
+  BloomFilter* CreateBloomFilter(int log_bufferpool_space) {
+    int64_t filter_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_space);
+    EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
+    BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
+    EXPECT_OK(bloom_filter->Init(log_bufferpool_space));
+    bloom_filters_.push_back(bloom_filter);
+    EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
+    return bloom_filter;
+  }
+
+  BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
+    int64_t filter_size =
+        BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
+    EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
+    BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
+    EXPECT_OK(bloom_filter->Init(t_filter));
+    bloom_filters_.push_back(bloom_filter);
+    EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
+    return bloom_filter;
+  }
+};
+
 // We can construct (and destruct) Bloom filters with different spaces.
-TEST(BloomFilter, Constructor) {
-  for (int i = 0; i < 30; ++i) {
-    BloomFilter bf(i);
+TEST_F(BloomFilterTest, Constructor) {
+  for (int i = 1; i < 30; ++i) {
+    CreateBloomFilter(i);
   }
 }
 
 // We can Insert() hashes into a Bloom filter with different spaces.
-TEST(BloomFilter, Insert) {
+TEST_F(BloomFilterTest, Insert) {
   srand(0);
   for (int i = 13; i < 17; ++i) {
-    BloomFilter bf(i);
+    BloomFilter* bf = CreateBloomFilter(i);
     for (int k = 0; k < (1 << 15); ++k) {
-      BfInsert(bf, MakeRand());
+      BfInsert(*bf, MakeRand());
     }
   }
 }
 
 // After Insert()ing something into a Bloom filter, it can be found again immediately.
-TEST(BloomFilter, Find) {
+TEST_F(BloomFilterTest, Find) {
   srand(0);
   for (int i = 13; i < 17; ++i) {
-    BloomFilter bf(i);
+    BloomFilter* bf = CreateBloomFilter(i);
     for (int k = 0; k < (1 << 15); ++k) {
       const uint64_t to_insert = MakeRand();
-      BfInsert(bf, to_insert);
-      EXPECT_TRUE(BfFind(bf, to_insert));
+      BfInsert(*bf, to_insert);
+      EXPECT_TRUE(BfFind(*bf, to_insert));
     }
   }
 }
 
 // After Insert()ing something into a Bloom filter, it can be found again much later.
-TEST(BloomFilter, CumulativeFind) {
+TEST_F(BloomFilterTest, CumulativeFind) {
   srand(0);
   for (int i = 5; i < 11; ++i) {
     std::vector<uint32_t> inserted;
-    BloomFilter bf(i);
+    BloomFilter* bf = CreateBloomFilter(i);
     for (int k = 0; k < (1 << 10); ++k) {
       const uint32_t to_insert = MakeRand();
       inserted.push_back(to_insert);
-      BfInsert(bf, to_insert);
+      BfInsert(*bf, to_insert);
       for (int n = 0; n < inserted.size(); ++n) {
-        EXPECT_TRUE(BfFind(bf, inserted[n]));
+        EXPECT_TRUE(BfFind(*bf, inserted[n]));
       }
     }
   }
@@ -134,7 +266,7 @@ TEST(BloomFilter, CumulativeFind) {
 
 // The empirical false positives we find when looking for random items is with a constant
 // factor of the false positive probability the Bloom filter was constructed for.
-TEST(BloomFilter, FindInvalid) {
+TEST_F(BloomFilterTest, FindInvalid) {
   srand(0);
   static const int find_limit = 1 << 20;
   unordered_set<uint32_t> to_find;
@@ -154,22 +286,23 @@ TEST(BloomFilter, FindInvalid) {
     for (int log_fpp = 4; log_fpp < 15; ++log_fpp) {
       double fpp = 1.0 / (1 << log_fpp);
       const size_t ndv = 1 << log_ndv;
-      const int log_heap_space = BloomFilter::MinLogSpace(ndv, fpp);
-      BloomFilter bf(log_heap_space);
+      const int log_bufferpool_space = BloomFilter::MinLogSpace(ndv, fpp);
+      BloomFilter* bf = CreateBloomFilter(log_bufferpool_space);
       // Fill up a BF with exactly as much ndv as we planned for it:
       for (size_t i = 0; i < ndv; ++i) {
-        BfInsert(bf, shuffled_insert[i]);
+        BfInsert(*bf, shuffled_insert[i]);
       }
       int found = 0;
       // Now we sample from the set of possible hashes, looking for hits.
       for (const auto& i : to_find) {
-        found += BfFind(bf, i);
+        found += BfFind(*bf, i);
       }
       EXPECT_LE(found, find_limit * fpp * 2)
           << "Too many false positives with -log2(fpp) = " << log_fpp;
       // Because the space is rounded up to a power of 2, we might actually get a lower
       // fpp than the one passed to MinLogSpace().
-      const double expected_fpp = BloomFilter::FalsePositiveProb(ndv, log_heap_space);
+      const double expected_fpp =
+          BloomFilter::FalsePositiveProb(ndv, log_bufferpool_space);
       EXPECT_GE(found, find_limit * expected_fpp)
           << "Too few false positives with -log2(fpp) = " << log_fpp;
       EXPECT_LE(found, find_limit * expected_fpp * 8)
@@ -178,118 +311,56 @@ TEST(BloomFilter, FindInvalid) {
   }
 }
 
-// Test that MaxNdv() and MinLogSpace() are dual
-TEST(BloomFilter, MinSpaceMaxNdv) {
-  for (int log2fpp = -2; log2fpp >= -63; --log2fpp) {
-    const double fpp = pow(2, log2fpp);
-    for (int given_log_space = 8; given_log_space < 30; ++given_log_space) {
-      const size_t derived_ndv = BloomFilter::MaxNdv(given_log_space, fpp);
-      int derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp);
-
-      EXPECT_EQ(derived_log_space, given_log_space) << "fpp: " << fpp
-                                                    << " derived_ndv: " << derived_ndv;
-
-      // If we lower the fpp, we need more space; if we raise it we need less.
-      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp / 2);
-      EXPECT_GE(derived_log_space, given_log_space) << "fpp: " << fpp
-                                                    << " derived_ndv: " << derived_ndv;
-      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp * 2);
-      EXPECT_LE(derived_log_space, given_log_space) << "fpp: " << fpp
-                                                    << " derived_ndv: " << derived_ndv;
-    }
-    for (size_t given_ndv = 1000; given_ndv < 1000 * 1000; given_ndv *= 3) {
-      const int derived_log_space = BloomFilter::MinLogSpace(given_ndv, fpp);
-      const size_t derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp);
-
-      // The max ndv is close to, but larger than, then ndv we asked for
-      EXPECT_LE(given_ndv, derived_ndv) << "fpp: " << fpp
-                                        << " derived_log_space: " << derived_log_space;
-      EXPECT_GE(given_ndv * 2, derived_ndv)
-          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
-
-      // Changing the fpp changes the ndv capacity in the expected direction.
-      size_t new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp / 2);
-      EXPECT_GE(derived_ndv, new_derived_ndv)
-          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
-      new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp * 2);
-      EXPECT_LE(derived_ndv, new_derived_ndv)
-          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
-    }
-  }
-}
-
-TEST(BloomFilter, MinSpaceEdgeCase) {
-  int min_space = BloomFilter::MinLogSpace(1, 0.75);
-  EXPECT_GE(min_space, 0) << "LogSpace should always be >= 0";
-}
-
-// Check that MinLogSpace() and FalsePositiveProb() are dual
-TEST(BloomFilter, MinSpaceForFpp) {
-  for (size_t ndv = 10000; ndv < 100 * 1000 * 1000; ndv *= 1.01) {
-    for (double fpp = 0.1; fpp > pow(2, -20); fpp *= 0.99) { // NOLINT: loop on double
-      // When contructing a Bloom filter, we can request a particular fpp by calling
-      // MinLogSpace().
-      const int min_log_space = BloomFilter::MinLogSpace(ndv, fpp);
-      // However, at the resulting ndv and space, the expected fpp might be lower than
-      // the one that was requested.
-      double expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space);
-      EXPECT_LE(expected_fpp, fpp);
-      // The fpp we get might be much lower than the one we asked for. However, if the
-      // space were just one size smaller, the fpp we get would be larger than the one we
-      // asked for.
-      expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space - 1);
-      EXPECT_GE(expected_fpp, fpp);
-      // Therefore, the return value of MinLogSpace() is actually the minimum
-      // log space at which we can guarantee the requested fpp.
-    }
-  }
-}
-
-TEST(BloomFilter, Thrift) {
-  BloomFilter bf(BloomFilter::MinLogSpace(100, 0.01));
-  for (int i = 0; i < 10; ++i) BfInsert(bf, i);
+TEST_F(BloomFilterTest, Thrift) {
+  BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
+  for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
   // Check no unexpected new false positives.
   unordered_set<int> missing_ints;
   for (int i = 11; i < 100; ++i) {
-    if (!BfFind(bf, i)) missing_ints.insert(i);
+    if (!BfFind(*bf, i)) missing_ints.insert(i);
   }
 
   TBloomFilter to_thrift;
-  BloomFilter::ToThrift(&bf, &to_thrift);
+  BloomFilter::ToThrift(bf, &to_thrift);
   EXPECT_EQ(to_thrift.always_true, false);
 
-  BloomFilter from_thrift(to_thrift);
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(from_thrift, i));
-  for (int missing: missing_ints) ASSERT_FALSE(BfFind(from_thrift, missing));
+  BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
+  for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
 
   BloomFilter::ToThrift(NULL, &to_thrift);
   EXPECT_EQ(to_thrift.always_true, true);
 }
 
-TEST(BloomFilter, ThriftOr) {
-  BloomFilter bf1(BloomFilter::MinLogSpace(100, 0.01));
-  BloomFilter bf2(BloomFilter::MinLogSpace(100, 0.01));
+TEST_F(BloomFilterTest, ThriftOr) {
+  BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
+  BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
 
-  for (int i = 60; i < 80; ++i) BfInsert(bf2, i);
-  for (int i = 0; i < 10; ++i) BfInsert(bf1, i);
+  for (int i = 60; i < 80; ++i) BfInsert(*bf2, i);
+  for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
 
   bool success;
-  BloomFilter bf3(BfUnion(bf1, bf2, &success));
+  BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(bf3, i)) << i;
-  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf3, i)) << i;
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
 
   // Insert another value to aggregated BloomFilter.
-  for (int i = 11; i < 50; ++i) BfInsert(bf3, i);
+  for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
 
   // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
-  BloomFilter bf4(BfUnion(bf1, bf3, &success));
+  BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
-  for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(bf4, i)) << i;
-  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf4, i)) << i;
-  ASSERT_FALSE(BfFind(bf4, 81));
+  for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
+  ASSERT_FALSE(BfFind(*bf4, 81));
 }
 
 }  // namespace impala
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index f8ce625..0a2f8fc 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,6 +17,7 @@
 
 #include "util/bloom-filter.h"
 
+#include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 
 using namespace std;
@@ -25,47 +26,55 @@ namespace impala {
 
 constexpr uint32_t BloomFilter::REHASH[8] __attribute__((aligned(32)));
 
-BloomFilter::BloomFilter(const int log_heap_space)
-  : always_false_(true),
-    // Since log_heap_space is in bytes, we need to convert it to the number of tiny Bloom
-    // filters we will use.
-    log_num_buckets_(std::max(1, log_heap_space - LOG_BUCKET_BYTE_SIZE)),
-    // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
-    // that is too large.
-    directory_mask_((1ull << std::min(63, log_num_buckets_)) - 1),
-    directory_(NULL) {
+BloomFilter::BloomFilter(BufferPool::ClientHandle* client)
+  : buffer_pool_client_(client) {}
+
+BloomFilter::~BloomFilter() {
+  DCHECK(directory_ == nullptr)
+      << "Close() should have been called before the object is destroyed.";
+}
+
+Status BloomFilter::Init(const int log_bufferpool_space) {
+  // Since log_bufferpool_space is in bytes, we need to convert it to the number of tiny
+  // Bloom filters we will use.
+  log_num_buckets_ = std::max(1, log_bufferpool_space - LOG_BUCKET_BYTE_SIZE);
+  // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
+  // that is too large.
+  directory_mask_ = (1ull << std::min(63, log_num_buckets_)) - 1;
   // Since we use 32 bits in the arguments of Insert() and Find(), log_num_buckets_
   // must be limited.
-  DCHECK(log_num_buckets_ <= 32)
-      << "Bloom filter too large. log_heap_space: " << log_heap_space;
+  DCHECK(log_num_buckets_ <= 32) << "Bloom filter too large. log_bufferpool_space: "
+                                 << log_bufferpool_space;
   const size_t alloc_size = directory_size();
-  const int malloc_failed =
-      posix_memalign(reinterpret_cast<void**>(&directory_), 64, alloc_size);
-  DCHECK_EQ(malloc_failed, 0) << "Malloc failed. log_heap_space: " << log_heap_space
-                              << " log_num_buckets_: " << log_num_buckets_
-                              << " alloc_size: " << alloc_size;
-  DCHECK(directory_ != nullptr);
+  BufferPool* buffer_pool_ = ExecEnv::GetInstance()->buffer_pool();
+  Close(); // Ensure that any previously allocated memory for directory_ is released.
+  RETURN_IF_ERROR(
+      buffer_pool_->AllocateBuffer(buffer_pool_client_, alloc_size, &buffer_handle_));
+  directory_ = reinterpret_cast<Bucket*>(buffer_handle_.data());
   memset(directory_, 0, alloc_size);
+  return Status::OK();
 }
 
-BloomFilter::BloomFilter(const TBloomFilter& thrift)
-    : BloomFilter(thrift.log_heap_space) {
-  if (!thrift.always_false) {
+Status BloomFilter::Init(const TBloomFilter& thrift) {
+  RETURN_IF_ERROR(Init(thrift.log_bufferpool_space));
+  if (directory_ != nullptr && !thrift.always_false) {
     always_false_ = false;
     DCHECK_EQ(thrift.directory.size(), directory_size());
     memcpy(directory_, &thrift.directory[0], thrift.directory.size());
   }
+  return Status::OK();
 }
 
-BloomFilter::~BloomFilter() {
-  if (directory_) {
-    free(directory_);
-    directory_ = NULL;
+void BloomFilter::Close() {
+  if (directory_ != nullptr) {
+    BufferPool* buffer_pool_ = ExecEnv::GetInstance()->buffer_pool();
+    buffer_pool_->FreeBuffer(buffer_pool_client_, &buffer_handle_);
+    directory_ = nullptr;
   }
 }
 
 void BloomFilter::ToThrift(TBloomFilter* thrift) const {
-  thrift->log_heap_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
+  thrift->log_bufferpool_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
   if (always_false_) {
     thrift->always_false = true;
     thrift->always_true = false;
@@ -184,9 +193,9 @@ void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
   DCHECK(!out->always_true);
   DCHECK(!in.always_true);
   if (in.always_false) return;
-  DCHECK_EQ(in.log_heap_space, out->log_heap_space);
+  DCHECK_EQ(in.log_bufferpool_space, out->log_bufferpool_space);
   DCHECK_EQ(in.directory.size(), out->directory.size())
-      << "Equal log heap space " << in.log_heap_space
+      << "Equal log heap space " << in.log_bufferpool_space
       << ", but different directory sizes: " << in.directory.size() << ", "
       << out->directory.size();
   // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
@@ -220,11 +229,11 @@ void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
 //
 // where space is in bits.
 
-size_t BloomFilter::MaxNdv(const int log_heap_space, const double fpp) {
-  DCHECK(log_heap_space > 0 && log_heap_space < 61);
+size_t BloomFilter::MaxNdv(const int log_bufferpool_space, const double fpp) {
+  DCHECK(log_bufferpool_space > 0 && log_bufferpool_space < 61);
   DCHECK(0 < fpp && fpp < 1);
   static const double ik = 1.0 / BUCKET_WORDS;
-  return -1 * ik * (1ull << (log_heap_space + 3)) * log(1 - pow(fpp, ik));
+  return -1 * ik * (1ull << (log_bufferpool_space + 3)) * log(1 - pow(fpp, ik));
 }
 
 int BloomFilter::MinLogSpace(const size_t ndv, const double fpp) {
@@ -237,9 +246,9 @@ int BloomFilter::MinLogSpace(const size_t ndv, const double fpp) {
   return max(0, static_cast<int>(ceil(log2(m / 8))));
 }
 
-double BloomFilter::FalsePositiveProb(const size_t ndv, const int log_heap_space) {
+double BloomFilter::FalsePositiveProb(const size_t ndv, const int log_bufferpool_space) {
   return pow(1 - exp((-1.0 * static_cast<double>(BUCKET_WORDS) * static_cast<double>(ndv))
-                     / static_cast<double>(1ull << (log_heap_space + 3))),
+                     / static_cast<double>(1ull << (log_bufferpool_space + 3))),
       BUCKET_WORDS);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 9628402..73cb01e 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -28,6 +28,7 @@
 #include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/macros.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 
@@ -41,7 +42,7 @@ namespace impala {
 /// When talking about Bloom filter size, rather than talking about 'size', which might be
 /// ambiguous, we distinguish two different quantities:
 ///
-/// 1. Space: the amount of heap memory used
+/// 1. Space: the amount of buffer pool memory used
 ///
 /// 2. NDV: the number of unique items that have been inserted
 ///
@@ -57,11 +58,19 @@ namespace impala {
 /// so LLVM will not generate exception related code at their call sites.
 class BloomFilter {
  public:
-  /// Consumes at most (1 << log_heap_space) bytes on the heap.
-  explicit BloomFilter(const int log_heap_space);
-  explicit BloomFilter(const TBloomFilter& thrift);
+  /// Consumes at most (1 << log_bufferpool_space) bytes from the buffer pool client.
+  /// 'client' should be a valid registered BufferPool Client and should have enough
+  /// reservation to fulfill allocation for 'directory_'.
+  explicit BloomFilter(BufferPool::ClientHandle* client);
   ~BloomFilter();
 
+  /// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
+  /// calls to Insert() and Find() should only be done between the calls to Init() and
+  /// Close(). Init() and Close() are safe to call multiple times.
+  Status Init(const int log_bufferpool_space);
+  Status Init(const TBloomFilter& thrift);
+  void Close();
+
   /// Representation of a filter which allows all elements to pass.
   static constexpr BloomFilter* const ALWAYS_TRUE_FILTER = NULL;
 
@@ -87,29 +96,32 @@ class BloomFilter {
 
   /// As more distinct items are inserted into a BloomFilter, the false positive rate
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
-  /// constructed with (1 << log_heap_space) bytes of heap space hits false positive
+  /// constructed with (1 << log_bufferpool_space) bytes of heap space hits false positive
   /// probabilty fpp.
-  static size_t MaxNdv(const int log_heap_space, const double fpp);
+  static size_t MaxNdv(const int log_bufferpool_space, const double fpp);
 
   /// If we expect to fill a Bloom filter with 'ndv' different unique elements and we
   /// want a false positive probabilty of less than 'fpp', then this is the log (base 2)
   /// of the minimum number of bytes we need.
   static int MinLogSpace(const size_t ndv, const double fpp);
 
-  /// Returns the expected false positive rate for the given ndv and log_heap_space
-  static double FalsePositiveProb(const size_t ndv, const int log_heap_space);
+  /// Returns the expected false positive rate for the given ndv and log_bufferpool_space
+  static double FalsePositiveProb(const size_t ndv, const int log_bufferpool_space);
 
-  /// Returns amount of heap space used, in bytes
-  int64_t GetHeapSpaceUsed() const { return sizeof(Bucket) * (1LL << log_num_buckets_); }
+  /// Returns the amount of buffer pool space used (in bytes). A value of -1 means that
+  /// 'directory_' has not been allocated which can happen if the object was just created
+  /// and Init() hasn't been called or Init() failed or Close() was called on the object.
+  int64_t GetBufferPoolSpaceUsed() const {
+    return directory_ == nullptr ? -1 : sizeof(Bucket) * (1LL << log_num_buckets_);
+  }
 
-  static int64_t GetExpectedHeapSpaceUsed(uint32_t log_heap_size) {
-    DCHECK_GE(log_heap_size, LOG_BUCKET_WORD_BITS);
-    return sizeof(Bucket) * (1LL << (log_heap_size - LOG_BUCKET_WORD_BITS));
+  static int64_t GetExpectedMemoryUsed(int log_heap_size) {
+    return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
   }
 
  private:
   // always_false_ is true when the bloom filter hasn't had any elements inserted.
-  bool always_false_;
+  bool always_false_ = true;
 
   /// The BloomFilter is divided up into Buckets
   static const uint64_t BUCKET_WORDS = 8;
@@ -128,13 +140,18 @@ class BloomFilter {
   typedef BucketWord Bucket[BUCKET_WORDS];
 
   /// log_num_buckets_ is the log (base 2) of the number of buckets in the directory.
-  const int log_num_buckets_;
+  int log_num_buckets_ = 0;
 
   /// directory_mask_ is (1 << log_num_buckets_) - 1. It is precomputed for
   /// efficiency reasons.
-  const uint32_t directory_mask_;
+  uint32_t directory_mask_ = 0;
+
+  Bucket* directory_ = nullptr;
 
-  Bucket* directory_;
+  /// Bufferpool client and handle used for allocating and freeing directory memory.
+  /// Client is not owned by the filter.
+  BufferPool::ClientHandle* buffer_pool_client_;
+  BufferPool::BufferHandle buffer_handle_;
 
   // Same as Insert(), but skips the CPU check and assumes that AVX is not available.
   void InsertNoAvx2(const uint32_t hash) noexcept;
@@ -168,7 +185,7 @@ class BloomFilter {
   /// Serializes this filter as Thrift.
   void ToThrift(TBloomFilter* thrift) const;
 
-  /// Some constants used in hashing. #defined for efficiency reasons.
+/// Some constants used in hashing. #defined for efficiency reasons.
 #define IMPALA_BLOOM_HASH_CONSTANTS                                             \
   0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU, 0x705495c7U, 0x2df1424bU, \
       0x9efc4947U, 0x5c6bfb31U
@@ -188,6 +205,7 @@ class BloomFilter {
 // a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter.
 
 inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
+  DCHECK(directory_ != nullptr);
   always_false_ = false;
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
@@ -199,6 +217,7 @@ inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
 
 inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const noexcept {
   if (always_false_) return false;
+  DCHECK(directory_ != nullptr);
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
     return BucketFindAVX2(bucket_idx, hash);
@@ -207,6 +226,6 @@ inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const noexcept
   }
 }
 
-}  // namespace impala
+} // namespace impala
 
-#endif  // IMPALA_UTIL_BLOOM_H
+#endif // IMPALA_UTIL_BLOOM_H

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 36c4a7e..412ca06 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -69,4 +69,8 @@ struct TBackendGflags {
   21: required i32 max_nonhdfs_partitions_parallel_load
 
   22: required TReservedWordsVersion reserved_words_version
+
+  23: required double max_filter_error_rate
+
+  24: required i64 min_buffer_size
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index ae2eee6..613e4b0 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -756,17 +756,17 @@ struct TPoolConfig {
 }
 
 struct TBloomFilter {
-  // Log_2 of the heap space required for this filter. See BloomFilter::BloomFilter() for
-  // details.
-  1: required i32 log_heap_space
+  // Log_2 of the bufferpool space required for this filter.
+  // See BloomFilter::BloomFilter() for details.
+  1: required i32 log_bufferpool_space
 
   // List of buckets representing the Bloom Filter contents, laid out contiguously in one
   // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
   // BloomFilter::directory_.
   2: binary directory
 
-  // If always_true or always_false is true, 'directory' and 'log_heap_space' are not
-  // meaningful.
+  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
+  // not meaningful.
   3: required bool always_true
   4: required bool always_false
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index fedca3c..c5df1cd 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -145,6 +145,10 @@ struct TRuntimeFilterDesc {
 
   // The type of runtime filter to build.
   10: required TRuntimeFilterType type
+
+  // The size of the filter based on the ndv estimate and the min/max limit specified in
+  // the query options. Should be greater than zero for bloom filters, zero otherwise.
+  11: optional i64 filter_size_bytes
 }
 
 // The information contained in subclasses of ScanNode captured in two separate

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 9c1c8e7..74256c3 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -70,6 +70,10 @@ struct TPlanFragment {
   // sink) in a single instance of this fragment. This is used for an optimization in
   // InitialReservation. Measured in bytes. required in V1
   8: optional i64 initial_reservation_total_claims
+
+  // The total memory (in bytes) required for the runtime filters used by the plan nodes
+  // managed by this fragment. Is included in min_reservation_bytes.
+  9: optional i64 runtime_filters_reservation_bytes
 }
 
 // location information for a single scan range

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 3b3ace7..775219f 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.planner;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -29,6 +30,7 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.PlanNode.ExecPhaseResourceProfiles;
+import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TPlanFragment;
@@ -113,6 +115,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // of this fragment. Computed in computeResourceProfile().
   private long initialReservationTotalClaims_ = -1;
 
+  // The total memory (in bytes) required for the runtime filters used by the plan nodes
+  // managed by this fragment.
+  private long runtimeFiltersReservationBytes_ = 0;
+
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
    */
@@ -219,13 +225,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   /**
    * Compute the peak resource profile for an instance of this fragment. Must
    * be called after all the plan nodes and sinks are added to the fragment and resource
-   * profiles of all children fragments are computed.
+   * profiles of all children fragments are computed. Also accounts for the memory used by
+   * runtime filters that are stored at the fragment level.
    */
   public void computeResourceProfile(Analyzer analyzer) {
     // Compute resource profiles for all plan nodes and sinks in the fragment.
     sink_.computeResourceProfile(analyzer.getQueryOptions());
+    // Reset first so that calling this method again does not double-count filters.
+    runtimeFiltersReservationBytes_ = 0;
+    HashSet<RuntimeFilterId> filterSet = Sets.newHashSet();
     for (PlanNode node: collectPlanNodes()) {
       node.computeNodeResourceProfile(analyzer.getQueryOptions());
+      for (RuntimeFilter filter: node.getRuntimeFilters()) {
+        // A filter can be both produced and consumed within one fragment, so count
+        // it only once: add() returns false for an id that was already recorded.
+        if (!filterSet.add(filter.getFilterId())) continue;
+        runtimeFiltersReservationBytes_ += filter.getFilterSize();
+      }
     }
 
     if (sink_ instanceof JoinBuildSink) {
@@ -241,10 +257,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // The sink is opened after the plan tree.
     ResourceProfile fInstancePostOpenProfile =
         planTreeProfile.postOpenProfile.sum(sink_.getResourceProfile());
-    resourceProfile_ =
-        planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile);
-
-    initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes();
+    resourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(runtimeFiltersReservationBytes_)
+        .setMinReservationBytes(runtimeFiltersReservationBytes_).build()
+        .sum(planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile));
+    initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes() +
+        runtimeFiltersReservationBytes_;
     for (PlanNode node: collectPlanNodes()) {
       initialReservationTotalClaims_ +=
           node.getNodeResourceProfile().getMinReservationBytes();
@@ -322,9 +340,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       Preconditions.checkArgument(initialReservationTotalClaims_ > -1);
       result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes());
       result.setInitial_reservation_total_claims(initialReservationTotalClaims_);
+      result.setRuntime_filters_reservation_bytes(runtimeFiltersReservationBytes_);
     } else {
       result.setMin_reservation_bytes(0);
       result.setInitial_reservation_total_claims(0);
+      result.setRuntime_filters_reservation_bytes(0);
     }
     return result;
   }
@@ -408,6 +428,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     } else {
       builder.append(resourceProfile_.multiply(getNumInstancesPerHost(mt_dop))
           .getExplainString());
+      if (resourceProfile_.isValid() && runtimeFiltersReservationBytes_ > 0) {
+        builder.append(" runtime-filters-memory=");
+        builder.append(PrintUtils.printBytes(runtimeFiltersReservationBytes_));
+      }
     }
     builder.append("\n");
     return builder.toString();

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index d295bf5..5368b5f 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -44,10 +44,14 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
 import org.apache.impala.thrift.TRuntimeFilterType;
+import org.apache.impala.util.BitUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +70,13 @@ import com.google.common.collect.Sets;
  * Runtime filters are generated from equi-join predicates but they do not replace the
  * original predicates.
  *
+ * MinMax filters are of a fixed size (except for those used for string type) and
+ * therefore only sizes for bloom filters need to be calculated. These calculations are
+ * based on the NDV estimates of the associated table columns, the min buffer size that
+ * can be allocated by the bufferpool, and the query options. The resulting size is also
+ * bounded by the MIN/MAX_BLOOM_FILTER_SIZE limits, which are enforced on the query
+ * options before this phase of planning.
+ *
  * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1';
  * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a
  * runtime filter is constructed at the join node between tables T1 and T2 while building
@@ -81,6 +92,10 @@ public final class RuntimeFilterGenerator {
   private final static Logger LOG =
       LoggerFactory.getLogger(RuntimeFilterGenerator.class);
 
+  // Should be in sync with corresponding values in runtime-filter-bank.cc.
+  private static final long MIN_BLOOM_FILTER_SIZE = 4 * 1024;
+  private static final long MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;
+
   // Map of base table tuple ids to a list of runtime filters that
   // can be applied at the corresponding scan nodes.
   private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid_ =
@@ -90,7 +105,44 @@ public final class RuntimeFilterGenerator {
   private final IdGenerator<RuntimeFilterId> filterIdGenerator =
       RuntimeFilterId.createGenerator();
 
-  private RuntimeFilterGenerator() {};
+  /**
+   * Internal class that encapsulates the max, min and default sizes used for creating
+   * bloom filter objects.
+   */
+  private static final class FilterSizeLimits {
+    // Largest allowed filter size, in bytes; always a power of two.
+    public final long maxVal;
+
+    // Smallest allowed filter size, in bytes; always a power of two and <= maxVal.
+    public final long minVal;
+
+    // Default filter size, in bytes; a power of two within [minVal, maxVal].
+    public final long defaultVal;
+
+    public FilterSizeLimits(TQueryOptions tQueryOptions) {
+      // All limits are powers of two and at least the backend's minimum buffer size.
+      final long minBufferSize = BackendConfig.INSTANCE.getMinBufferSize();
+      final long maxLimit =
+          Math.max(tQueryOptions.getRuntime_filter_max_size(), minBufferSize);
+      maxVal = BitUtil.roundUpToPowerOf2(maxLimit);
+
+      // Clamp so that minVal <= defaultVal <= maxVal holds after rounding.
+      final long minLimit =
+          Math.max(tQueryOptions.getRuntime_filter_min_size(), minBufferSize);
+      minVal = BitUtil.roundUpToPowerOf2(Math.min(minLimit, maxVal));
+
+      final long defaultLimit =
+          Math.max(tQueryOptions.getRuntime_bloom_filter_size(), minVal);
+      defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultLimit, maxVal));
+    }
+  }
+
+  // Size limits applied to the bloom filters created by this generator.
+  private final FilterSizeLimits bloomFilterSizeLimits_;
+
+  private RuntimeFilterGenerator(TQueryOptions tQueryOptions) {
+    bloomFilterSizeLimits_ = new FilterSizeLimits(tQueryOptions);
+  }
 
   /**
    * Internal representation of a runtime filter. A runtime filter is generated from
@@ -125,6 +177,8 @@ public final class RuntimeFilterGenerator {
     // for the filter. A value of -1 means no estimate is available, and default filter
     // parameters should be used.
     private long ndvEstimate_ = -1;
+    // Size of the filter (in Bytes). Should be greater than zero for bloom filters.
+    private long filterSizeBytes_ = 0;
     // If true, the filter is produced by a broadcast join and there is at least one
     // destination scan node which is in the same fragment as the join; set in
     // DistributedPlanner.createHashJoinFragment().
@@ -196,7 +250,7 @@ public final class RuntimeFilterGenerator {
 
     private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, Expr srcExpr,
         Expr origTargetExpr, Operator exprCmpOp, Map<TupleId, List<SlotId>> targetSlots,
-        TRuntimeFilterType type) {
+        TRuntimeFilterType type, FilterSizeLimits filterSizeLimits) {
       id_ = filterId;
       src_ = filterSrcNode;
       srcExpr_ = srcExpr;
@@ -205,6 +259,7 @@ public final class RuntimeFilterGenerator {
       targetSlotsByTid_ = targetSlots;
       type_ = type;
       computeNdvEstimate();
+      calculateFilterSize(filterSizeLimits);
     }
 
     @Override
@@ -240,6 +295,7 @@ public final class RuntimeFilterGenerator {
       }
       tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
       tFilter.setType(type_);
+      tFilter.setFilter_size_bytes(filterSizeBytes_);
       return tFilter;
     }
 
@@ -250,7 +306,7 @@ public final class RuntimeFilterGenerator {
      */
     public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen,
         Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode,
-        TRuntimeFilterType type) {
+        TRuntimeFilterType type, FilterSizeLimits filterSizeLimits) {
       Preconditions.checkNotNull(idGen);
       Preconditions.checkNotNull(joinPredicate);
       Preconditions.checkNotNull(filterSrcNode);
@@ -277,7 +333,7 @@ public final class RuntimeFilterGenerator {
         LOG.trace("Generating runtime filter from predicate " + joinPredicate);
       }
       return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, targetExpr,
-          normalizedJoinConjunct.getOp(), targetSlots, type);
+          normalizedJoinConjunct.getOp(), targetSlots, type, filterSizeLimits);
     }
 
     /**
@@ -383,6 +439,7 @@ public final class RuntimeFilterGenerator {
     public RuntimeFilterId getFilterId() { return id_; }
     public TRuntimeFilterType getType() { return type_; }
     public Operator getExprCompOp() { return exprCmpOp_; }
+    public long getFilterSize() { return filterSizeBytes_; }
 
     /**
      * Estimates the selectivity of a runtime filter as the cardinality of the
@@ -415,6 +472,25 @@ public final class RuntimeFilterGenerator {
     }
 
     /**
+     * Sets the filter size (in bytes) required for a bloom filter to achieve the
+     * configured maximum false-positive rate based on the expected NDV. Also bounds the
+     * filter size between the max and minimum filter sizes supplied to it by
+     * 'filterSizeLimits'.
+     */
+    private void calculateFilterSize(FilterSizeLimits filterSizeLimits) {
+      if (type_ == TRuntimeFilterType.MIN_MAX) return;
+      if (ndvEstimate_ == -1) {
+        // No NDV estimate is available: use the pre-computed default size.
+        filterSizeBytes_ = filterSizeLimits.defaultVal;
+        return;
+      }
+      int logSize = FeSupport.GetMinLogSpaceForBloomFilter(
+          ndvEstimate_, BackendConfig.INSTANCE.getMaxFilterErrorRate());
+      long atLeastMin = Math.max(1L << logSize, filterSizeLimits.minVal);
+      filterSizeBytes_ = Math.min(atLeastMin, filterSizeLimits.maxVal);
+    }
+
+    /**
      * Assigns this runtime filter to the corresponding plan nodes.
      */
     public void assignToPlanNodes() {
@@ -442,7 +518,8 @@ public final class RuntimeFilterGenerator {
     Preconditions.checkNotNull(ctx.getQueryOptions());
     int maxNumBloomFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
     Preconditions.checkState(maxNumBloomFilters >= 0);
-    RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator();
+    RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(
+        ctx.getQueryOptions());
     filterGenerator.generateFilters(ctx, plan);
     List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
     if (filters.size() > maxNumBloomFilters) {
@@ -516,8 +593,8 @@ public final class RuntimeFilterGenerator {
       List<RuntimeFilter> filters = Lists.newArrayList();
       for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
         for (Expr conjunct : joinConjuncts) {
-          RuntimeFilter filter = RuntimeFilter.create(
-              filterIdGenerator, ctx.getRootAnalyzer(), conjunct, joinNode, type);
+          RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
+              ctx.getRootAnalyzer(), conjunct, joinNode, type, bloomFilterSizeLimits_);
           if (filter == null) continue;
           registerRuntimeFilter(filter);
           filters.add(filter);

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 48d417a..3833094 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -77,6 +77,10 @@ public class BackendConfig {
     return backendCfg_.max_nonhdfs_partitions_parallel_load;
   }
 
+  public double getMaxFilterErrorRate() { return backendCfg_.max_filter_error_rate; }
+
+  public long getMinBufferSize() { return backendCfg_.min_buffer_size; }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index b471448..5bc1d87 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -114,6 +114,8 @@ public class FeSupport {
   public native static byte[] NativeParseQueryOptions(String csvQueryOptions,
       byte[] queryOptions);
 
+  public native static int MinLogSpaceForBloomFilter(long ndv, double fpp);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -324,6 +326,20 @@ public class FeSupport {
   }
 
   /**
+   * Returns the log (base 2) of the minimum number of bytes we need for a Bloom
+   * filter with 'ndv' unique elements and a false positive probability of less
+   * than 'fpp'.
+   */
+  public static int GetMinLogSpaceForBloomFilter(long ndv, double fpp) {
+    try {
+      return MinLogSpaceForBloomFilter(ndv, fpp);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary(); // Retry the native call once after loading the library.
+      return MinLogSpaceForBloomFilter(ndv, fpp);
+    }
+  }
+
+  /**
    * This function should only be called explicitly by the FeSupport to ensure that
    * native functions are loaded.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/8fc1eccc/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index 7b4afb8..0f4a5da 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -56,7 +56,7 @@ PLAN-ROOT SINK
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=174.39KB
 ====
 # > 3000 rows returned to coordinator: codegen should be enabled
 select * from functional_parquet.alltypes
@@ -71,15 +71,15 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=174.39KB
 ====
 # Optimisation is enabled for join producing < 3000 rows
 select count(*)
 from functional.alltypes t1
 join functional.alltypestiny t2 on t1.id = t2.id
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=1.94MB
-Per-Host Resource Estimates: Memory=181.94MB
+Max Per-Host Resource Reservation: Memory=2.94MB
+Per-Host Resource Estimates: Memory=182.94MB
 Codegen disabled by planner
 
 PLAN-ROOT SINK