You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/14 01:09:04 UTC

[1/6] impala git commit: IMPALA-6489: use correct template tuple size

Repository: impala
Updated Branches:
  refs/heads/2.x c3e993327 -> 3d7d8209e


IMPALA-6489: use correct template tuple size

The bug was that we mixed up the size of the top-level tuple with the
size of the nested tuple. The upshot in this case was that the wrong
amount of data was memcpy()ed over and we read past the bounds of the
original allocation.

Testing:
TestParquetArrayEncodings.test_avro_primitive_in_list reliably
reproduced the problem under ASAN. After the fix it no longer
reproduces.

Change-Id: I8193c04673f15e5057f457cc8a3a91a8fef64be2
Reviewed-on: http://gerrit.cloudera.org:8080/9288
Reviewed-by: Alex Behm <al...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3660122c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3660122c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3660122c

Branch: refs/heads/2.x
Commit: 3660122ca2911e187bc4e957f7d3cefdfec9f2f5
Parents: c3e9933
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Feb 12 14:23:17 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 13 04:10:14 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scanner.h | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3660122c/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index e3c186f..6497457 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -428,7 +428,7 @@ class HdfsScanner {
   void IR_ALWAYS_INLINE InitTuple(
       const TupleDescriptor* desc, Tuple* template_tuple, Tuple* tuple) {
     if (has_template_tuple(template_tuple)) {
-      InitTupleFromTemplate(template_tuple, tuple, tuple_byte_size());
+      InitTupleFromTemplate(template_tuple, tuple, tuple_byte_size(*desc));
     } else {
       tuple->ClearNullBits(desc->null_bytes_offset(), desc->num_null_bytes());
     }
@@ -479,6 +479,9 @@ class HdfsScanner {
 
   /// Not inlined in IR so it can be replaced with a constant.
   int IR_NO_INLINE tuple_byte_size() const { return tuple_byte_size_; }
+  int IR_NO_INLINE tuple_byte_size(const TupleDescriptor& desc) const {
+    return desc.byte_size();
+  }
 
   /// Returns true iff 'template_tuple' is non-NULL.
   /// Not inlined in IR so it can be replaced with a constant.


[5/6] impala git commit: IMPALA-5519: Allocate fragment's runtime filter memory from Buffer pool

Posted by ta...@apache.org.
IMPALA-5519: Allocate fragment's runtime filter memory from Buffer pool

This patch adds changes to the planner to account for memory used by
bloom filters at the fragment instance level. It also adds changes to
allocate memory for those bloom filters from the buffer pool.

Testing:
- Modified Planner Tests and end-to-end tests to account for memory
  reservation for the runtime filters.
- Modified backend tests and benchmarks to use the bufferpool for
  bloom filter allocation.
- Added an end-to-end test.
- Ran the rest of the core tests.

Change-Id: Iea2759665fb2e8bef9433014a8d42a7ebf99ce1f
Reviewed-on: http://gerrit.cloudera.org:8080/8971
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1a632e7c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1a632e7c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1a632e7c

Branch: refs/heads/2.x
Commit: 1a632e7ceb762e17d6efcb1b64a8d4b505be133f
Parents: 92c1a48
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Dec 18 11:04:27 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 13 11:23:40 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/bloom-filter-benchmark.cc     |  90 ++++--
 be/src/runtime/fragment-instance-state.cc       |   3 +-
 be/src/runtime/runtime-filter-bank.cc           | 109 +++----
 be/src/runtime/runtime-filter-bank.h            |  42 ++-
 be/src/runtime/runtime-filter.h                 |   2 +-
 be/src/runtime/runtime-state.cc                 |   6 +-
 be/src/runtime/runtime-state.h                  |   5 +-
 be/src/service/fe-support.cc                    |  14 +
 be/src/service/query-options-test.cc            |  14 +-
 be/src/service/query-options.cc                 |  12 +
 be/src/util/backend-gflag-util.cc               |   5 +-
 be/src/util/bloom-filter-test.cc                | 289 ++++++++++++-------
 be/src/util/bloom-filter.cc                     |  73 +++--
 be/src/util/bloom-filter.h                      |  59 ++--
 common/thrift/BackendGflags.thrift              |   4 +
 common/thrift/ImpalaInternalService.thrift      |  10 +-
 common/thrift/PlanNodes.thrift                  |   4 +
 common/thrift/Planner.thrift                    |   4 +
 .../org/apache/impala/planner/PlanFragment.java |  34 ++-
 .../impala/planner/RuntimeFilterGenerator.java  |  91 +++++-
 .../apache/impala/service/BackendConfig.java    |   4 +
 .../org/apache/impala/service/FeSupport.java    |  16 +
 .../queries/PlannerTest/disable-codegen.test    |   8 +-
 .../PlannerTest/fk-pk-join-detection.test       |  20 +-
 .../queries/PlannerTest/max-row-size.test       |  54 ++--
 .../PlannerTest/min-max-runtime-filters.test    |   4 +-
 .../PlannerTest/resource-requirements.test      | 218 +++++++-------
 .../PlannerTest/spillable-buffer-sizing.test    |  80 ++---
 .../queries/PlannerTest/tablesample.test        |   4 +-
 .../admission-reject-min-reservation.test       |   4 +-
 .../queries/QueryTest/bloom_filters.test        |  73 +++--
 .../queries/QueryTest/bloom_filters_wait.test   |  17 +-
 .../queries/QueryTest/explain-level0.test       |   4 +-
 .../queries/QueryTest/explain-level1.test       |   4 +-
 .../queries/QueryTest/explain-level2.test       |   6 +-
 .../queries/QueryTest/explain-level3.test       |   6 +-
 .../queries/QueryTest/runtime_row_filters.test  |  34 +--
 .../queries/QueryTest/spilling.test             |   4 +-
 38 files changed, 891 insertions(+), 539 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/benchmarks/bloom-filter-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index 6728c42..9660bfa 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -21,9 +21,15 @@
 #include <iostream>
 #include <vector>
 
+#include "runtime/bufferpool/buffer-allocator.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "util/benchmark.h"
-#include "util/cpu-info.h"
 #include "util/bloom-filter.h"
+#include "common/init.h"
 
 #include "common/names.h"
 
@@ -172,25 +178,40 @@ uint32_t MakeRand() {
 namespace initialize {
 
 void Benchmark(int batch_size, void* data) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "");
+  ExecEnv* env = ExecEnv::GetInstance();
+  BufferPool::ClientHandle client;
+  CHECK(env->buffer_pool()
+            ->RegisterClient("", nullptr, env->buffer_reservation(), nullptr,
+                std::numeric_limits<int64>::max(), profile, &client).ok());
   int* d = reinterpret_cast<int*>(data);
+  CHECK(client.IncreaseReservation(BloomFilter::GetExpectedMemoryUsed(*d)));
   for (int i = 0; i < batch_size; ++i) {
-    BloomFilter bf(*d);
+    BloomFilter bf(&client);
+    CHECK(bf.Init(*d).ok());
+    bf.Close();
   }
+  env->buffer_pool()->DeregisterClient(&client);
+  pool.Clear();
 }
 
 }  // namespace initialize
 
-
 // Benchmark insert
 namespace insert {
 
 struct TestData {
-  explicit TestData(int log_heap_size) : bf(log_heap_size), data(1ull << 20) {
+  explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client)
+    : bf(client), data(1ull << 20) {
+    CHECK(bf.Init(log_bufferpool_size).ok());
     for (size_t i = 0; i < data.size(); ++i) {
       data[i] = MakeRand();
     }
   }
 
+  ~TestData() { bf.Close(); }
+
   BloomFilter bf;
   vector<uint32_t> data;
 };
@@ -211,12 +232,13 @@ void Benchmark(int batch_size, void* data) {
 namespace find {
 
 struct TestData {
-  TestData(int log_heap_size, size_t size)
-      : bf(log_heap_size),
-        vec_mask((1ull << static_cast<int>(floor(log2(size))))-1),
-        present(size),
-        absent(size),
-        result(0) {
+  TestData(int log_bufferpool_size, BufferPool::ClientHandle* client, size_t size)
+    : bf(client),
+      vec_mask((1ull << static_cast<int>(floor(log2(size)))) - 1),
+      present(size),
+      absent(size),
+      result(0) {
+    CHECK(bf.Init(log_bufferpool_size).ok());
     for (size_t i = 0; i < size; ++i) {
       present[i] = MakeRand();
       absent[i] = MakeRand();
@@ -224,6 +246,8 @@ struct TestData {
     }
   }
 
+  ~TestData() { bf.Close(); }
+
   BloomFilter bf;
   // A mask value such that i & vec_mask < present.size() (and absent.size()). This is
   // used in the benchmark functions to loop through present and absent, because
@@ -254,10 +278,12 @@ void Absent(int batch_size, void* data) {
 namespace either {
 
 struct TestData {
-  explicit TestData(int log_heap_size) {
-    BloomFilter bf(log_heap_size);
+  explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
+    BloomFilter bf(client);
+    CHECK(bf.Init(log_bufferpool_size).ok());
     BloomFilter::ToThrift(&bf, &tbf1);
     BloomFilter::ToThrift(&bf, &tbf2);
+    bf.Close();
   }
 
   TBloomFilter tbf1, tbf2;
@@ -273,7 +299,13 @@ void Benchmark(int batch_size, void* data) {
 } // namespace either
 
 void RunBenchmarks() {
-
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "");
+  ExecEnv* env = ExecEnv::GetInstance();
+  BufferPool::ClientHandle client;
+  CHECK(env->buffer_pool()
+            ->RegisterClient("", nullptr, env->buffer_reservation(), nullptr,
+                std::numeric_limits<int64>::max(), profile, &client).ok());
   char name[120];
 
   {
@@ -282,13 +314,18 @@ void RunBenchmarks() {
     for (int ndv = 10000; ndv <= 100 * 1000 * 1000; ndv *= 100) {
       for (int log10fpp = -1; log10fpp >= -3; --log10fpp) {
         const double fpp = pow(10, log10fpp);
-        testdata.emplace_back(new insert::TestData(BloomFilter::MinLogSpace(ndv, fpp)));
+        int log_required_size = BloomFilter::MinLogSpace(ndv, fpp);
+        CHECK(client.IncreaseReservation(
+            BloomFilter::GetExpectedMemoryUsed(log_required_size)));
+        testdata.emplace_back(
+            new insert::TestData(log_required_size, &client));
         snprintf(name, sizeof(name), "ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, insert::Benchmark, testdata.back().get());
       }
     }
     cout << suite.Measure() << endl;
   }
+  CHECK(client.DecreaseReservationTo(0).ok());
 
   {
     Benchmark suite("find");
@@ -296,8 +333,11 @@ void RunBenchmarks() {
     for (int ndv = 10000; ndv <= 100 * 1000 * 1000; ndv *= 100) {
       for (int log10fpp = -1; log10fpp >= -3; --log10fpp) {
         const double fpp = pow(10, log10fpp);
+        int log_required_size = BloomFilter::MinLogSpace(ndv, fpp);
+        CHECK(client.IncreaseReservation(
+            BloomFilter::GetExpectedMemoryUsed(log_required_size)));
         testdata.emplace_back(
-            new find::TestData(BloomFilter::MinLogSpace(ndv, fpp), ndv));
+            new find::TestData(BloomFilter::MinLogSpace(ndv, fpp), &client , ndv));
         snprintf(name, sizeof(name), "present ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, find::Present, testdata.back().get());
 
@@ -307,6 +347,7 @@ void RunBenchmarks() {
     }
     cout << suite.Measure() << endl;
   }
+  CHECK(client.DecreaseReservationTo(0).ok());
 
   {
     Benchmark suite("union", false /* micro_heuristics */);
@@ -314,18 +355,31 @@ void RunBenchmarks() {
     for (int ndv = 10000; ndv <= 100 * 1000 * 1000; ndv *= 100) {
       for (int log10fpp = -1; log10fpp >= -3; --log10fpp) {
         const double fpp = pow(10, log10fpp);
-        testdata.emplace_back(
-            new either::TestData(BloomFilter::MinLogSpace(ndv, fpp)));
+        int log_required_size = BloomFilter::MinLogSpace(ndv, fpp);
+        CHECK(client.IncreaseReservation(
+            BloomFilter::GetExpectedMemoryUsed(log_required_size)));
+        testdata.emplace_back(new either::TestData(
+            BloomFilter::MinLogSpace(ndv, fpp), &client));
         snprintf(name, sizeof(name), "ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, either::Benchmark, testdata.back().get());
       }
     }
     cout << suite.Measure() << endl;
   }
+
+  CHECK(client.DecreaseReservationTo(0).ok());
+  env->buffer_pool()->DeregisterClient(&client);
+  pool.Clear();
 }
 
 int main(int argc, char **argv) {
-  CpuInfo::Init();
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  TestEnv test_env;
+  int64_t min_page_size = 8;
+  int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024;
+  test_env.SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
+  CHECK(test_env.Init().ok());
 
   cout << endl << Benchmark::GetMachineInfo() << endl << endl;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index ad9e99e..f36c91d 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -135,7 +135,8 @@ Status FragmentInstanceState::Prepare() {
   event_sequence_->Start(query_state_->fragment_events_start_time());
   UpdateState(StateEvent::PREPARE_START);
 
-  runtime_state_->InitFilterBank();
+  RETURN_IF_ERROR(runtime_state_->InitFilterBank(
+      fragment_ctx_.fragment.runtime_filters_reservation_bytes));
 
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 178aef1..239e066 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -22,7 +22,10 @@
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
@@ -41,48 +44,44 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability o
 const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
 const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
 
-RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
+RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state,
+    long total_filter_mem_required)
   : state_(state),
     filter_mem_tracker_(
         new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)),
     mem_pool_(filter_mem_tracker_.get()),
-    closed_(false) {
+    closed_(false),
+    total_bloom_filter_mem_required_(total_filter_mem_required) {
   bloom_memory_allocated_ =
       state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
+}
 
-  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  max_filter_size_ = query_ctx.client_request.query_options.runtime_filter_max_size;
-  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  max_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  min_filter_size_ = query_ctx.client_request.query_options.runtime_filter_min_size;
-  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  min_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  // Make sure that min <= max
-  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
-
-  DCHECK_GT(min_filter_size_, 0);
-  DCHECK_GT(max_filter_size_, 0);
-
-  default_filter_size_ = query_ctx.client_request.query_options.runtime_bloom_filter_size;
-  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
-  default_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
+Status RuntimeFilterBank::ClaimBufferReservation() {
+  DCHECK(!buffer_pool_client_.is_registered());
+  string filter_bank_name = Substitute(
+      "RuntimeFilterBank (Fragment Id: $0)", PrintId(state_->fragment_instance_id()));
+  RETURN_IF_ERROR(state_->exec_env()->buffer_pool()->RegisterClient(filter_bank_name,
+      state_->query_state()->file_group(), state_->instance_buffer_reservation(),
+      filter_mem_tracker_.get(), total_bloom_filter_mem_required_,
+      state_->runtime_profile(), &buffer_pool_client_));
+  VLOG_FILE << filter_bank_name << " claiming reservation "
+            << total_bloom_filter_mem_required_;
+  state_->query_state()->initial_reservations()->Claim(
+      &buffer_pool_client_, total_bloom_filter_mem_required_);
+  return Status::OK();
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
     bool is_producer) {
-  RuntimeFilter* ret = obj_pool_.Add(
-      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
+  RuntimeFilter* ret = nullptr;
   lock_guard<mutex> l(runtime_filter_lock_);
   if (is_producer) {
     DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
+    ret = obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
     produced_filters_[filter_desc.filter_id] = ret;
   } else {
     if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+      ret = obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
       consumed_filters_[filter_desc.filter_id] = ret;
       VLOG_QUERY << "registered consumer filter " << filter_desc.filter_id;
     } else {
@@ -186,17 +185,20 @@ void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params)
       bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
     } else {
       int64_t required_space =
-          BloomFilter::GetExpectedHeapSpaceUsed(params.bloom_filter.log_heap_space);
-      // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
-      // there's not enough memory for it.
-      if (!filter_mem_tracker_->TryConsume(required_space)) {
-        VLOG_QUERY << "No memory for global filter: " << params.filter_id
-                   << " (fragment instance: " << state_->fragment_instance_id() << ")";
+          BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
+      DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
+          << "BufferPool Client should have enough reservation to fulfill bloom filter "
+             "allocation";
+      bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
+      Status status = bloom_filter->Init(params.bloom_filter);
+      if (!status.ok()) {
+        LOG(ERROR) << "Unable to allocate memory for bloom filter: "
+                   << status.GetDetail();
         bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
       } else {
-        bloom_filter = obj_pool_.Add(new BloomFilter(params.bloom_filter));
-        DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-        bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+        bloom_filters_.push_back(bloom_filter);
+        DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+        bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
       }
     }
   } else {
@@ -213,18 +215,26 @@ void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params)
 
 BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
   lock_guard<mutex> l(runtime_filter_lock_);
-  if (closed_) return NULL;
+  if (closed_) return nullptr;
 
   RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
   DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
 
   // Track required space
   int64_t log_filter_size = BitUtil::Log2Ceiling64(it->second->filter_size());
-  int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
-  if (!filter_mem_tracker_->TryConsume(required_space)) return NULL;
-  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
-  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-  bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+  int64_t required_space = BloomFilter::GetExpectedMemoryUsed(log_filter_size);
+  DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
+      << "BufferPool Client should have enough reservation to fulfill bloom filter "
+         "allocation";
+  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
+  Status status = bloom_filter->Init(log_filter_size);
+  if (!status.ok()) {
+    LOG(ERROR) << "Unable to allocate memory for bloom filter: " << status.GetDetail();
+    return nullptr;
+  }
+  bloom_filters_.push_back(bloom_filter);
+  DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+  bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
   return bloom_filter;
 }
 
@@ -239,15 +249,6 @@ MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
   return MinMaxFilter::Create(type, &obj_pool_, &mem_pool_);
 }
 
-int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
-  if (ndv == -1) return default_filter_size_;
-  int64_t required_space =
-      1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
-  required_space = max<int64_t>(required_space, min_filter_size_);
-  required_space = min<int64_t>(required_space, max_filter_size_);
-  return required_space;
-}
-
 bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
   double fpp =
       BloomFilter::FalsePositiveProb(observed_ndv, BitUtil::Log2Ceiling64(filter_size));
@@ -257,8 +258,16 @@ bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv)
 void RuntimeFilterBank::Close() {
   lock_guard<mutex> l(runtime_filter_lock_);
   closed_ = true;
+  for (BloomFilter* filter : bloom_filters_) filter->Close();
   obj_pool_.Clear();
   mem_pool_.FreeAll();
-  filter_mem_tracker_->Release(bloom_memory_allocated_->value());
+  if (buffer_pool_client_.is_registered()) {
+    VLOG_FILE << "RuntimeFilterBank (Fragment Id: " << state_->fragment_instance_id()
+              << ") returning reservation " << total_bloom_filter_mem_required_;
+    state_->query_state()->initial_reservations()->Return(
+        &buffer_pool_client_, total_bloom_filter_mem_required_);
+    state_->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+  }
+  DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
   filter_mem_tracker_->Close();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 8f6bb42..ead7e82 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/mem-pool.h"
 #include "runtime/types.h"
 #include "util/runtime-profile.h"
@@ -67,7 +68,17 @@ class TQueryCtx;
 /// coordinate in any way.
 class RuntimeFilterBank {
  public:
-  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
+  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state,
+      long total_filter_mem_required);
+
+  /// Initialize 'buffer_pool_client_' and claim the initial reservation. The client is
+  /// automatically cleaned up in Close(). Should not be called if the client is already
+  /// open.
+  ///
+  /// Must return the initial reservation to QueryState::initial_reservations(), which is
+  /// done automatically in Close() as long as the initial reservation is not released
+  /// before Close().
+  Status ClaimBufferReservation() WARN_UNUSED_RESULT;
 
   /// Registers a filter that will either be produced (is_producer == false) or consumed
   /// (is_producer == true) by fragments that share this RuntimeState. The filter
@@ -99,11 +110,12 @@ class RuntimeFilterBank {
 
   /// Returns a bloom_filter that can be used by an operator to produce a local filter,
   /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
-  /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
-  /// should not be deleted by the caller. The filter identified by 'filter_id' must have
-  /// been previously registered as a 'producer' by RegisterFilter().
+  /// the RuntimeFilterBank and should not be deleted by the caller. The filter identified
+  /// by 'filter_id' must have been previously registered as a 'producer' by
+  /// RegisterFilter().
   ///
-  /// If there is not enough memory, or if Close() has been called first, returns NULL.
+  /// If memory allocation for the filter fails, or if Close() has been called first,
+  /// returns NULL.
   BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
 
   /// Returns a new MinMaxFilter. Handles memory the same as AllocateScratchBloomFilter().
@@ -119,11 +131,6 @@ class RuntimeFilterBank {
   static const int64_t MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;  // 512MB
 
  private:
-  /// Returns the the space (in bytes) required for a filter to achieve the configured
-  /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
-  /// estimate is known), the default filter size is returned.
-  int64_t GetFilterSizeForNdv(int64_t ndv);
-
   /// Lock protecting produced_filters_ and consumed_filters_.
   boost::mutex runtime_filter_lock_;
 
@@ -155,14 +162,17 @@ class RuntimeFilterBank {
   /// Total amount of memory allocated to Bloom Filters
   RuntimeProfile::Counter* bloom_memory_allocated_;
 
-  /// Precomputed default BloomFilter size.
-  int64_t default_filter_size_;
+  /// Total amount of memory required by the bloom filters as calculated by the planner.
+  long total_bloom_filter_mem_required_;
 
-  /// Maximum filter size, in bytes, rounded up to a power of two.
-  int64_t max_filter_size_;
+  /// Contains references to all the bloom filters generated. Used in Close() to safely
+  /// release all memory allocated for Bloomfilters.
+  vector<BloomFilter*> bloom_filters_;
 
-  /// Minimum filter size, in bytes, rounded up to a power of two.
-  int64_t min_filter_size_;
+  /// Buffer pool client for the filter bank. Initialized with the required reservation
+  /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
+  /// pool in Close().
+  BufferPool::ClientHandle buffer_pool_client_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 40c5f23..7ab73d7 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -46,7 +46,7 @@ class RuntimeFilter {
       : bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter),
         registration_time_(MonotonicMillis()), arrival_time_(0L),
         filter_size_(filter_size) {
-    DCHECK_GT(filter_size_, 0);
+    DCHECK(filter_desc_.type == TRuntimeFilterType::MIN_MAX || filter_size_ > 0);
   }
 
   /// Returns true if SetFilter() has been called.

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 37219cc..4b39ec8 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -124,8 +124,10 @@ void RuntimeState::Init() {
   }
 }
 
-void RuntimeState::InitFilterBank() {
-  filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
+Status RuntimeState::InitFilterBank(long runtime_filters_reservation_bytes) {
+  filter_bank_.reset(
+      new RuntimeFilterBank(query_ctx(), this, runtime_filters_reservation_bytes));
+  return filter_bank_->ClaimBufferReservation();
 }
 
 Status RuntimeState::CreateCodegen() {

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 4eb3e10..b292789 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -91,8 +91,9 @@ class RuntimeState {
   /// Empty d'tor to avoid issues with scoped_ptr.
   ~RuntimeState();
 
-  /// Initializes the runtime filter bank.
-  void InitFilterBank();
+  /// Initializes the runtime filter bank and claims the initial buffer reservation
+  /// for it.
+  Status InitFilterBank(long runtime_filters_reservation_bytes);
 
   QueryState* query_state() const { return query_state_; }
   /// Return the query's ObjectPool

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index bc4afde..2b6e2fd 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -44,6 +44,7 @@
 #include "runtime/runtime-state.h"
 #include "service/impala-server.h"
 #include "service/query-options.h"
+#include "util/bloom-filter.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
@@ -537,6 +538,15 @@ Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions(
   return result_bytes;
 }
 
+// Returns the log (base 2) of the minimum number of bytes we need for a Bloom filter
+// with 'ndv' unique elements and a false positive probability of less than 'fpp'.
+extern "C"
+JNIEXPORT jint JNICALL
+Java_org_apache_impala_service_FeSupport_MinLogSpaceForBloomFilter(
+    JNIEnv* env, jclass caller_class, jlong ndv, jdouble fpp) {
+  return BloomFilter::MinLogSpace(ndv, fpp);
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -581,6 +591,10 @@ static JNINativeMethod native_methods[] = {
       (char*)"NativeLibCacheRemoveEntry", (char*)"(Ljava/lang/String;)Z",
       (void*)::Java_org_apache_impala_service_FeSupport_NativeLibCacheRemoveEntry
   },
+  {
+    (char*)"MinLogSpaceForBloomFilter", (char*)"(JD)I",
+    (void*)::Java_org_apache_impala_service_FeSupport_MinLogSpaceForBloomFilter
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 552c218..9a97b08 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -148,9 +148,10 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(runtime_filter_min_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
+      // Lower limit for runtime_filter_max_size is FLAGS_min_buffer_size which has a
+      // default value of 64KB.
       {MAKE_OPTIONDEF(runtime_filter_max_size),
-          {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
+          {64 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
       {MAKE_OPTIONDEF(runtime_bloom_filter_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}
@@ -315,6 +316,15 @@ TEST(QueryOptions, SetSpecialOptions) {
     TestError("0");
     TestError(to_string(ROW_SIZE_LIMIT + 1).c_str());
   }
+  // RUNTIME_FILTER_MAX_SIZE should not be less than FLAGS_min_buffer_size
+  {
+    OptionDef<int32_t> key_def = MAKE_OPTIONDEF(runtime_filter_max_size);
+    auto TestOk = MakeTestOkFn(options, key_def);
+    auto TestError = MakeTestErrFn(options, key_def);
+    TestOk("128KB", 128 * 1024);
+    TestError("65535"); // default value of FLAGS_min_buffer_size is 64KB
+    TestOk("64KB", 64 * 1024);
+  }
 }
 
 TEST(QueryOptions, ParseQueryOptions) {

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ff2fd4e..d6f917b 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -30,6 +30,8 @@
 
 #include "common/names.h"
 
+DECLARE_int64(min_buffer_size);
+
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::token_compress_on;
@@ -378,6 +380,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
                   RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
                   RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
         }
+        if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE
+            && size < FLAGS_min_buffer_size
+            // last condition is to unblock the highly improbable case where the
+            // min_buffer_size is greater than RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE.
+            && FLAGS_min_buffer_size <= RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
+          return Status(Substitute("$0 should not be less than $1 which is the minimum "
+              "buffer size that can be allocated by the buffer pool",
+              PrintTImpalaQueryOptions(static_cast<TImpalaQueryOptions::type>(option)),
+              FLAGS_min_buffer_size));
+        }
         if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) {
           query_options->__set_runtime_bloom_filter_size(size);
         } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) {

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f7f49ac..0bbaa89 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -47,6 +47,8 @@ DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(kudu_master_hosts);
 DECLARE_string(reserved_words_version);
 DECLARE_string(sentry_config);
+DECLARE_double(max_filter_error_rate);
+DECLARE_int64(min_buffer_size);
 
 namespace impala {
 
@@ -84,7 +86,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
     DCHECK_EQ(FLAGS_reserved_words_version, "3.0.0");
     cfg.__set_reserved_words_version(TReservedWordsVersion::IMPALA_3_0);
   }
-
+  cfg.__set_max_filter_error_rate(FLAGS_max_filter_error_rate);
+  cfg.__set_min_buffer_size(FLAGS_min_buffer_size);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/util/bloom-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index 20c2655..6fcfc92 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -21,6 +21,11 @@
 #include <unordered_set>
 #include <vector>
 
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
 using namespace std;
@@ -84,49 +89,176 @@ TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success)
 
 namespace impala {
 
+// Test that MaxNdv() and MinLogSpace() are dual
+TEST(BloomFilter, MinSpaceMaxNdv) {
+  for (int log2fpp = -2; log2fpp >= -63; --log2fpp) {
+    const double fpp = pow(2, log2fpp);
+    for (int given_log_space = 8; given_log_space < 30; ++given_log_space) {
+      const size_t derived_ndv = BloomFilter::MaxNdv(given_log_space, fpp);
+      int derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp);
+
+      EXPECT_EQ(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+
+      // If we lower the fpp, we need more space; if we raise it we need less.
+      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp / 2);
+      EXPECT_GE(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp * 2);
+      EXPECT_LE(derived_log_space, given_log_space) << "fpp: " << fpp
+                                                    << " derived_ndv: " << derived_ndv;
+    }
+    for (size_t given_ndv = 1000; given_ndv < 1000 * 1000; given_ndv *= 3) {
+      const int derived_log_space = BloomFilter::MinLogSpace(given_ndv, fpp);
+      const size_t derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp);
+
+      // The max ndv is close to, but larger than, the ndv we asked for
+      EXPECT_LE(given_ndv, derived_ndv) << "fpp: " << fpp
+                                        << " derived_log_space: " << derived_log_space;
+      EXPECT_GE(given_ndv * 2, derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+
+      // Changing the fpp changes the ndv capacity in the expected direction.
+      size_t new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp / 2);
+      EXPECT_GE(derived_ndv, new_derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+      new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp * 2);
+      EXPECT_LE(derived_ndv, new_derived_ndv)
+          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
+    }
+  }
+}
+
+TEST(BloomFilter, MinSpaceEdgeCase) {
+  int min_space = BloomFilter::MinLogSpace(1, 0.75);
+  EXPECT_GE(min_space, 0) << "LogSpace should always be >= 0";
+}
+
+// Check that MinLogSpace() and FalsePositiveProb() are dual
+TEST(BloomFilter, MinSpaceForFpp) {
+  for (size_t ndv = 10000; ndv < 100 * 1000 * 1000; ndv *= 1.01) {
+    for (double fpp = 0.1; fpp > pow(2, -20); fpp *= 0.99) { // NOLINT: loop on double
+      // When constructing a Bloom filter, we can request a particular fpp by calling
+      // MinLogSpace().
+      const int min_log_space = BloomFilter::MinLogSpace(ndv, fpp);
+      // However, at the resulting ndv and space, the expected fpp might be lower than
+      // the one that was requested.
+      double expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space);
+      EXPECT_LE(expected_fpp, fpp);
+      // The fpp we get might be much lower than the one we asked for. However, if the
+      // space were just one size smaller, the fpp we get would be larger than the one we
+      // asked for.
+      expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space - 1);
+      EXPECT_GE(expected_fpp, fpp);
+      // Therefore, the return value of MinLogSpace() is actually the minimum
+      // log space at which we can guarantee the requested fpp.
+    }
+  }
+}
+
+class BloomFilterTest : public testing::Test {
+ protected:
+  /// Temporary runtime environment for the BloomFilters.
+  unique_ptr<TestEnv> test_env_;
+  RuntimeState* runtime_state_;
+
+  ObjectPool pool_;
+  unique_ptr<MemTracker> tracker_;
+  unique_ptr<BufferPool::ClientHandle> buffer_pool_client_;
+  vector<BloomFilter*> bloom_filters_;
+
+  virtual void SetUp() {
+    int64_t min_page_size = 64; // Min filter size that we allocate in our tests.
+    int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024;
+    test_env_.reset(new TestEnv());
+    test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
+    ASSERT_OK(test_env_->Init());
+    ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
+    buffer_pool_client_.reset(new BufferPool::ClientHandle);
+    tracker_.reset(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
+    ASSERT_OK(buffer_pool->RegisterClient("", nullptr,
+        runtime_state_->instance_buffer_reservation(), tracker_.get(),
+        std::numeric_limits<int64>::max(), runtime_state_->runtime_profile(),
+        buffer_pool_client_.get()));
+  }
+
+  virtual void TearDown() {
+    for (BloomFilter* filter : bloom_filters_) filter->Close();
+    bloom_filters_.clear();
+    runtime_state_ = nullptr;
+    pool_.Clear();
+    test_env_->exec_env()->buffer_pool()->DeregisterClient(buffer_pool_client_.get());
+    buffer_pool_client_.reset();
+    tracker_.reset();
+    test_env_.reset();
+  }
+
+  BloomFilter* CreateBloomFilter(int log_bufferpool_space) {
+    int64_t filter_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_space);
+    EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
+    BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
+    EXPECT_OK(bloom_filter->Init(log_bufferpool_space));
+    bloom_filters_.push_back(bloom_filter);
+    EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
+    return bloom_filter;
+  }
+
+  BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
+    int64_t filter_size =
+        BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
+    EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
+    BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
+    EXPECT_OK(bloom_filter->Init(t_filter));
+    bloom_filters_.push_back(bloom_filter);
+    EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
+    return bloom_filter;
+  }
+};
+
 // We can construct (and destruct) Bloom filters with different spaces.
-TEST(BloomFilter, Constructor) {
-  for (int i = 0; i < 30; ++i) {
-    BloomFilter bf(i);
+TEST_F(BloomFilterTest, Constructor) {
+  for (int i = 1; i < 30; ++i) {
+    CreateBloomFilter(i);
   }
 }
 
 // We can Insert() hashes into a Bloom filter with different spaces.
-TEST(BloomFilter, Insert) {
+TEST_F(BloomFilterTest, Insert) {
   srand(0);
   for (int i = 13; i < 17; ++i) {
-    BloomFilter bf(i);
+    BloomFilter* bf = CreateBloomFilter(i);
     for (int k = 0; k < (1 << 15); ++k) {
-      BfInsert(bf, MakeRand());
+      BfInsert(*bf, MakeRand());
     }
   }
 }
 
 // After Insert()ing something into a Bloom filter, it can be found again immediately.
-TEST(BloomFilter, Find) {
+TEST_F(BloomFilterTest, Find) {
   srand(0);
   for (int i = 13; i < 17; ++i) {
-    BloomFilter bf(i);
+    BloomFilter* bf = CreateBloomFilter(i);
     for (int k = 0; k < (1 << 15); ++k) {
       const uint64_t to_insert = MakeRand();
-      BfInsert(bf, to_insert);
-      EXPECT_TRUE(BfFind(bf, to_insert));
+      BfInsert(*bf, to_insert);
+      EXPECT_TRUE(BfFind(*bf, to_insert));
     }
   }
 }
 
 // After Insert()ing something into a Bloom filter, it can be found again much later.
-TEST(BloomFilter, CumulativeFind) {
+TEST_F(BloomFilterTest, CumulativeFind) {
   srand(0);
   for (int i = 5; i < 11; ++i) {
     std::vector<uint32_t> inserted;
-    BloomFilter bf(i);
+    BloomFilter* bf = CreateBloomFilter(i);
     for (int k = 0; k < (1 << 10); ++k) {
       const uint32_t to_insert = MakeRand();
       inserted.push_back(to_insert);
-      BfInsert(bf, to_insert);
+      BfInsert(*bf, to_insert);
       for (int n = 0; n < inserted.size(); ++n) {
-        EXPECT_TRUE(BfFind(bf, inserted[n]));
+        EXPECT_TRUE(BfFind(*bf, inserted[n]));
       }
     }
   }
@@ -134,7 +266,7 @@ TEST(BloomFilter, CumulativeFind) {
 
 // The empirical false positives we find when looking for random items is with a constant
 // factor of the false positive probability the Bloom filter was constructed for.
-TEST(BloomFilter, FindInvalid) {
+TEST_F(BloomFilterTest, FindInvalid) {
   srand(0);
   static const int find_limit = 1 << 20;
   unordered_set<uint32_t> to_find;
@@ -154,22 +286,23 @@ TEST(BloomFilter, FindInvalid) {
     for (int log_fpp = 4; log_fpp < 15; ++log_fpp) {
       double fpp = 1.0 / (1 << log_fpp);
       const size_t ndv = 1 << log_ndv;
-      const int log_heap_space = BloomFilter::MinLogSpace(ndv, fpp);
-      BloomFilter bf(log_heap_space);
+      const int log_bufferpool_space = BloomFilter::MinLogSpace(ndv, fpp);
+      BloomFilter* bf = CreateBloomFilter(log_bufferpool_space);
       // Fill up a BF with exactly as much ndv as we planned for it:
       for (size_t i = 0; i < ndv; ++i) {
-        BfInsert(bf, shuffled_insert[i]);
+        BfInsert(*bf, shuffled_insert[i]);
       }
       int found = 0;
       // Now we sample from the set of possible hashes, looking for hits.
       for (const auto& i : to_find) {
-        found += BfFind(bf, i);
+        found += BfFind(*bf, i);
       }
       EXPECT_LE(found, find_limit * fpp * 2)
           << "Too many false positives with -log2(fpp) = " << log_fpp;
       // Because the space is rounded up to a power of 2, we might actually get a lower
       // fpp than the one passed to MinLogSpace().
-      const double expected_fpp = BloomFilter::FalsePositiveProb(ndv, log_heap_space);
+      const double expected_fpp =
+          BloomFilter::FalsePositiveProb(ndv, log_bufferpool_space);
       EXPECT_GE(found, find_limit * expected_fpp)
           << "Too few false positives with -log2(fpp) = " << log_fpp;
       EXPECT_LE(found, find_limit * expected_fpp * 8)
@@ -178,118 +311,56 @@ TEST(BloomFilter, FindInvalid) {
   }
 }
 
-// Test that MaxNdv() and MinLogSpace() are dual
-TEST(BloomFilter, MinSpaceMaxNdv) {
-  for (int log2fpp = -2; log2fpp >= -63; --log2fpp) {
-    const double fpp = pow(2, log2fpp);
-    for (int given_log_space = 8; given_log_space < 30; ++given_log_space) {
-      const size_t derived_ndv = BloomFilter::MaxNdv(given_log_space, fpp);
-      int derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp);
-
-      EXPECT_EQ(derived_log_space, given_log_space) << "fpp: " << fpp
-                                                    << " derived_ndv: " << derived_ndv;
-
-      // If we lower the fpp, we need more space; if we raise it we need less.
-      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp / 2);
-      EXPECT_GE(derived_log_space, given_log_space) << "fpp: " << fpp
-                                                    << " derived_ndv: " << derived_ndv;
-      derived_log_space = BloomFilter::MinLogSpace(derived_ndv, fpp * 2);
-      EXPECT_LE(derived_log_space, given_log_space) << "fpp: " << fpp
-                                                    << " derived_ndv: " << derived_ndv;
-    }
-    for (size_t given_ndv = 1000; given_ndv < 1000 * 1000; given_ndv *= 3) {
-      const int derived_log_space = BloomFilter::MinLogSpace(given_ndv, fpp);
-      const size_t derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp);
-
-      // The max ndv is close to, but larger than, then ndv we asked for
-      EXPECT_LE(given_ndv, derived_ndv) << "fpp: " << fpp
-                                        << " derived_log_space: " << derived_log_space;
-      EXPECT_GE(given_ndv * 2, derived_ndv)
-          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
-
-      // Changing the fpp changes the ndv capacity in the expected direction.
-      size_t new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp / 2);
-      EXPECT_GE(derived_ndv, new_derived_ndv)
-          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
-      new_derived_ndv = BloomFilter::MaxNdv(derived_log_space, fpp * 2);
-      EXPECT_LE(derived_ndv, new_derived_ndv)
-          << "fpp: " << fpp << " derived_log_space: " << derived_log_space;
-    }
-  }
-}
-
-TEST(BloomFilter, MinSpaceEdgeCase) {
-  int min_space = BloomFilter::MinLogSpace(1, 0.75);
-  EXPECT_GE(min_space, 0) << "LogSpace should always be >= 0";
-}
-
-// Check that MinLogSpace() and FalsePositiveProb() are dual
-TEST(BloomFilter, MinSpaceForFpp) {
-  for (size_t ndv = 10000; ndv < 100 * 1000 * 1000; ndv *= 1.01) {
-    for (double fpp = 0.1; fpp > pow(2, -20); fpp *= 0.99) { // NOLINT: loop on double
-      // When contructing a Bloom filter, we can request a particular fpp by calling
-      // MinLogSpace().
-      const int min_log_space = BloomFilter::MinLogSpace(ndv, fpp);
-      // However, at the resulting ndv and space, the expected fpp might be lower than
-      // the one that was requested.
-      double expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space);
-      EXPECT_LE(expected_fpp, fpp);
-      // The fpp we get might be much lower than the one we asked for. However, if the
-      // space were just one size smaller, the fpp we get would be larger than the one we
-      // asked for.
-      expected_fpp = BloomFilter::FalsePositiveProb(ndv, min_log_space - 1);
-      EXPECT_GE(expected_fpp, fpp);
-      // Therefore, the return value of MinLogSpace() is actually the minimum
-      // log space at which we can guarantee the requested fpp.
-    }
-  }
-}
-
-TEST(BloomFilter, Thrift) {
-  BloomFilter bf(BloomFilter::MinLogSpace(100, 0.01));
-  for (int i = 0; i < 10; ++i) BfInsert(bf, i);
+TEST_F(BloomFilterTest, Thrift) {
+  BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
+  for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
   // Check no unexpected new false positives.
   unordered_set<int> missing_ints;
   for (int i = 11; i < 100; ++i) {
-    if (!BfFind(bf, i)) missing_ints.insert(i);
+    if (!BfFind(*bf, i)) missing_ints.insert(i);
   }
 
   TBloomFilter to_thrift;
-  BloomFilter::ToThrift(&bf, &to_thrift);
+  BloomFilter::ToThrift(bf, &to_thrift);
   EXPECT_EQ(to_thrift.always_true, false);
 
-  BloomFilter from_thrift(to_thrift);
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(from_thrift, i));
-  for (int missing: missing_ints) ASSERT_FALSE(BfFind(from_thrift, missing));
+  BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
+  for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
 
   BloomFilter::ToThrift(NULL, &to_thrift);
   EXPECT_EQ(to_thrift.always_true, true);
 }
 
-TEST(BloomFilter, ThriftOr) {
-  BloomFilter bf1(BloomFilter::MinLogSpace(100, 0.01));
-  BloomFilter bf2(BloomFilter::MinLogSpace(100, 0.01));
+TEST_F(BloomFilterTest, ThriftOr) {
+  BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
+  BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
 
-  for (int i = 60; i < 80; ++i) BfInsert(bf2, i);
-  for (int i = 0; i < 10; ++i) BfInsert(bf1, i);
+  for (int i = 60; i < 80; ++i) BfInsert(*bf2, i);
+  for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
 
   bool success;
-  BloomFilter bf3(BfUnion(bf1, bf2, &success));
+  BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(bf3, i)) << i;
-  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf3, i)) << i;
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
 
   // Insert another value to aggregated BloomFilter.
-  for (int i = 11; i < 50; ++i) BfInsert(bf3, i);
+  for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
 
   // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
-  BloomFilter bf4(BfUnion(bf1, bf3, &success));
+  BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
-  for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(bf4, i)) << i;
-  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf4, i)) << i;
-  ASSERT_FALSE(BfFind(bf4, 81));
+  for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
+  ASSERT_FALSE(BfFind(*bf4, 81));
 }
 
 }  // namespace impala
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index f8ce625..0a2f8fc 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,6 +17,7 @@
 
 #include "util/bloom-filter.h"
 
+#include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 
 using namespace std;
@@ -25,47 +26,55 @@ namespace impala {
 
 constexpr uint32_t BloomFilter::REHASH[8] __attribute__((aligned(32)));
 
-BloomFilter::BloomFilter(const int log_heap_space)
-  : always_false_(true),
-    // Since log_heap_space is in bytes, we need to convert it to the number of tiny Bloom
-    // filters we will use.
-    log_num_buckets_(std::max(1, log_heap_space - LOG_BUCKET_BYTE_SIZE)),
-    // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
-    // that is too large.
-    directory_mask_((1ull << std::min(63, log_num_buckets_)) - 1),
-    directory_(NULL) {
+BloomFilter::BloomFilter(BufferPool::ClientHandle* client)
+  : buffer_pool_client_(client) {}
+
+BloomFilter::~BloomFilter() {
+  DCHECK(directory_ == nullptr)
+      << "Close() should have been called before the object is destroyed.";
+}
+
+Status BloomFilter::Init(const int log_bufferpool_space) {
+  // Since log_bufferpool_space is in bytes, we need to convert it to the number of tiny
+  // Bloom filters we will use.
+  log_num_buckets_ = std::max(1, log_bufferpool_space - LOG_BUCKET_BYTE_SIZE);
+  // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
+  // that is too large.
+  directory_mask_ = (1ull << std::min(63, log_num_buckets_)) - 1;
   // Since we use 32 bits in the arguments of Insert() and Find(), log_num_buckets_
   // must be limited.
-  DCHECK(log_num_buckets_ <= 32)
-      << "Bloom filter too large. log_heap_space: " << log_heap_space;
+  DCHECK(log_num_buckets_ <= 32) << "Bloom filter too large. log_bufferpool_space: "
+                                 << log_bufferpool_space;
   const size_t alloc_size = directory_size();
-  const int malloc_failed =
-      posix_memalign(reinterpret_cast<void**>(&directory_), 64, alloc_size);
-  DCHECK_EQ(malloc_failed, 0) << "Malloc failed. log_heap_space: " << log_heap_space
-                              << " log_num_buckets_: " << log_num_buckets_
-                              << " alloc_size: " << alloc_size;
-  DCHECK(directory_ != nullptr);
+  BufferPool* buffer_pool_ = ExecEnv::GetInstance()->buffer_pool();
+  Close(); // Ensure that any previously allocated memory for directory_ is released.
+  RETURN_IF_ERROR(
+      buffer_pool_->AllocateBuffer(buffer_pool_client_, alloc_size, &buffer_handle_));
+  directory_ = reinterpret_cast<Bucket*>(buffer_handle_.data());
   memset(directory_, 0, alloc_size);
+  return Status::OK();
 }
 
-BloomFilter::BloomFilter(const TBloomFilter& thrift)
-    : BloomFilter(thrift.log_heap_space) {
-  if (!thrift.always_false) {
+Status BloomFilter::Init(const TBloomFilter& thrift) {
+  RETURN_IF_ERROR(Init(thrift.log_bufferpool_space));
+  if (directory_ != nullptr && !thrift.always_false) {
     always_false_ = false;
     DCHECK_EQ(thrift.directory.size(), directory_size());
     memcpy(directory_, &thrift.directory[0], thrift.directory.size());
   }
+  return Status::OK();
 }
 
-BloomFilter::~BloomFilter() {
-  if (directory_) {
-    free(directory_);
-    directory_ = NULL;
+void BloomFilter::Close() {
+  if (directory_ != nullptr) {
+    BufferPool* buffer_pool_ = ExecEnv::GetInstance()->buffer_pool();
+    buffer_pool_->FreeBuffer(buffer_pool_client_, &buffer_handle_);
+    directory_ = nullptr;
   }
 }
 
 void BloomFilter::ToThrift(TBloomFilter* thrift) const {
-  thrift->log_heap_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
+  thrift->log_bufferpool_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
   if (always_false_) {
     thrift->always_false = true;
     thrift->always_true = false;
@@ -184,9 +193,9 @@ void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
   DCHECK(!out->always_true);
   DCHECK(!in.always_true);
   if (in.always_false) return;
-  DCHECK_EQ(in.log_heap_space, out->log_heap_space);
+  DCHECK_EQ(in.log_bufferpool_space, out->log_bufferpool_space);
   DCHECK_EQ(in.directory.size(), out->directory.size())
-      << "Equal log heap space " << in.log_heap_space
+      << "Equal log heap space " << in.log_bufferpool_space
       << ", but different directory sizes: " << in.directory.size() << ", "
       << out->directory.size();
   // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
@@ -220,11 +229,11 @@ void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
 //
 // where space is in bits.
 
-size_t BloomFilter::MaxNdv(const int log_heap_space, const double fpp) {
-  DCHECK(log_heap_space > 0 && log_heap_space < 61);
+size_t BloomFilter::MaxNdv(const int log_bufferpool_space, const double fpp) {
+  DCHECK(log_bufferpool_space > 0 && log_bufferpool_space < 61);
   DCHECK(0 < fpp && fpp < 1);
   static const double ik = 1.0 / BUCKET_WORDS;
-  return -1 * ik * (1ull << (log_heap_space + 3)) * log(1 - pow(fpp, ik));
+  return -1 * ik * (1ull << (log_bufferpool_space + 3)) * log(1 - pow(fpp, ik));
 }
 
 int BloomFilter::MinLogSpace(const size_t ndv, const double fpp) {
@@ -237,9 +246,9 @@ int BloomFilter::MinLogSpace(const size_t ndv, const double fpp) {
   return max(0, static_cast<int>(ceil(log2(m / 8))));
 }
 
-double BloomFilter::FalsePositiveProb(const size_t ndv, const int log_heap_space) {
+double BloomFilter::FalsePositiveProb(const size_t ndv, const int log_bufferpool_space) {
   return pow(1 - exp((-1.0 * static_cast<double>(BUCKET_WORDS) * static_cast<double>(ndv))
-                     / static_cast<double>(1ull << (log_heap_space + 3))),
+                     / static_cast<double>(1ull << (log_bufferpool_space + 3))),
       BUCKET_WORDS);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 9628402..73cb01e 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -28,6 +28,7 @@
 #include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/macros.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 
@@ -41,7 +42,7 @@ namespace impala {
 /// When talking about Bloom filter size, rather than talking about 'size', which might be
 /// ambiguous, we distinguish two different quantities:
 ///
-/// 1. Space: the amount of heap memory used
+/// 1. Space: the amount of buffer pool memory used
 ///
 /// 2. NDV: the number of unique items that have been inserted
 ///
@@ -57,11 +58,19 @@ namespace impala {
 /// so LLVM will not generate exception related code at their call sites.
 class BloomFilter {
  public:
-  /// Consumes at most (1 << log_heap_space) bytes on the heap.
-  explicit BloomFilter(const int log_heap_space);
-  explicit BloomFilter(const TBloomFilter& thrift);
+  /// Consumes at most (1 << log_bufferpool_space) bytes from the buffer pool client.
+  /// 'client' should be a valid registered BufferPool Client and should have enough
+  /// reservation to fulfill allocation for 'directory_'.
+  explicit BloomFilter(BufferPool::ClientHandle* client);
   ~BloomFilter();
 
+  /// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
+  /// calls to Insert() and Find() should only be done between the calls to Init() and
+  /// Close(). Init() and Close() are safe to call multiple times.
+  Status Init(const int log_bufferpool_space);
+  Status Init(const TBloomFilter& thrift);
+  void Close();
+
   /// Representation of a filter which allows all elements to pass.
   static constexpr BloomFilter* const ALWAYS_TRUE_FILTER = NULL;
 
@@ -87,29 +96,32 @@ class BloomFilter {
 
   /// As more distinct items are inserted into a BloomFilter, the false positive rate
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
-  /// constructed with (1 << log_heap_space) bytes of heap space hits false positive
+  /// constructed with (1 << log_bufferpool_space) bytes of heap space hits false positive
   /// probabilty fpp.
-  static size_t MaxNdv(const int log_heap_space, const double fpp);
+  static size_t MaxNdv(const int log_bufferpool_space, const double fpp);
 
   /// If we expect to fill a Bloom filter with 'ndv' different unique elements and we
   /// want a false positive probabilty of less than 'fpp', then this is the log (base 2)
   /// of the minimum number of bytes we need.
   static int MinLogSpace(const size_t ndv, const double fpp);
 
-  /// Returns the expected false positive rate for the given ndv and log_heap_space
-  static double FalsePositiveProb(const size_t ndv, const int log_heap_space);
+  /// Returns the expected false positive rate for the given ndv and log_bufferpool_space
+  static double FalsePositiveProb(const size_t ndv, const int log_bufferpool_space);
 
-  /// Returns amount of heap space used, in bytes
-  int64_t GetHeapSpaceUsed() const { return sizeof(Bucket) * (1LL << log_num_buckets_); }
+  /// Returns the amount of buffer pool space used (in bytes). A value of -1 means that
+  /// 'directory_' has not been allocated which can happen if the object was just created
+  /// and Init() hasn't been called or Init() failed or Close() was called on the object.
+  int64_t GetBufferPoolSpaceUsed() const {
+    return directory_ == nullptr ? -1 : sizeof(Bucket) * (1LL << log_num_buckets_);
+  }
 
-  static int64_t GetExpectedHeapSpaceUsed(uint32_t log_heap_size) {
-    DCHECK_GE(log_heap_size, LOG_BUCKET_WORD_BITS);
-    return sizeof(Bucket) * (1LL << (log_heap_size - LOG_BUCKET_WORD_BITS));
+  static int64_t GetExpectedMemoryUsed(int log_heap_size) {
+    return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
   }
 
  private:
   // always_false_ is true when the bloom filter hasn't had any elements inserted.
-  bool always_false_;
+  bool always_false_ = true;
 
   /// The BloomFilter is divided up into Buckets
   static const uint64_t BUCKET_WORDS = 8;
@@ -128,13 +140,18 @@ class BloomFilter {
   typedef BucketWord Bucket[BUCKET_WORDS];
 
   /// log_num_buckets_ is the log (base 2) of the number of buckets in the directory.
-  const int log_num_buckets_;
+  int log_num_buckets_ = 0;
 
   /// directory_mask_ is (1 << log_num_buckets_) - 1. It is precomputed for
   /// efficiency reasons.
-  const uint32_t directory_mask_;
+  uint32_t directory_mask_ = 0;
+
+  Bucket* directory_ = nullptr;
 
-  Bucket* directory_;
+  /// Bufferpool client and handle used for allocating and freeing directory memory.
+  /// Client is not owned by the filter.
+  BufferPool::ClientHandle* buffer_pool_client_;
+  BufferPool::BufferHandle buffer_handle_;
 
   // Same as Insert(), but skips the CPU check and assumes that AVX is not available.
   void InsertNoAvx2(const uint32_t hash) noexcept;
@@ -168,7 +185,7 @@ class BloomFilter {
   /// Serializes this filter as Thrift.
   void ToThrift(TBloomFilter* thrift) const;
 
-  /// Some constants used in hashing. #defined for efficiency reasons.
+/// Some constants used in hashing. #defined for efficiency reasons.
 #define IMPALA_BLOOM_HASH_CONSTANTS                                             \
   0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU, 0x705495c7U, 0x2df1424bU, \
       0x9efc4947U, 0x5c6bfb31U
@@ -188,6 +205,7 @@ class BloomFilter {
 // a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter.
 
 inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
+  DCHECK(directory_ != nullptr);
   always_false_ = false;
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
@@ -199,6 +217,7 @@ inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
 
 inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const noexcept {
   if (always_false_) return false;
+  DCHECK(directory_ != nullptr);
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
     return BucketFindAVX2(bucket_idx, hash);
@@ -207,6 +226,6 @@ inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const noexcept
   }
 }
 
-}  // namespace impala
+} // namespace impala
 
-#endif  // IMPALA_UTIL_BLOOM_H
+#endif // IMPALA_UTIL_BLOOM_H

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 36c4a7e..412ca06 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -69,4 +69,8 @@ struct TBackendGflags {
   21: required i32 max_nonhdfs_partitions_parallel_load
 
   22: required TReservedWordsVersion reserved_words_version
+
+  23: required double max_filter_error_rate
+
+  24: required i64 min_buffer_size
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bf00424..5cddc78 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -785,17 +785,17 @@ struct TPoolConfig {
 }
 
 struct TBloomFilter {
-  // Log_2 of the heap space required for this filter. See BloomFilter::BloomFilter() for
-  // details.
-  1: required i32 log_heap_space
+  // Log_2 of the bufferpool space required for this filter.
+  // See BloomFilter::BloomFilter() for details.
+  1: required i32 log_bufferpool_space
 
   // List of buckets representing the Bloom Filter contents, laid out contiguously in one
   // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
   // BloomFilter::directory_.
   2: binary directory
 
-  // If always_true or always_false is true, 'directory' and 'log_heap_space' are not
-  // meaningful.
+  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
+  // not meaningful.
   3: required bool always_true
   4: required bool always_false
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index fedca3c..c5df1cd 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -145,6 +145,10 @@ struct TRuntimeFilterDesc {
 
   // The type of runtime filter to build.
   10: required TRuntimeFilterType type
+
+  // The size of the filter based on the ndv estimate and the min/max limit specified in
+  // the query options. Should be greater than zero for bloom filters, zero otherwise.
+  11: optional i64 filter_size_bytes
 }
 
 // The information contained in subclasses of ScanNode captured in two separate

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 9c1c8e7..74256c3 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -70,6 +70,10 @@ struct TPlanFragment {
   // sink) in a single instance of this fragment. This is used for an optimization in
   // InitialReservation. Measured in bytes. required in V1
   8: optional i64 initial_reservation_total_claims
+
+  // The total memory (in bytes) required for the runtime filters used by the plan nodes
+  // managed by this fragment. Is included in min_reservation_bytes.
+  9: optional i64 runtime_filters_reservation_bytes
 }
 
 // location information for a single scan range

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 3b3ace7..775219f 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.planner;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -29,6 +30,7 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.PlanNode.ExecPhaseResourceProfiles;
+import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TPlanFragment;
@@ -113,6 +115,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // of this fragment. Computed in computeResourceProfile().
   private long initialReservationTotalClaims_ = -1;
 
+  // The total memory (in bytes) required for the runtime filters used by the plan nodes
+  // managed by this fragment.
+  private long runtimeFiltersReservationBytes_ = 0;
+
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
    */
@@ -219,13 +225,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   /**
    * Compute the peak resource profile for an instance of this fragment. Must
    * be called after all the plan nodes and sinks are added to the fragment and resource
-   * profiles of all children fragments are computed.
+   * profiles of all children fragments are computed. Also accounts for the memory used by
+   * runtime filters that are stored at the fragment level.
    */
   public void computeResourceProfile(Analyzer analyzer) {
     // Compute resource profiles for all plan nodes and sinks in the fragment.
     sink_.computeResourceProfile(analyzer.getQueryOptions());
+    HashSet<RuntimeFilterId> filterSet = Sets.newHashSet();
     for (PlanNode node: collectPlanNodes()) {
       node.computeNodeResourceProfile(analyzer.getQueryOptions());
+      for (RuntimeFilter filter: node.getRuntimeFilters()) {
+        // A filter can be a part of both produced and consumed filters in a fragment,
+        // so only add it once.
+        if (!filterSet.contains(filter.getFilterId())) {
+          filterSet.add(filter.getFilterId());
+          runtimeFiltersReservationBytes_ += filter.getFilterSize();
+        }
+      }
     }
 
     if (sink_ instanceof JoinBuildSink) {
@@ -241,10 +257,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // The sink is opened after the plan tree.
     ResourceProfile fInstancePostOpenProfile =
         planTreeProfile.postOpenProfile.sum(sink_.getResourceProfile());
-    resourceProfile_ =
-        planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile);
-
-    initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes();
+    resourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(runtimeFiltersReservationBytes_)
+        .setMinReservationBytes(runtimeFiltersReservationBytes_).build()
+        .sum(planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile));
+    initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes() +
+        runtimeFiltersReservationBytes_;
     for (PlanNode node: collectPlanNodes()) {
       initialReservationTotalClaims_ +=
           node.getNodeResourceProfile().getMinReservationBytes();
@@ -322,9 +340,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       Preconditions.checkArgument(initialReservationTotalClaims_ > -1);
       result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes());
       result.setInitial_reservation_total_claims(initialReservationTotalClaims_);
+      result.setRuntime_filters_reservation_bytes(runtimeFiltersReservationBytes_);
     } else {
       result.setMin_reservation_bytes(0);
       result.setInitial_reservation_total_claims(0);
+      result.setRuntime_filters_reservation_bytes(0);
     }
     return result;
   }
@@ -408,6 +428,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     } else {
       builder.append(resourceProfile_.multiply(getNumInstancesPerHost(mt_dop))
           .getExplainString());
+      if (resourceProfile_.isValid() && runtimeFiltersReservationBytes_ > 0) {
+        builder.append(" runtime-filters-memory=");
+        builder.append(PrintUtils.printBytes(runtimeFiltersReservationBytes_));
+      }
     }
     builder.append("\n");
     return builder.toString();

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index d295bf5..5368b5f 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -44,10 +44,14 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
 import org.apache.impala.thrift.TRuntimeFilterType;
+import org.apache.impala.util.BitUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +70,13 @@ import com.google.common.collect.Sets;
  * Runtime filters are generated from equi-join predicates but they do not replace the
  * original predicates.
  *
+ * MinMax filters are of a fixed size (except for those used for string type) and
+ * therefore only sizes for bloom filters need to be calculated. These calculations are
+ * based on the NDV estimates of the associated table columns, the min buffer size that
+ * can be allocated by the bufferpool, and the query options. The resulting size is also
+ * bounded by the MIN/MAX_BLOOM_FILTER_SIZE limits, which are enforced on the query
+ * options before this phase of planning.
+ *
  * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1';
  * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a
  * runtime filter is constructed at the join node between tables T1 and T2 while building
@@ -81,6 +92,10 @@ public final class RuntimeFilterGenerator {
   private final static Logger LOG =
       LoggerFactory.getLogger(RuntimeFilterGenerator.class);
 
+  // Should be in sync with corresponding values in runtime-filter-bank.cc.
+  private static final long MIN_BLOOM_FILTER_SIZE = 4 * 1024;
+  private static final long MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;
+
   // Map of base table tuple ids to a list of runtime filters that
   // can be applied at the corresponding scan nodes.
   private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid_ =
@@ -90,7 +105,44 @@ public final class RuntimeFilterGenerator {
   private final IdGenerator<RuntimeFilterId> filterIdGenerator =
       RuntimeFilterId.createGenerator();
 
-  private RuntimeFilterGenerator() {};
+  /**
+   * Internal class that encapsulates the max, min and default sizes used for creating
+   * bloom filter objects.
+   */
+  private class FilterSizeLimits {
+    // Maximum filter size, in bytes, rounded up to a power of two.
+    public final long maxVal;
+
+    // Minimum filter size, in bytes, rounded up to a power of two.
+    public final long minVal;
+
+    // Pre-computed default filter size, in bytes, rounded up to a power of two.
+    public final long defaultVal;
+
+    public FilterSizeLimits(TQueryOptions tQueryOptions) {
+      // Round up all limits to a power of two and make sure filter size is at
+      // least the min buffer size that can be allocated by the buffer pool.
+      long maxLimit = tQueryOptions.getRuntime_filter_max_size();
+      long minBufferSize = BackendConfig.INSTANCE.getMinBufferSize();
+      maxVal = BitUtil.roundUpToPowerOf2(Math.max(maxLimit, minBufferSize));
+
+      long minLimit = tQueryOptions.getRuntime_filter_min_size();
+      minLimit = Math.max(minLimit, minBufferSize);
+      // Make sure minVal <= defaultVal <= maxVal
+      minVal = BitUtil.roundUpToPowerOf2(Math.min(minLimit, maxVal));
+
+      long defaultValue = tQueryOptions.getRuntime_bloom_filter_size();
+      defaultValue = Math.max(defaultValue, minVal);
+      defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal));
+    }
+  };
+
+  // Contains size limits for bloom filters.
+  private FilterSizeLimits bloomFilterSizeLimits_;
+
+  private RuntimeFilterGenerator(TQueryOptions tQueryOptions) {
+    bloomFilterSizeLimits_ = new FilterSizeLimits(tQueryOptions);
+  };
 
   /**
    * Internal representation of a runtime filter. A runtime filter is generated from
@@ -125,6 +177,8 @@ public final class RuntimeFilterGenerator {
     // for the filter. A value of -1 means no estimate is available, and default filter
     // parameters should be used.
     private long ndvEstimate_ = -1;
+    // Size of the filter (in bytes). Should be greater than zero for bloom filters.
+    private long filterSizeBytes_ = 0;
     // If true, the filter is produced by a broadcast join and there is at least one
     // destination scan node which is in the same fragment as the join; set in
     // DistributedPlanner.createHashJoinFragment().
@@ -196,7 +250,7 @@ public final class RuntimeFilterGenerator {
 
     private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, Expr srcExpr,
         Expr origTargetExpr, Operator exprCmpOp, Map<TupleId, List<SlotId>> targetSlots,
-        TRuntimeFilterType type) {
+        TRuntimeFilterType type, FilterSizeLimits filterSizeLimits) {
       id_ = filterId;
       src_ = filterSrcNode;
       srcExpr_ = srcExpr;
@@ -205,6 +259,7 @@ public final class RuntimeFilterGenerator {
       targetSlotsByTid_ = targetSlots;
       type_ = type;
       computeNdvEstimate();
+      calculateFilterSize(filterSizeLimits);
     }
 
     @Override
@@ -240,6 +295,7 @@ public final class RuntimeFilterGenerator {
       }
       tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
       tFilter.setType(type_);
+      tFilter.setFilter_size_bytes(filterSizeBytes_);
       return tFilter;
     }
 
@@ -250,7 +306,7 @@ public final class RuntimeFilterGenerator {
      */
     public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen,
         Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode,
-        TRuntimeFilterType type) {
+        TRuntimeFilterType type, FilterSizeLimits filterSizeLimits) {
       Preconditions.checkNotNull(idGen);
       Preconditions.checkNotNull(joinPredicate);
       Preconditions.checkNotNull(filterSrcNode);
@@ -277,7 +333,7 @@ public final class RuntimeFilterGenerator {
         LOG.trace("Generating runtime filter from predicate " + joinPredicate);
       }
       return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, targetExpr,
-          normalizedJoinConjunct.getOp(), targetSlots, type);
+          normalizedJoinConjunct.getOp(), targetSlots, type, filterSizeLimits);
     }
 
     /**
@@ -383,6 +439,7 @@ public final class RuntimeFilterGenerator {
     public RuntimeFilterId getFilterId() { return id_; }
     public TRuntimeFilterType getType() { return type_; }
     public Operator getExprCompOp() { return exprCmpOp_; }
+    public long getFilterSize() { return filterSizeBytes_; }
 
     /**
      * Estimates the selectivity of a runtime filter as the cardinality of the
@@ -415,6 +472,25 @@ public final class RuntimeFilterGenerator {
     }
 
     /**
+     * Sets the filter size (in bytes) required for a bloom filter to achieve the
+     * configured maximum false-positive rate based on the expected NDV. Also bounds the
+     * filter size between the max and minimum filter sizes supplied to it by
+     * 'filterSizeLimits'.
+     */
+    private void calculateFilterSize(FilterSizeLimits filterSizeLimits) {
+      if (type_ == TRuntimeFilterType.MIN_MAX) return;
+      if (ndvEstimate_ == -1) {
+        filterSizeBytes_ = filterSizeLimits.defaultVal;
+        return;
+      }
+      double fpp = BackendConfig.INSTANCE.getMaxFilterErrorRate();
+      int logFilterSize = FeSupport.GetMinLogSpaceForBloomFilter(ndvEstimate_, fpp);
+      filterSizeBytes_ = 1L << logFilterSize;
+      filterSizeBytes_ = Math.max(filterSizeBytes_, filterSizeLimits.minVal);
+      filterSizeBytes_ = Math.min(filterSizeBytes_, filterSizeLimits.maxVal);
+    }
+
+    /**
      * Assigns this runtime filter to the corresponding plan nodes.
      */
     public void assignToPlanNodes() {
@@ -442,7 +518,8 @@ public final class RuntimeFilterGenerator {
     Preconditions.checkNotNull(ctx.getQueryOptions());
     int maxNumBloomFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
     Preconditions.checkState(maxNumBloomFilters >= 0);
-    RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator();
+    RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(
+        ctx.getQueryOptions());
     filterGenerator.generateFilters(ctx, plan);
     List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
     if (filters.size() > maxNumBloomFilters) {
@@ -516,8 +593,8 @@ public final class RuntimeFilterGenerator {
       List<RuntimeFilter> filters = Lists.newArrayList();
       for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
         for (Expr conjunct : joinConjuncts) {
-          RuntimeFilter filter = RuntimeFilter.create(
-              filterIdGenerator, ctx.getRootAnalyzer(), conjunct, joinNode, type);
+          RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
+              ctx.getRootAnalyzer(), conjunct, joinNode, type, bloomFilterSizeLimits_);
           if (filter == null) continue;
           registerRuntimeFilter(filter);
           filters.add(filter);

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 48d417a..3833094 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -77,6 +77,10 @@ public class BackendConfig {
     return backendCfg_.max_nonhdfs_partitions_parallel_load;
   }
 
+  public double getMaxFilterErrorRate() { return backendCfg_.max_filter_error_rate; }
+
+  public long getMinBufferSize() { return backendCfg_.min_buffer_size; }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index b471448..5bc1d87 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -114,6 +114,8 @@ public class FeSupport {
   public native static byte[] NativeParseQueryOptions(String csvQueryOptions,
       byte[] queryOptions);
 
+  public native static int MinLogSpaceForBloomFilter(long ndv, double fpp);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -324,6 +326,20 @@ public class FeSupport {
   }
 
   /**
+   * Returns the log (base 2) of the minimum number of bytes we need for a Bloom
+   * filter with 'ndv' unique elements and a false positive probability of less
+   * than 'fpp'.
+   */
+  public static int GetMinLogSpaceForBloomFilter(long ndv, double fpp) {
+    try {
+      return MinLogSpaceForBloomFilter(ndv, fpp);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary();
+    }
+    return MinLogSpaceForBloomFilter(ndv, fpp);
+  }
+
+  /**
    * This function should only be called explicitly by the FeSupport to ensure that
    * native functions are loaded.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index 7b4afb8..0f4a5da 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -56,7 +56,7 @@ PLAN-ROOT SINK
 |  output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=174.39KB
 ====
 # > 3000 rows returned to coordinator: codegen should be enabled
 select * from functional_parquet.alltypes
@@ -71,15 +71,15 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=178.13KB
+   partitions=24/24 files=24 size=174.39KB
 ====
 # Optimisation is enabled for join producing < 3000 rows
 select count(*)
 from functional.alltypes t1
 join functional.alltypestiny t2 on t1.id = t2.id
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=1.94MB
-Per-Host Resource Estimates: Memory=181.94MB
+Max Per-Host Resource Reservation: Memory=2.94MB
+Per-Host Resource Estimates: Memory=182.94MB
 Codegen disabled by planner
 
 PLAN-ROOT SINK


[6/6] impala git commit: IMPALA-6392: Consistent explain format for parquet predicate statistics

Posted by ta...@apache.org.
IMPALA-6392: Consistent explain format for parquet predicate statistics

In EXPLAIN_LEVEL=2+, change the explain format for parquet predicate
statistics to output the predicates of each tuple descriptor on a separate
line. This change makes it consistent with the output of other predicates.

Before:
parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3

After:
parquet statistics predicates: c_custkey < 10
parquet statistics predicates on o: o_orderkey < 5
parquet statistics predicates on o_lineitems: l_linenumber < 3

Testing:
- Ran existing planner tests and updated the ones that are affected by
  this change.
- Ran end-to-end tests in query_test

Change-Id: Ia3d55ab6a1ae551867a9f68b3622844102cc854e
Reviewed-on: http://gerrit.cloudera.org:8080/9223
Tested-by: Impala Public Jenkins
Reviewed-by: Alex Behm <al...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3d7d8209
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3d7d8209
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3d7d8209

Branch: refs/heads/2.x
Commit: 3d7d8209edf77216b8d990ea5b0eb6a16d06fc07
Parents: 1a632e7
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Tue Feb 6 01:05:14 2018 -0600
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 13 21:10:13 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/planner/HdfsScanNode.java | 46 +++++++++++++++-----
 .../queries/PlannerTest/constant-folding.test   |  3 +-
 .../queries/PlannerTest/mt-dop-validation.test  | 12 +++--
 .../queries/PlannerTest/parquet-filtering.test  |  8 ++--
 4 files changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3d7d8209/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 45ad8d6..7735f98 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -225,9 +225,10 @@ public class HdfsScanNode extends ScanNode {
   // data when scanning Parquet files.
   private final List<Expr> minMaxConjuncts_ = Lists.newArrayList();
 
-  // List of PlanNode conjuncts that have been transformed into conjuncts in
-  // 'minMaxConjuncts_'.
-  private final List<Expr> minMaxOriginalConjuncts_ = Lists.newArrayList();
+  // Map from TupleDescriptor to list of PlanNode conjuncts that have been transformed
+  // into conjuncts in 'minMaxConjuncts_'.
+  private final Map<TupleDescriptor, List<Expr>> minMaxOriginalConjuncts_ =
+      Maps.newLinkedHashMap();
 
   // Tuple that is used to materialize statistics when scanning Parquet files. For each
   // column it can contain 0, 1, or 2 slots, depending on whether the column needs to be
@@ -470,10 +471,10 @@ public class HdfsScanNode extends ScanNode {
     BinaryPredicate.Operator op = binaryPred.getOp();
     if (op == BinaryPredicate.Operator.LT || op == BinaryPredicate.Operator.LE ||
         op == BinaryPredicate.Operator.GE || op == BinaryPredicate.Operator.GT) {
-      minMaxOriginalConjuncts_.add(binaryPred);
+      addMinMaxOriginalConjunct(slotRef.getDesc().getParent(), binaryPred);
       buildStatsPredicate(analyzer, slotRef, binaryPred, op);
     } else if (op == BinaryPredicate.Operator.EQ) {
-      minMaxOriginalConjuncts_.add(binaryPred);
+      addMinMaxOriginalConjunct(slotRef.getDesc().getParent(), binaryPred);
       // TODO: this could be optimized for boolean columns.
       buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.LE);
       buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.GE);
@@ -513,11 +514,20 @@ public class HdfsScanNode extends ScanNode {
     BinaryPredicate maxBound = new BinaryPredicate(BinaryPredicate.Operator.LE,
         children.get(0).clone(), max.clone());
 
-    minMaxOriginalConjuncts_.add(inPred);
+    addMinMaxOriginalConjunct(slotRef.getDesc().getParent(), inPred);
     buildStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
     buildStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
   }
 
+  private void addMinMaxOriginalConjunct(TupleDescriptor tupleDesc, Expr expr) {
+    List<Expr> exprs = minMaxOriginalConjuncts_.get(tupleDesc);
+    if (exprs == null) {
+      exprs = new ArrayList<Expr>();
+      minMaxOriginalConjuncts_.put(tupleDesc, exprs);
+    }
+    exprs.add(expr);
+  }
+
   private void tryComputeMinMaxPredicate(Analyzer analyzer, Expr pred) {
     if (pred instanceof BinaryPredicate) {
       tryComputeBinaryMinMaxPredicate(analyzer, (BinaryPredicate) pred);
@@ -1080,16 +1090,32 @@ public class HdfsScanNode extends ScanNode {
             numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_,
             totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
       }
-      if (!minMaxOriginalConjuncts_.isEmpty()) {
-        output.append(String.format("%sparquet statistics predicates: %s\n",
-            detailPrefix, getExplainString(minMaxOriginalConjuncts_)));
-      }
+      // Groups the min max original conjuncts by tuple descriptor.
+      output.append(getMinMaxOriginalConjunctsExplainString(detailPrefix));
       // Groups the dictionary filterable conjuncts by tuple descriptor.
       output.append(getDictionaryConjunctsExplainString(detailPrefix));
     }
     return output.toString();
   }
 
+  // Helper method that formats the min max original conjuncts by tuple descriptor.
+  private String getMinMaxOriginalConjunctsExplainString(String prefix) {
+    StringBuilder output = new StringBuilder();
+    for (Map.Entry<TupleDescriptor, List<Expr>> entry :
+        minMaxOriginalConjuncts_.entrySet()) {
+      TupleDescriptor tupleDesc = entry.getKey();
+      List<Expr> exprs = entry.getValue();
+      if (tupleDesc == getTupleDesc()) {
+        output.append(String.format("%sparquet statistics predicates: %s\n", prefix,
+            getExplainString(exprs)));
+      } else {
+        output.append(String.format("%sparquet statistics predicates on %s: %s\n",
+            prefix, tupleDesc.getAlias(), getExplainString(exprs)));
+      }
+    }
+    return output.toString();
+  }
+
   // Helper method that prints the dictionary filterable conjuncts by tuple descriptor.
   private String getDictionaryConjunctsExplainString(String prefix) {
     StringBuilder output = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/impala/blob/3d7d8209/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 2b2d5ef..f25ad0a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -54,7 +54,8 @@ PLAN-ROOT SINK
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: c_custkey > 10, o_orderkey = 4
+   parquet statistics predicates: c_custkey > 10
+   parquet statistics predicates on o: o_orderkey = 4
    parquet dictionary predicates: c_custkey > 10
    parquet dictionary predicates on o: o_orderkey = 4
    parquet dictionary predicates on o_lineitems: 20 + l_linenumber < 0

http://git-wip-us.apache.org/repos/asf/impala/blob/3d7d8209/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index f3a46de..61d646b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -251,7 +251,9 @@ PLAN-ROOT SINK
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
+   parquet statistics predicates: c_custkey < 10
+   parquet statistics predicates on o: o_orderkey < 5
+   parquet statistics predicates on o_lineitems: l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
    parquet dictionary predicates on o: o_orderkey < 5
    parquet dictionary predicates on o_lineitems: l_linenumber < 3
@@ -314,7 +316,9 @@ Per-Host Resources: mem-estimate=264.00MB mem-reservation=0B
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
+   parquet statistics predicates: c_custkey < 10
+   parquet statistics predicates on o: o_orderkey < 5
+   parquet statistics predicates on o_lineitems: l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
    parquet dictionary predicates on o: o_orderkey < 5
    parquet dictionary predicates on o_lineitems: l_linenumber < 3
@@ -368,7 +372,7 @@ PLAN-ROOT SINK
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: o1.o_orderkey < 5
+   parquet statistics predicates on o1: o1.o_orderkey < 5
    parquet dictionary predicates on o1: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
@@ -421,7 +425,7 @@ Per-Host Resources: mem-estimate=269.81MB mem-reservation=5.81MB
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: o1.o_orderkey < 5
+   parquet statistics predicates on o1: o1.o_orderkey < 5
    parquet dictionary predicates on o1: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000

http://git-wip-us.apache.org/repos/asf/impala/blob/3d7d8209/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index e7dee4e..2b602c9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -150,7 +150,7 @@ PLAN-ROOT SINK
      table: rows=unavailable size=unavailable
      columns missing stats: id
    extrapolated-rows=disabled
-   parquet statistics predicates: a.item.e < -10
+   parquet statistics predicates on a: a.item.e < -10
    parquet dictionary predicates on a: a.item.e < -10
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=unavailable
@@ -327,7 +327,9 @@ PLAN-ROOT SINK
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: c_custkey > 0, o.o_orderkey > 0, l.l_partkey > 0
+   parquet statistics predicates: c_custkey > 0
+   parquet statistics predicates on o: o.o_orderkey > 0
+   parquet statistics predicates on l: l.l_partkey > 0
    parquet dictionary predicates: c_custkey > 0
    parquet dictionary predicates on o: o.o_orderkey > 0
    parquet dictionary predicates on l: l.l_partkey > 0
@@ -435,7 +437,7 @@ PLAN-ROOT SINK
      table: rows=150000 size=292.36MB
      columns missing stats: c_orders
    extrapolated-rows=disabled
-   parquet statistics predicates: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
+   parquet statistics predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
    parquet dictionary predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
    mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=150000


[3/6] impala git commit: IMPALA-5440: Add planner tests with extreme statistics values

Posted by ta...@apache.org.
IMPALA-5440: Add planner tests with extreme statistics values

This commit addresses some of the issues in the JIRA: it adds tests for cardinality
overflow in JOIN, UNION, CROSS JOIN, and FULL OUTER JOIN, for tables with a row count
of 0 or a negative row count, and for the cardinality of the Subplan node.

Change-Id: I86dec47cf1438882cafaec53e97864ccfcdff3cb
Reviewed-on: http://gerrit.cloudera.org:8080/9065
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4e29a7f9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4e29a7f9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4e29a7f9

Branch: refs/heads/2.x
Commit: 4e29a7f91a35aec7371b2431080ec20082296471
Parents: 3660122
Author: Xinran Yu Tinney <xy...@cloudera.com>
Authored: Thu Jan 18 15:27:31 2018 -0600
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 13 07:44:19 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/planner/PlannerTest.java  | 77 ++++++++++++++++++++
 .../apache/impala/planner/PlannerTestBase.java  | 40 ++++++++++
 2 files changed, 117 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4e29a7f9/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 760334d..5dbba75 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -504,4 +504,81 @@ public class PlannerTest extends PlannerTestBase {
     options.setExplain_level(TExplainLevel.EXTENDED);
     runPlannerTestFile("min-max-runtime-filters", options);
   }
+
+  @Test
+  public void testCardinalityOverflow() throws ImpalaException {
+    String tblName = "tpch.cardinality_overflow";
+    String colDefs = "("
+        + "l_orderkey BIGINT, "
+        + "l_partkey BIGINT, "
+        + "l_suppkey BIGINT, "
+        + "l_linenumber INT, "
+        + "l_shipmode STRING, "
+        + "l_comment STRING"
+        + ")";
+    String tblLocation = "LOCATION "
+        + "'hdfs://localhost:20500/test-warehouse/tpch.lineitem'";
+    String tblPropsTemplate = "TBLPROPERTIES('numRows'='%s')";
+    String tblProps = String.format(tblPropsTemplate, Long.toString(Long.MAX_VALUE));
+
+    addTestTable(String.format("CREATE EXTERNAL TABLE %s %s %s %s;",
+        tblName, colDefs, tblLocation, tblProps));
+
+    // CROSS JOIN query: tests that multiplying the input cardinalities does not overflow
+    // the cross-join's estimated cardinality
+    String query = "select * from tpch.cardinality_overflow a,"
+        + "tpch.cardinality_overflow b, tpch.cardinality_overflow c";
+    checkCardinality(query, 0, Long.MAX_VALUE);
+
+    // FULL OUTER JOIN query: tests that adding the input cardinalities does not overflow
+    // the full outer join's estimated cardinality
+    query = "select a.l_comment from tpch.cardinality_overflow a full outer join "
+        + "tpch.cardinality_overflow b on a.l_orderkey = b.l_partkey";
+    checkCardinality(query, 0, Long.MAX_VALUE);
+
+    // UNION query: tests that adding the input cardinalities does not overflow
+    // the union's estimated cardinality
+    query = "select l_shipmode from tpch.cardinality_overflow "
+        + "union select l_comment from tpch.cardinality_overflow";
+    checkCardinality(query, 0, Long.MAX_VALUE);
+
+    // JOIN query: tests that multiplying the input cardinalities does not overflow
+    // the join's estimated cardinality
+    query = "select a.l_comment from tpch.cardinality_overflow a inner join "
+        + "tpch.cardinality_overflow b on a.l_linenumber < b.l_orderkey";
+    checkCardinality(query, 0, Long.MAX_VALUE);
+
+    // creates an empty table and tests that the cardinality is 0
+    tblName = "tpch.ex_customer_cardinality_zero";
+    tblProps = String.format(tblPropsTemplate, 0);
+    addTestTable(String.format("CREATE EXTERNAL TABLE  %s %s %s %s;",
+        tblName, colDefs, tblLocation, tblProps));
+    query = "select * from tpch.ex_customer_cardinality_zero";
+    checkCardinality(query, 0, 0);
+
+    // creates a table with negative row count and
+    // tests that the cardinality is not negative
+    tblName = "tpch.ex_customer_cardinality_neg";
+    tblProps = String.format(tblPropsTemplate, -1);
+    addTestTable(String.format("CREATE EXTERNAL TABLE  %s %s %s %s;",
+        tblName, colDefs, tblLocation, tblProps));
+    query = "select * from tpch.ex_customer_cardinality_neg";
+    checkCardinality(query, -1, Long.MAX_VALUE);
+
+    // SUBPLAN query: tests that adding the input cardinalities does not overflow
+    // the SUBPLAN's estimated cardinality
+    tblName = "functional_parquet.cardinality_overflow";
+    colDefs = "("
+        + "id BIGINT, "
+        + "int_array ARRAY<INT>"
+        + ")";
+    String storedAs = "STORED AS PARQUET";
+    tblLocation = "LOCATION "
+        + "'hdfs://localhost:20500/test-warehouse/complextypestbl_parquet'";
+    tblProps = String.format(tblPropsTemplate, Long.toString(Long.MAX_VALUE));
+    addTestTable(String.format("CREATE EXTERNAL TABLE  %s %s %s %s %s;",
+        tblName, colDefs, storedAs, tblLocation, tblProps));
+    query = "select id from functional_parquet.cardinality_overflow t, t.int_array";
+    checkCardinality(query, 0, Long.MAX_VALUE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/4e29a7f9/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index bc93160..7c47d74 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -36,6 +36,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.ColumnLineageGraph;
 import org.apache.impala.analysis.DescriptorTable;
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.ImpalaException;
@@ -665,6 +666,45 @@ public class PlannerTestBase extends FrontendTestBase {
     }
   }
 
+  /**
+   * This function plans the given query and fails if the estimated cardinalities are
+   * not within the specified bounds [min, max].
+   */
+  protected void checkCardinality(String query, long min, long max)
+        throws ImpalaException {
+    TQueryCtx queryCtx = TestUtils.createQueryContext(Catalog.DEFAULT_DB,
+        System.getProperty("user.name"));
+    queryCtx.client_request.setStmt(query);
+    StringBuilder explainBuilder = new StringBuilder();
+    TExecRequest execRequest = frontend_.createExecRequest(queryCtx, explainBuilder);
+
+    if (!execRequest.isSetQuery_exec_request()
+        || execRequest.query_exec_request == null
+        || execRequest.query_exec_request.plan_exec_info == null) {
+      return;
+    }
+    for (TPlanExecInfo execInfo : execRequest.query_exec_request.plan_exec_info) {
+      for (TPlanFragment planFragment : execInfo.fragments) {
+        if (!planFragment.isSetPlan() || planFragment.plan == null) continue;
+        for (TPlanNode node : planFragment.plan.nodes) {
+          if (node.estimated_stats == null) {
+            fail("Query: " + query + " has no estimated statistics");
+          }
+          long cardinality = node.estimated_stats.cardinality;
+          if (cardinality < min || cardinality > max) {
+            StringBuilder errorLog = new StringBuilder();
+            errorLog.append("Query: " + query + "\n");
+            errorLog.append(
+                "Expected cardinality estimate between " + min + " and " + max + "\n");
+            errorLog.append("Actual cardinality estimate: " + cardinality + "\n");
+            errorLog.append("In node id " + node.node_id + "\n");
+            fail(errorLog.toString());
+          }
+        }
+      }
+    }
+  }
+
   private void checkColumnLineage(TestCase testCase, TExecRequest execRequest,
       StringBuilder errorLog, StringBuilder actualOutput) {
     String query = testCase.getQuery();


[4/6] impala git commit: IMPALA-5519: Allocate fragment's runtime filter memory from Buffer pool

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
index 540df5b..bb12bca 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
@@ -5,7 +5,7 @@ on ss_customer_sk = c_customer_sk
 where c_salutation = 'Mrs.'
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=184.50MB mem-reservation=8.50MB
+|  Per-Host Resources: mem-estimate=185.50MB mem-reservation=9.50MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -86,7 +86,7 @@ on ss_customer_sk = c_customer_sk
 where c_salutation = 'Mrs.'
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=184.50MB mem-reservation=8.50MB
+|  Per-Host Resources: mem-estimate=185.50MB mem-reservation=9.50MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -126,7 +126,7 @@ on ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number
 where sr_return_quantity < 10
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=212.75MB mem-reservation=4.75MB
+|  Per-Host Resources: mem-estimate=214.75MB mem-reservation=6.75MB runtime-filters-memory=2.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -165,7 +165,7 @@ tpcds.store_sales inner join tpcds.web_sales
 on ss_sold_time_sk = ws_sold_time_sk
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=396.67MB mem-reservation=34.00MB
+|  Per-Host Resources: mem-estimate=397.67MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -203,7 +203,7 @@ on a.d_date_sk = b.d_date_sk
 where a.d_holiday = "Y"
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=113.00MB mem-reservation=17.00MB
+|  Per-Host Resources: mem-estimate=114.00MB mem-reservation=18.00MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -246,7 +246,7 @@ where ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number
   and d1.d_fy_week_seq = 1000
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=357.81MB mem-reservation=7.75MB
+|  Per-Host Resources: mem-estimate=362.81MB mem-reservation=12.75MB runtime-filters-memory=5.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -334,7 +334,7 @@ tpcds.store_sales inner join tpcds.customer
 on ss_customer_sk % 10 = c_customer_sk / 100
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=210.00MB mem-reservation=34.00MB
+|  Per-Host Resources: mem-estimate=211.00MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -372,7 +372,7 @@ tpcds.store_sales inner join tpcds_seq_snap.customer
 on ss_customer_sk = c_customer_sk
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=2.17GB mem-reservation=34.00MB
+|  Per-Host Resources: mem-estimate=2.17GB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -409,7 +409,7 @@ tpcds_seq_snap.store_sales inner join tpcds.customer
 on ss_customer_sk = c_customer_sk
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=177.94MB mem-reservation=1.94MB
+|  Per-Host Resources: mem-estimate=178.94MB mem-reservation=2.94MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -448,7 +448,7 @@ tpcds.store_sales inner join
 on ss_sold_time_sk = ws_sold_time_sk
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=298.00MB mem-reservation=3.88MB
+|  Per-Host Resources: mem-estimate=299.00MB mem-reservation=4.88MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
index d62e0ad..96015e0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -4,8 +4,8 @@ select straight_join *
 from tpch_parquet.customer
     inner join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=16.94MB
-Per-Host Resource Estimates: Memory=56.94MB
+Max Per-Host Resource Reservation: Memory=17.94MB
+Per-Host Resource Estimates: Memory=57.94MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -17,7 +17,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=355B cardinality=150000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
-Per-Host Resources: mem-estimate=40.94MB mem-reservation=16.94MB
+Per-Host Resources: mem-estimate=41.94MB mem-reservation=17.94MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: c_nationkey = n_nationkey
 |  fk/pk conjuncts: c_nationkey = n_nationkey
@@ -32,19 +32,19 @@ Per-Host Resources: mem-estimate=40.94MB mem-reservation=16.94MB
 |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=16.00MB mem-reservation=0B
 |  01:SCAN HDFS [tpch_parquet.nation, RANDOM]
-|     partitions=1/1 files=1 size=2.96KB
+|     partitions=1/1 files=1 size=2.74KB
 |     stored statistics:
-|       table: rows=25 size=2.96KB
+|       table: rows=25 size=2.74KB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=16.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=117B cardinality=25
 |
 00:SCAN HDFS [tpch_parquet.customer, RANDOM]
-   partitions=1/1 files=1 size=12.34MB
+   partitions=1/1 files=1 size=12.31MB
    runtime filters: RF000[bloom] -> c_nationkey
    stored statistics:
-     table: rows=150000 size=12.34MB
+     table: rows=150000 size=12.31MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=24.00MB mem-reservation=0B
@@ -83,18 +83,18 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=46.00MB
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
 |  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
 |     stored statistics:
-|       table: rows=1500000 size=54.20MB
+|       table: rows=1500000 size=54.07MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=40.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=191B cardinality=1500000
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.93MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.93MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -131,18 +131,18 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=34.00MB
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
 |  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
 |     stored statistics:
-|       table: rows=1500000 size=54.20MB
+|       table: rows=1500000 size=54.07MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=40.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=8B cardinality=1500000
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.93MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.93MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -156,8 +156,8 @@ from tpch_parquet.lineitem
 group by 1, 2
 having count(*) = 1
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=111.00MB
-Per-Host Resource Estimates: Memory=251.12MB
+Max Per-Host Resource Reservation: Memory=113.00MB
+Per-Host Resource Estimates: Memory=253.12MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -182,7 +182,7 @@ Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=85.12MB mem-reservation=65.00MB
+Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB runtime-filters-memory=1.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
@@ -203,9 +203,9 @@ Per-Host Resources: mem-estimate=85.12MB mem-reservation=65.00MB
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
 |  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
-|     partitions=1/1 files=2 size=54.20MB
+|     partitions=1/1 files=2 size=54.07MB
 |     stored statistics:
-|       table: rows=1500000 size=54.20MB
+|       table: rows=1500000 size=54.07MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=40.00MB mem-reservation=0B
@@ -216,12 +216,12 @@ Per-Host Resources: mem-estimate=85.12MB mem-reservation=65.00MB
 |  tuple-ids=0 row-size=8B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=81.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.93MB
+   partitions=1/1 files=3 size=193.73MB
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.93MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -263,9 +263,9 @@ Per-Host Resources: mem-estimate=1.69GB mem-reservation=34.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.93MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.93MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -310,9 +310,9 @@ Per-Host Resources: mem-estimate=281.46MB mem-reservation=34.00MB
 |  tuple-ids=1 row-size=32B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.93MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.93MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
index b9b12fa..7f6d96b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
@@ -99,7 +99,7 @@ select count(*) from functional_kudu.alltypes a, functional_parquet.alltypes b,
 where a.int_col = b.int_col and a.int_col = c.int_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=2.02GB mem-reservation=35.94MB
+|  Per-Host Resources: mem-estimate=2.02GB mem-reservation=36.94MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -127,7 +127,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=8B cardinality=7300
 |
 |--01:SCAN HDFS [functional_parquet.alltypes b]
-|     partitions=24/24 files=24 size=179.19KB
+|     partitions=24/24 files=24 size=174.39KB
 |     runtime filters: RF000[bloom] -> b.int_col
 |     stored statistics:
 |       table: rows=unavailable size=unavailable

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 3c1fd44..261e8fe 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -10,9 +10,9 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -33,9 +33,9 @@ PLAN-ROOT SINK
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -56,9 +56,9 @@ PLAN-ROOT SINK
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=160.00MB mem-reservation=0B
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -383,9 +383,9 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -424,9 +424,9 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=34.00MB
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -465,9 +465,9 @@ Per-Host Resources: mem-estimate=228.00MB mem-reservation=68.00MB
 |  tuple-ids=1 row-size=16B cardinality=1563438
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -490,9 +490,9 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=1.00MB mem-reservation=0B
@@ -523,9 +523,9 @@ Per-Host Resources: mem-estimate=11.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=1.00MB mem-reservation=0B
@@ -556,9 +556,9 @@ Per-Host Resources: mem-estimate=180.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -583,9 +583,9 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -612,9 +612,9 @@ Per-Host Resources: mem-estimate=120.00MB mem-reservation=12.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -641,9 +641,9 @@ Per-Host Resources: mem-estimate=240.00MB mem-reservation=24.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -669,9 +669,9 @@ PLAN-ROOT SINK
 |  tuple-ids=1 row-size=263B cardinality=100
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -699,9 +699,9 @@ Per-Host Resources: mem-estimate=80.03MB mem-reservation=0B
 |  tuple-ids=1 row-size=263B cardinality=100
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -729,9 +729,9 @@ Per-Host Resources: mem-estimate=160.05MB mem-reservation=0B
 |  tuple-ids=1 row-size=263B cardinality=100
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -741,11 +741,11 @@ Per-Host Resources: mem-estimate=160.05MB mem-reservation=0B
 select *
 from tpch.lineitem inner join tpch.orders on l_orderkey = o_orderkey
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=476.41MB
+Max Per-Host Resource Reservation: Memory=35.00MB
+Per-Host Resource Estimates: Memory=477.41MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=476.41MB mem-reservation=34.00MB
+|  Per-Host Resources: mem-estimate=477.41MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -775,8 +775,8 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=476.41MB
+Max Per-Host Resource Reservation: Memory=35.00MB
+Per-Host Resource Estimates: Memory=477.41MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -788,7 +788,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=388.41MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=389.41MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: l_orderkey = o_orderkey
 |  fk/pk conjuncts: l_orderkey = o_orderkey
@@ -821,8 +821,8 @@ Per-Host Resources: mem-estimate=388.41MB mem-reservation=34.00MB
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=68.00MB
-Per-Host Resource Estimates: Memory=952.83MB
+Max Per-Host Resource Reservation: Memory=70.00MB
+Per-Host Resource Estimates: Memory=954.83MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -834,7 +834,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=776.83MB mem-reservation=68.00MB
+Per-Host Resources: mem-estimate=778.83MB mem-reservation=70.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash-table-id=00
 |  hash predicates: l_orderkey = o_orderkey
@@ -879,11 +879,11 @@ Per-Host Resources: mem-estimate=776.83MB mem-reservation=68.00MB
 select *
 from tpch.lineitem inner join /* +shuffle */ tpch.orders on l_orderkey = o_orderkey
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=476.41MB
+Max Per-Host Resource Reservation: Memory=35.00MB
+Per-Host Resource Estimates: Memory=477.41MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=476.41MB mem-reservation=34.00MB
+|  Per-Host Resources: mem-estimate=477.41MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -913,8 +913,8 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=276.14MB
+Max Per-Host Resource Reservation: Memory=36.00MB
+Per-Host Resource Estimates: Memory=278.14MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -926,7 +926,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=100.14MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=101.14MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey
 |  fk/pk conjuncts: l_orderkey = o_orderkey
@@ -954,7 +954,7 @@ Per-Host Resources: mem-estimate=100.14MB mem-reservation=34.00MB
 |  tuple-ids=0 row-size=263B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=89.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000[bloom] -> l_orderkey
@@ -965,8 +965,8 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=68.00MB
-Per-Host Resource Estimates: Memory=452.14MB
+Max Per-Host Resource Reservation: Memory=72.00MB
+Per-Host Resource Estimates: Memory=456.14MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -978,7 +978,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=100.14MB mem-reservation=68.00MB
+Per-Host Resources: mem-estimate=102.14MB mem-reservation=70.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash-table-id=00
 |  hash predicates: l_orderkey = o_orderkey
@@ -1014,7 +1014,7 @@ Per-Host Resources: mem-estimate=100.14MB mem-reservation=68.00MB
 |  tuple-ids=0 row-size=263B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=178.00MB mem-reservation=2.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000[bloom] -> l_orderkey
@@ -1503,11 +1503,11 @@ select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 where l_shipmode = 'F'
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=51.00MB
-Per-Host Resource Estimates: Memory=139.58MB
+Max Per-Host Resource Reservation: Memory=54.00MB
+Per-Host Resource Estimates: Memory=142.58MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=139.58MB mem-reservation=51.00MB
+|  Per-Host Resources: mem-estimate=142.58MB mem-reservation=54.00MB runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -1533,11 +1533,11 @@ PLAN-ROOT SINK
 |  |     tuple-ids=6 row-size=8B cardinality=1500000
 |  |
 |  08:SCAN HDFS [tpch_parquet.lineitem]
-|     partitions=1/1 files=3 size=193.72MB
+|     partitions=1/1 files=3 size=193.73MB
 |     predicates: l_shipmode = 'F'
 |     runtime filters: RF004[bloom] -> l_orderkey
 |     stored statistics:
-|       table: rows=6001215 size=193.72MB
+|       table: rows=6001215 size=193.73MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     parquet statistics predicates: l_shipmode = 'F'
@@ -1565,10 +1565,10 @@ PLAN-ROOT SINK
 |  |     tuple-ids=4 row-size=32B cardinality=300000
 |  |
 |  05:SCAN HDFS [tpch_parquet.lineitem]
-|     partitions=1/1 files=3 size=193.72MB
+|     partitions=1/1 files=3 size=193.73MB
 |     runtime filters: RF002[bloom] -> l_orderkey
 |     stored statistics:
-|       table: rows=6001215 size=193.72MB
+|       table: rows=6001215 size=193.73MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=80.00MB mem-reservation=0B
@@ -1596,11 +1596,11 @@ PLAN-ROOT SINK
 |     tuple-ids=1 row-size=8B cardinality=1500000
 |
 01:SCAN HDFS [tpch_parquet.lineitem]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    predicates: l_tax > 10
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    parquet statistics predicates: l_tax > 10
@@ -1608,8 +1608,8 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=78B cardinality=600122
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=72.75MB
-Per-Host Resource Estimates: Memory=344.33MB
+Max Per-Host Resource Reservation: Memory=76.75MB
+Per-Host Resource Estimates: Memory=348.33MB
 
 F09:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -1621,7 +1621,7 @@ PLAN-ROOT SINK
 |  tuple-ids=7 row-size=70B cardinality=2549844
 |
 F08:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=97.00MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=99.00MB mem-reservation=36.00MB runtime-filters-memory=2.00MB
 00:UNION
 |  pass-through-operands: 14
 |  mem-estimate=0B mem-reservation=0B
@@ -1650,11 +1650,11 @@ Per-Host Resources: mem-estimate=97.00MB mem-reservation=34.00MB
 |  |     tuple-ids=6 row-size=8B cardinality=1500000
 |  |
 |  08:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-|     partitions=1/1 files=3 size=193.72MB
+|     partitions=1/1 files=3 size=193.73MB
 |     predicates: l_shipmode = 'F'
 |     runtime filters: RF004[bloom] -> l_orderkey
 |     stored statistics:
-|       table: rows=6001215 size=193.72MB
+|       table: rows=6001215 size=193.73MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     parquet statistics predicates: l_shipmode = 'F'
@@ -1688,10 +1688,10 @@ Per-Host Resources: mem-estimate=97.00MB mem-reservation=34.00MB
 |  |     tuple-ids=4 row-size=32B cardinality=300000
 |  |
 |  05:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-|     partitions=1/1 files=3 size=193.72MB
+|     partitions=1/1 files=3 size=193.73MB
 |     runtime filters: RF002[bloom] -> l_orderkey
 |     stored statistics:
-|       table: rows=6001215 size=193.72MB
+|       table: rows=6001215 size=193.73MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=80.00MB mem-reservation=0B
@@ -1707,7 +1707,7 @@ Per-Host Resources: mem-estimate=97.00MB mem-reservation=34.00MB
 |  tuple-ids=2 row-size=70B cardinality=575772
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=47.33MB mem-reservation=38.75MB
+Per-Host Resources: mem-estimate=48.33MB mem-reservation=39.75MB runtime-filters-memory=1.00MB
 04:AGGREGATE [STREAMING]
 |  group by: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 |  mem-estimate=42.58MB mem-reservation=34.00MB spill-buffer=2.00MB
@@ -1740,13 +1740,13 @@ Per-Host Resources: mem-estimate=47.33MB mem-reservation=38.75MB
 |  tuple-ids=0 row-size=78B cardinality=600122
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=81.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 01:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    predicates: l_tax > 10
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    parquet statistics predicates: l_tax > 10
@@ -1754,8 +1754,8 @@ Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=78B cardinality=600122
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=141.75MB
-Per-Host Resource Estimates: Memory=684.91MB
+Max Per-Host Resource Reservation: Memory=149.75MB
+Per-Host Resource Estimates: Memory=692.91MB
 
 F09:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -1767,7 +1767,7 @@ PLAN-ROOT SINK
 |  tuple-ids=7 row-size=70B cardinality=2549844
 |
 F08:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=194.00MB mem-reservation=68.00MB
+Per-Host Resources: mem-estimate=198.00MB mem-reservation=72.00MB runtime-filters-memory=2.00MB
 00:UNION
 |  pass-through-operands: 14
 |  mem-estimate=0B mem-reservation=0B
@@ -1804,11 +1804,11 @@ Per-Host Resources: mem-estimate=194.00MB mem-reservation=68.00MB
 |  |     tuple-ids=6 row-size=8B cardinality=1500000
 |  |
 |  08:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-|     partitions=1/1 files=3 size=193.72MB
+|     partitions=1/1 files=3 size=193.73MB
 |     predicates: l_shipmode = 'F'
 |     runtime filters: RF004[bloom] -> l_orderkey
 |     stored statistics:
-|       table: rows=6001215 size=193.72MB
+|       table: rows=6001215 size=193.73MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     parquet statistics predicates: l_shipmode = 'F'
@@ -1850,10 +1850,10 @@ Per-Host Resources: mem-estimate=194.00MB mem-reservation=68.00MB
 |  |     tuple-ids=4 row-size=32B cardinality=300000
 |  |
 |  05:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-|     partitions=1/1 files=3 size=193.72MB
+|     partitions=1/1 files=3 size=193.73MB
 |     runtime filters: RF002[bloom] -> l_orderkey
 |     stored statistics:
-|       table: rows=6001215 size=193.72MB
+|       table: rows=6001215 size=193.73MB
 |       columns: all
 |     extrapolated-rows=disabled
 |     mem-estimate=80.00MB mem-reservation=0B
@@ -1869,7 +1869,7 @@ Per-Host Resources: mem-estimate=194.00MB mem-reservation=68.00MB
 |  tuple-ids=2 row-size=70B cardinality=575772
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=90.91MB mem-reservation=73.75MB
+Per-Host Resources: mem-estimate=92.91MB mem-reservation=75.75MB runtime-filters-memory=1.00MB
 04:AGGREGATE [STREAMING]
 |  group by: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 |  mem-estimate=42.58MB mem-reservation=34.00MB spill-buffer=2.00MB
@@ -1910,13 +1910,13 @@ Per-Host Resources: mem-estimate=90.91MB mem-reservation=73.75MB
 |  tuple-ids=0 row-size=78B cardinality=600122
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=160.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=162.00MB mem-reservation=2.00MB runtime-filters-memory=1.00MB
 01:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    predicates: l_tax > 10
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    parquet statistics predicates: l_tax > 10
@@ -1960,11 +1960,11 @@ order by
   o_orderdate
 limit 100
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=81.25MB
-Per-Host Resource Estimates: Memory=393.18MB
+Max Per-Host Resource Reservation: Memory=84.25MB
+Per-Host Resource Estimates: Memory=396.18MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=393.18MB mem-reservation=81.25MB
+|  Per-Host Resources: mem-estimate=396.18MB mem-reservation=84.25MB runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -2044,8 +2044,8 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=152.12MB
-Per-Host Resource Estimates: Memory=516.66MB
+Max Per-Host Resource Reservation: Memory=159.12MB
+Per-Host Resource Estimates: Memory=523.66MB
 
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -2076,7 +2076,7 @@ Per-Host Resources: mem-estimate=62.96MB mem-reservation=34.00MB
 |  tuple-ids=6 row-size=100B cardinality=600122
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=118.33MB mem-reservation=84.12MB
+Per-Host Resources: mem-estimate=121.33MB mem-reservation=87.12MB runtime-filters-memory=3.00MB
 08:AGGREGATE [STREAMING]
 |  output: sum(l_quantity)
 |  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
@@ -2151,7 +2151,7 @@ Per-Host Resources: mem-estimate=118.33MB mem-reservation=84.12MB
 |  |  tuple-ids=1 row-size=50B cardinality=1500000
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-|  Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=90.00MB mem-reservation=2.00MB runtime-filters-memory=2.00MB
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
 |     runtime filters: RF000[bloom] -> o_orderkey, RF002[bloom] -> o_custkey
@@ -2167,7 +2167,7 @@ Per-Host Resources: mem-estimate=118.33MB mem-reservation=84.12MB
 |  tuple-ids=2 row-size=16B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=90.00MB mem-reservation=2.00MB runtime-filters-memory=2.00MB
 02:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000[bloom] -> tpch.lineitem.l_orderkey, RF004[bloom] -> l_orderkey
@@ -2178,8 +2178,8 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=258.88MB
-Per-Host Resource Estimates: Memory=977.44MB
+Max Per-Host Resource Reservation: Memory=272.88MB
+Per-Host Resource Estimates: Memory=991.44MB
 
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -2210,7 +2210,7 @@ Per-Host Resources: mem-estimate=125.93MB mem-reservation=68.00MB
 |  tuple-ids=6 row-size=100B cardinality=600122
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=180.79MB mem-reservation=122.88MB
+Per-Host Resources: mem-estimate=186.79MB mem-reservation=128.88MB runtime-filters-memory=3.00MB
 08:AGGREGATE [STREAMING]
 |  output: sum(l_quantity)
 |  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
@@ -2309,7 +2309,7 @@ Per-Host Resources: mem-estimate=180.79MB mem-reservation=122.88MB
 |  |  tuple-ids=1 row-size=50B cardinality=1500000
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
-|  Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=180.00MB mem-reservation=4.00MB runtime-filters-memory=2.00MB
 |  01:SCAN HDFS [tpch.orders, RANDOM]
 |     partitions=1/1 files=1 size=162.56MB
 |     runtime filters: RF000[bloom] -> o_orderkey, RF002[bloom] -> o_custkey
@@ -2325,7 +2325,7 @@ Per-Host Resources: mem-estimate=180.79MB mem-reservation=122.88MB
 |  tuple-ids=2 row-size=16B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=180.00MB mem-reservation=4.00MB runtime-filters-memory=2.00MB
 02:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
    runtime filters: RF000[bloom] -> tpch.lineitem.l_orderkey, RF004[bloom] -> l_orderkey
@@ -2960,11 +2960,11 @@ join (
   ) v2 on v2.k3 = t2.o_orderkey
 ) v1 on v1.k3 = t1.o_orderkey
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=68.00MB
-Per-Host Resource Estimates: Memory=177.00MB
+Max Per-Host Resource Reservation: Memory=71.00MB
+Per-Host Resource Estimates: Memory=180.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=177.00MB mem-reservation=68.00MB
+|  Per-Host Resources: mem-estimate=180.00MB mem-reservation=71.00MB runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -3028,8 +3028,8 @@ PLAN-ROOT SINK
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=59.50MB
-Per-Host Resource Estimates: Memory=223.27MB
+Max Per-Host Resource Reservation: Memory=64.50MB
+Per-Host Resource Estimates: Memory=228.27MB
 
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -3041,7 +3041,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1,2,3 row-size=215B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-Per-Host Resources: mem-estimate=77.77MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=78.77MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 06:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: t1.o_orderkey = t3.o_orderkey
 |  fk/pk conjuncts: t1.o_orderkey = t3.o_orderkey
@@ -3054,7 +3054,7 @@ Per-Host Resources: mem-estimate=77.77MB mem-reservation=34.00MB
 |  |  tuple-ids=1,2,3 row-size=24B cardinality=1500000
 |  |
 |  F04:PLAN FRAGMENT [HASH(t3.o_orderkey)] hosts=2 instances=2
-|  Per-Host Resources: mem-estimate=25.50MB mem-reservation=25.50MB
+|  Per-Host Resources: mem-estimate=27.50MB mem-reservation=27.50MB runtime-filters-memory=2.00MB
 |  05:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash predicates: t2.o_orderkey = t3.o_orderkey
 |  |  fk/pk conjuncts: t2.o_orderkey = t3.o_orderkey
@@ -3089,7 +3089,7 @@ Per-Host Resources: mem-estimate=77.77MB mem-reservation=34.00MB
 |  |  |  tuple-ids=2 row-size=8B cardinality=1500000
 |  |  |
 |  |  F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-|  |  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
+|  |  Per-Host Resources: mem-estimate=41.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 |  |  02:SCAN HDFS [tpch_parquet.orders t3, RANDOM]
 |  |     partitions=1/1 files=2 size=54.07MB
 |  |     runtime filters: RF004[bloom] -> t3.o_orderkey
@@ -3105,7 +3105,7 @@ Per-Host Resources: mem-estimate=77.77MB mem-reservation=34.00MB
 |  |  tuple-ids=1 row-size=8B cardinality=1500000
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-|  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=41.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 |  01:SCAN HDFS [tpch_parquet.orders t2, RANDOM]
 |     partitions=1/1 files=2 size=54.07MB
 |     runtime filters: RF002[bloom] -> t2.o_orderkey
@@ -3126,8 +3126,8 @@ Per-Host Resources: mem-estimate=77.77MB mem-reservation=34.00MB
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=94.50MB
-Per-Host Resource Estimates: Memory=422.03MB
+Max Per-Host Resource Reservation: Memory=104.50MB
+Per-Host Resource Estimates: Memory=432.03MB
 
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -3139,7 +3139,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1,2,3 row-size=215B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
-Per-Host Resources: mem-estimate=155.53MB mem-reservation=68.00MB
+Per-Host Resources: mem-estimate=157.53MB mem-reservation=70.00MB runtime-filters-memory=1.00MB
 06:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash-table-id=00
 |  hash predicates: t1.o_orderkey = t3.o_orderkey
@@ -3160,7 +3160,7 @@ Per-Host Resources: mem-estimate=155.53MB mem-reservation=68.00MB
 |  |  tuple-ids=1,2,3 row-size=24B cardinality=1500000
 |  |
 |  F04:PLAN FRAGMENT [HASH(t3.o_orderkey)] hosts=2 instances=4
-|  Per-Host Resources: mem-estimate=26.50MB mem-reservation=26.50MB
+|  Per-Host Resources: mem-estimate=30.50MB mem-reservation=30.50MB runtime-filters-memory=2.00MB
 |  05:HASH JOIN [INNER JOIN, PARTITIONED]
 |  |  hash-table-id=01
 |  |  hash predicates: t2.o_orderkey = t3.o_orderkey
@@ -3211,7 +3211,7 @@ Per-Host Resources: mem-estimate=155.53MB mem-reservation=68.00MB
 |  |  |  tuple-ids=2 row-size=8B cardinality=1500000
 |  |  |
 |  |  F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
-|  |  Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+|  |  Per-Host Resources: mem-estimate=82.00MB mem-reservation=2.00MB runtime-filters-memory=1.00MB
 |  |  02:SCAN HDFS [tpch_parquet.orders t3, RANDOM]
 |  |     partitions=1/1 files=2 size=54.07MB
 |  |     runtime filters: RF004[bloom] -> t3.o_orderkey
@@ -3227,7 +3227,7 @@ Per-Host Resources: mem-estimate=155.53MB mem-reservation=68.00MB
 |  |  tuple-ids=1 row-size=8B cardinality=1500000
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
-|  Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=82.00MB mem-reservation=2.00MB runtime-filters-memory=1.00MB
 |  01:SCAN HDFS [tpch_parquet.orders t2, RANDOM]
 |     partitions=1/1 files=2 size=54.07MB
 |     runtime filters: RF002[bloom] -> t2.o_orderkey

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
index 8900a98..03273bc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -3,8 +3,8 @@ select straight_join *
 from tpch_parquet.customer
     inner join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=1.94MB
-Per-Host Resource Estimates: Memory=41.94MB
+Max Per-Host Resource Reservation: Memory=2.94MB
+Per-Host Resource Estimates: Memory=42.94MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -16,7 +16,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=355B cardinality=150000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
-Per-Host Resources: mem-estimate=25.94MB mem-reservation=1.94MB
+Per-Host Resources: mem-estimate=26.94MB mem-reservation=2.94MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: c_nationkey = n_nationkey
 |  fk/pk conjuncts: c_nationkey = n_nationkey
@@ -49,8 +49,8 @@ Per-Host Resources: mem-estimate=25.94MB mem-reservation=1.94MB
    mem-estimate=24.00MB mem-reservation=0B
    tuple-ids=0 row-size=238B cardinality=150000
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=3.88MB
-Per-Host Resource Estimates: Memory=83.88MB
+Max Per-Host Resource Reservation: Memory=5.88MB
+Per-Host Resource Estimates: Memory=85.88MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -62,7 +62,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=355B cardinality=150000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
-Per-Host Resources: mem-estimate=51.88MB mem-reservation=3.88MB
+Per-Host Resources: mem-estimate=53.88MB mem-reservation=5.88MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash-table-id=00
 |  hash predicates: c_nationkey = n_nationkey
@@ -144,9 +144,9 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=34.00MB
 |     tuple-ids=1 row-size=191B cardinality=1500000
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -196,9 +196,9 @@ Per-Host Resources: mem-estimate=760.83MB mem-reservation=68.00MB
 |     tuple-ids=1 row-size=191B cardinality=1500000
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -209,8 +209,8 @@ select straight_join *
 from tpch_parquet.orders
     join /*+shuffle*/ tpch_parquet.customer on o_custkey = c_custkey
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=98.00MB
+Max Per-Host Resource Reservation: Memory=36.00MB
+Per-Host Resource Estimates: Memory=100.00MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -222,7 +222,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=428B cardinality=1500000
 |
 F02:PLAN FRAGMENT [HASH(o_custkey)] hosts=2 instances=2
-Per-Host Resources: mem-estimate=34.00MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=35.00MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: o_custkey = c_custkey
 |  fk/pk conjuncts: o_custkey = c_custkey
@@ -250,7 +250,7 @@ Per-Host Resources: mem-estimate=34.00MB mem-reservation=34.00MB
 |  tuple-ids=0 row-size=191B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=41.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.orders, RANDOM]
    partitions=1/1 files=2 size=54.07MB
    runtime filters: RF000[bloom] -> o_custkey
@@ -261,8 +261,8 @@ Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=162.00MB
+Max Per-Host Resource Reservation: Memory=38.00MB
+Per-Host Resource Estimates: Memory=166.00MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -274,7 +274,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=428B cardinality=1500000
 |
 F02:PLAN FRAGMENT [HASH(o_custkey)] hosts=2 instances=4
-Per-Host Resources: mem-estimate=34.00MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=36.00MB mem-reservation=36.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash-table-id=00
 |  hash predicates: o_custkey = c_custkey
@@ -310,7 +310,7 @@ Per-Host Resources: mem-estimate=34.00MB mem-reservation=34.00MB
 |  tuple-ids=0 row-size=191B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
-Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=82.00MB mem-reservation=2.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.orders, RANDOM]
    partitions=1/1 files=2 size=54.07MB
    runtime filters: RF000[bloom] -> o_custkey
@@ -326,8 +326,8 @@ select straight_join *
 from tpch_parquet.orders
     join /*+broadcast*/ tpch_parquet.customer on o_custkey = c_custkey
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=34.00MB
-Per-Host Resource Estimates: Memory=101.38MB
+Max Per-Host Resource Reservation: Memory=35.00MB
+Per-Host Resource Estimates: Memory=102.38MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -339,7 +339,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=428B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-Per-Host Resources: mem-estimate=77.38MB mem-reservation=34.00MB
+Per-Host Resources: mem-estimate=78.38MB mem-reservation=35.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: o_custkey = c_custkey
 |  fk/pk conjuncts: o_custkey = c_custkey
@@ -372,8 +372,8 @@ Per-Host Resources: mem-estimate=77.38MB mem-reservation=34.00MB
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=68.00MB
-Per-Host Resource Estimates: Memory=202.76MB
+Max Per-Host Resource Reservation: Memory=70.00MB
+Per-Host Resource Estimates: Memory=204.76MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -385,7 +385,7 @@ PLAN-ROOT SINK
 |  tuple-ids=0,1 row-size=428B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
-Per-Host Resources: mem-estimate=154.76MB mem-reservation=68.00MB
+Per-Host Resources: mem-estimate=156.76MB mem-reservation=70.00MB runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash-table-id=00
 |  hash predicates: o_custkey = c_custkey
@@ -629,8 +629,8 @@ from tpch_parquet.lineitem
 group by 1, 2
 having count(*) = 1
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=85.00MB
-Per-Host Resource Estimates: Memory=225.12MB
+Max Per-Host Resource Reservation: Memory=87.00MB
+Per-Host Resource Estimates: Memory=227.12MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -655,7 +655,7 @@ Per-Host Resources: mem-estimate=34.00MB mem-reservation=34.00MB
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=71.12MB mem-reservation=51.00MB
+Per-Host Resources: mem-estimate=72.12MB mem-reservation=52.00MB runtime-filters-memory=1.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
@@ -689,19 +689,19 @@ Per-Host Resources: mem-estimate=71.12MB mem-reservation=51.00MB
 |  tuple-ids=0 row-size=8B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=81.00MB mem-reservation=1.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=119.00MB
-Per-Host Resource Estimates: Memory=359.00MB
+Max Per-Host Resource Reservation: Memory=123.00MB
+Per-Host Resource Estimates: Memory=363.00MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
@@ -726,7 +726,7 @@ Per-Host Resources: mem-estimate=34.00MB mem-reservation=34.00MB
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
-Per-Host Resources: mem-estimate=85.00MB mem-reservation=85.00MB
+Per-Host Resources: mem-estimate=87.00MB mem-reservation=87.00MB runtime-filters-memory=1.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
@@ -768,12 +768,12 @@ Per-Host Resources: mem-estimate=85.00MB mem-reservation=85.00MB
 |  tuple-ids=0 row-size=8B cardinality=6001215
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=160.00MB mem-reservation=0B
+Per-Host Resources: mem-estimate=162.00MB mem-reservation=2.00MB runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -814,9 +814,9 @@ Per-Host Resources: mem-estimate=1.69GB mem-reservation=34.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B
@@ -853,9 +853,9 @@ Per-Host Resources: mem-estimate=3.39GB mem-reservation=68.00MB
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.72MB
+   partitions=1/1 files=3 size=193.73MB
    stored statistics:
-     table: rows=6001215 size=193.72MB
+     table: rows=6001215 size=193.73MB
      columns: all
    extrapolated-rows=disabled
    mem-estimate=80.00MB mem-reservation=0B

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 4d39a47..48163ec 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -157,7 +157,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=3/24 files=3 size=22.67KB
+   partitions=3/24 files=3 size=22.07KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -172,7 +172,7 @@ select id from functional.alltypes t1 where exists (
   where t1.id = t2.id)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=161.94MB mem-reservation=1.94MB
+|  Per-Host Resources: mem-estimate=162.94MB mem-reservation=2.94MB runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
index a84ec34..119c2fd 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
@@ -29,7 +29,7 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 4.75 MB. Set mem_limit to at least 79.75 MB.
+ current plan: 6.75 MB. Set mem_limit to at least 81.75 MB.
 ====
 ---- QUERY
 set mem_limit=80mb;
@@ -38,5 +38,5 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 17.00 MB. Set mem_limit to at least 92.00 MB.
+ current plan: 18.00 MB. Set mem_limit to at least 93.00 MB.
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
index ace4596..b7a6123 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
@@ -2,28 +2,20 @@
 ---- QUERY
 ####################################################
 # Test case 1: bloom filters with high expected FP rate get disabled.
-# To trigger this path, we have to trick the planner into estimating a too-small
-# build-side cardinality, which will cause the BF size to be estimated low (and therefore
-# the FP rate to be high). We do this by using predicates that are completely unselective,
-# but which the planner thinks have relatively high selectivity.
-# Kudu doesn't support bloom filters, so it just doesn't filter anything.
+# To trigger this path, we limit the size of the filter to a much lower value than
+# required to achieve the desired FP rate.
 ####################################################
 
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
 SET RUNTIME_FILTER_MODE=GLOBAL;
-SET RUNTIME_FILTER_MAX_SIZE=4K;
-select STRAIGHT_JOIN count(*) from alltypes a
-    join [BROADCAST]
-    # Build-side needs to be sufficiently large to trigger FP check.
-    (select id, int_col from alltypes UNION ALL select id, int_col from alltypes) b
-        on a.id = b.id
-        # Predicates that are always true (but planner thinks are selective)
-        where (b.id - b.id) < 1 AND (b.int_col - b.int_col) < 1;
+SET RUNTIME_FILTER_MAX_SIZE=64K;
+with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+    join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
 ---- RESULTS
-14600
+0
 ---- RUNTIME_PROFILE
 row_regex: .*0 of 1 Runtime Filter Published, 1 Disabled.*
-row_regex: .*Rows rejected: 0 .*
 ====
 
 
@@ -33,20 +25,20 @@ row_regex: .*Rows rejected: 0 .*
 ####################################################
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MIN_SIZE=4KB;
+SET RUNTIME_FILTER_MIN_SIZE=64KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
-    join (select * from l LIMIT 1) b on a.l_orderkey = -b.l_orderkey;
+    join (select * from l LIMIT 125000) b on a.l_orderkey = -b.l_orderkey;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
 row_regex: .*1 of 1 Runtime Filter Published.*
-row_regex: .*Filter 0 \(4.00 KB\).*
+row_regex: .*Filter 0 \(64.00 KB\).*
 ====
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MIN_SIZE=4KB;
+SET RUNTIME_FILTER_MIN_SIZE=64KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
@@ -59,7 +51,7 @@ row_regex: .*Filter 0 \(256.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MIN_SIZE=4KB;
+SET RUNTIME_FILTER_MIN_SIZE=64KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
@@ -72,7 +64,7 @@ row_regex: .*Filter 0 \(512.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MIN_SIZE=4KB;
+SET RUNTIME_FILTER_MIN_SIZE=64KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;
@@ -91,36 +83,57 @@ row_regex: .*Filter 0 \(1.00 MB\).*
 ####################################################
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MIN_SIZE=8KB;
-SET RUNTIME_FILTER_MAX_SIZE=8KB;
-# This query would produce a 4KB filter without setting the minimum size.
+SET RUNTIME_FILTER_MIN_SIZE=128KB;
+SET RUNTIME_FILTER_MAX_SIZE=128KB;
+# This query would produce a 64KB filter without setting the minimum size.
 select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
 ---- RESULTS
 7300
 ---- RUNTIME_PROFILE
 row_regex: .*1 of 1 Runtime Filter Published.*
-row_regex: .*Filter 0 \(8.00 KB\).*
+row_regex: .*Filter 0 \(128.00 KB\).*
 ====
 ---- QUERY
 # Check that filter sizes are rounded up to power-of-two
-SET RUNTIME_FILTER_MIN_SIZE=6000B;
-SET RUNTIME_FILTER_MAX_SIZE=6000B;
+SET RUNTIME_FILTER_MIN_SIZE=80000B;
+SET RUNTIME_FILTER_MAX_SIZE=80000B;
 select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
 ---- RESULTS
 7300
 ---- RUNTIME_PROFILE
 row_regex: .*1 of 1 Runtime Filter Published.*
-row_regex: .*Filter 0 \(8.00 KB\).*
+row_regex: .*Filter 0 \(128.00 KB\).*
 ====
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MAX_SIZE=8192;
+SET RUNTIME_FILTER_MAX_SIZE=64KB;
 # Query would produce a 512KB filter without setting the max
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
 ---- RUNTIME_PROFILE
 row_regex: .*0 of 1 Runtime Filter Published.*
-row_regex: .*Filter 0 \(8.00 KB\).*
+row_regex: .*Filter 0 \(64.00 KB\).*
+====
+
+
+
+---- QUERY
+####################################################
+# Test case 4: Filter size is >= the min buffer size that can be allocated by the
+# buffer pool
+####################################################
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
+SET RUNTIME_BLOOM_FILTER_SIZE=4KB;
+# The min buffer size is set to 64KB for end to end tests. This query would
+# produce a 4KB filter if the min buffer size limit bound is not enforced.
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(64.00 KB\).*
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test
index 1ed6668..f766b8c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test
@@ -2,21 +2,18 @@
 ---- QUERY
 ####################################################
 # Regression test for IMPALA-3141: Disabled filters should send dummy filters
-# to unblock waiters.
+# to unblock waiters. We limit the size of the filter to a much lower value than required
+# to achieve the desired FP rate so that it gets rejected/disabled during FP rate check.
 ####################################################
 
 SET RUNTIME_FILTER_WAIT_TIME_MS=600000;
 SET RUNTIME_FILTER_MODE=GLOBAL;
-SET RUNTIME_FILTER_MAX_SIZE=4096;
-select STRAIGHT_JOIN count(*) from alltypes a
-    join [BROADCAST]
-    # Build-side needs to be sufficiently large to trigger FP check.
-    (select id, int_col from alltypes UNION ALL select id, int_col from alltypes) b
-        on a.id = b.id
-        # Predicates that are always true (but planner thinks are selective)
-        where (b.id - b.id) < 1 AND (b.int_col - b.int_col) < 1;
+SET RUNTIME_FILTER_MAX_SIZE=64K;
+with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+    join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
 ---- RESULTS
-14600
+0
 ---- RUNTIME_PROFILE
 row_regex: .*0 of 1 Runtime Filter Published, 1 Disabled.*
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
index 9ee1a65..92abd5f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
@@ -5,8 +5,8 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Max Per-Host Resource Reservation: Memory=34.00MB'
-'Per-Host Resource Estimates: Memory=476.41MB'
+'Max Per-Host Resource Reservation: Memory=35.00MB'
+'Per-Host Resource Estimates: Memory=477.41MB'
 ''
 'PLAN-ROOT SINK'
 '04:EXCHANGE [UNPARTITIONED]'

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
index 64b11e6..e65745a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
@@ -5,8 +5,8 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Max Per-Host Resource Reservation: Memory=34.00MB'
-'Per-Host Resource Estimates: Memory=476.41MB'
+'Max Per-Host Resource Reservation: Memory=35.00MB'
+'Per-Host Resource Estimates: Memory=477.41MB'
 ''
 'PLAN-ROOT SINK'
 '|'

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index faf196d..b1c58f4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -5,8 +5,8 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Max Per-Host Resource Reservation: Memory=34.00MB'
-'Per-Host Resource Estimates: Memory=476.41MB'
+'Max Per-Host Resource Reservation: Memory=35.00MB'
+'Per-Host Resource Estimates: Memory=477.41MB'
 ''
 'F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
 '|  Per-Host Resources: mem-estimate=0B mem-reservation=0B'
@@ -18,7 +18,7 @@ from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 '|  tuple-ids=0,1 row-size=454B cardinality=5757710'
 '|'
 'F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3'
-'Per-Host Resources: mem-estimate=388.41MB mem-reservation=34.00MB'
+'Per-Host Resources: mem-estimate=389.41MB mem-reservation=35.00MB runtime-filters-memory=1.00MB'
 '02:HASH JOIN [INNER JOIN, BROADCAST]'
 '|  hash predicates: l_orderkey = o_orderkey'
 '|  fk/pk conjuncts: l_orderkey = o_orderkey'

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
index 85d5b81..625e2b0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
@@ -5,8 +5,8 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Max Per-Host Resource Reservation: Memory=34.00MB'
-'Per-Host Resource Estimates: Memory=476.41MB'
+'Max Per-Host Resource Reservation: Memory=35.00MB'
+'Per-Host Resource Estimates: Memory=477.41MB'
 ''
 'F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
 'Per-Host Resources: mem-estimate=0B mem-reservation=0B'
@@ -18,7 +18,7 @@ from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 '     tuple-ids=0,1 row-size=454B cardinality=5757710'
 ''
 'F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3'
-'Per-Host Resources: mem-estimate=388.41MB mem-reservation=34.00MB'
+'Per-Host Resources: mem-estimate=389.41MB mem-reservation=35.00MB runtime-filters-memory=1.00MB'
 '  DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=04, UNPARTITIONED]'
 '  |  mem-estimate=0B mem-reservation=0B'
 '  02:HASH JOIN [INNER JOIN, BROADCAST]'

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index 818a8d5..3d92acb 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -387,31 +387,25 @@ row_regex: .*Rows rejected: 2.43K \(2432\).*
 
 ---- QUERY
 ####################################################
-# Test case 18: Filters will not be used if they exceed
-# the configured memory limit on the coordinator.
-# To test this, we need to construct a query where memory
-# consumption on the coordinator exceeds MEM_LIMIT, but
-# not on the backends (because otherwise they will disable
-# the filters through another path). We set MEM_LIMIT to
-# the minimum possible then set filter size to be roughly
-# half that: since the coordinator must aggregate two of
-# these filters (and indeed must create one as well), it
-# will exceed the memory limit. This is checked for
-# indirectly by confirming that the filter had no effect
-# (when usually it would be selective).
+# Test case 18: Query is not admitted if it exceeds the
+# mem requirement after accounting for the memory
+# required by runtime filters.
 ####################################################
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MIN_SIZE=128MB;
 SET RUNTIME_FILTER_MAX_SIZE=500MB;
-SET MEM_LIMIT=140MB;
+# Query would have been admitted if memory for runtime filters was not accounted for.
+SET BUFFER_POOL_LIMIT=140MB;
 select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
     on a.month = b.id and b.int_col = -3
 ---- RESULTS
----- RUNTIME_PROFILE
-row_regex: .*Filter 0 \(128.00 MB\).*
-row_regex: .*Files processed: 8.*
-row_regex: .*Files rejected: 0.*
+---- CATCH
+ImpalaBeeswaxException: INNER EXCEPTION: <class 'beeswaxd.ttypes.BeeswaxException'>
+ MESSAGE: Rejected query from pool default-pool: minimum memory reservation is
+ greater than memory available to the query for buffer reservations. Increase
+ the buffer_pool_limit to 290.00 MB. See the query profile for more information
+ about the per-node memory requirements.
 ====
 ---- QUERY
 # Confirm that with broadcast join, memory limit is not hit.
@@ -419,9 +413,9 @@ SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MIN_SIZE=128MB;
 SET RUNTIME_FILTER_MAX_SIZE=500MB;
-# Allocate enough memory for the join + filter + scan
-SET MEM_LIMIT=170MB;
-select STRAIGHT_JOIN * from alltypes a join [BROADCAST] alltypes b
+# This would run perfectly with just enough memory provided by the buffer pool.
+SET BUFFER_POOL_LIMIT=290MB;
+select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
     on a.month = b.id and b.int_col = -3
 ---- RESULTS
 ---- RUNTIME_PROFILE

http://git-wip-us.apache.org/repos/asf/impala/blob/1a632e7c/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index f8226f1..3bfe0af 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -60,7 +60,7 @@ row_regex: .*SpilledPartitions: 0 .*
 # Adding TPCH-Q21 in the spilling test to check for IMPALA-1471 (spilling left anti
 # and left outer joins were returning wrong results).
 # Q21 - Suppliers Who Kept Orders Waiting Query
-set buffer_pool_limit=20m;
+set buffer_pool_limit=23m;
 select
   s_name,
   count(*) as numwait
@@ -315,7 +315,7 @@ row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 #   where l1.l_quantity = 31.0 and l1.l_tax = 0.03 and l1.l_orderkey <= 100000
 # order by l_orderkey, l_partkey, l_suppkey, l_linenumber
 # limit 5
-set buffer_pool_limit=7m;
+set buffer_pool_limit=9m;
 set num_nodes=1;
 select straight_join l.*
 from


[2/6] impala git commit: IMPALA-6269: Cherry-pick dependency change for KRPC

Posted by ta...@apache.org.
IMPALA-6269: Cherry-pick dependency change for KRPC

Expose RPC method info map and various metrics

These changes are needed for IMPALA-6269.

Change-Id: I8bda390ea92cceb0d696767402c978a83b386825
Reviewed-on: http://gerrit.cloudera.org:8080/9269
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9287
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/92c1a485
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/92c1a485
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/92c1a485

Branch: refs/heads/2.x
Commit: 92c1a4855c29b50c768685909c6f0301e067973e
Parents: 4e29a7f
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Feb 12 14:46:19 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 13 07:44:19 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/acceptor_pool.cc | 4 ++++
 be/src/kudu/rpc/acceptor_pool.h  | 4 ++++
 be/src/kudu/rpc/service_if.h     | 6 +++++-
 be/src/kudu/util/metrics.h       | 4 ++++
 4 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/92c1a485/be/src/kudu/rpc/acceptor_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.cc b/be/src/kudu/rpc/acceptor_pool.cc
index cbb1232..deff77b 100644
--- a/be/src/kudu/rpc/acceptor_pool.cc
+++ b/be/src/kudu/rpc/acceptor_pool.cc
@@ -134,6 +134,10 @@ Status AcceptorPool::GetBoundAddress(Sockaddr* addr) const {
   return socket_.GetSocketAddress(addr);
 }
 
+int64_t AcceptorPool::num_rpc_connections_accepted() const {
+  return rpc_connections_accepted_->value();
+}
+
 void AcceptorPool::RunThread() {
   while (true) {
     Socket new_sock;

http://git-wip-us.apache.org/repos/asf/impala/blob/92c1a485/be/src/kudu/rpc/acceptor_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/acceptor_pool.h b/be/src/kudu/rpc/acceptor_pool.h
index 92b7fc5..653f135 100644
--- a/be/src/kudu/rpc/acceptor_pool.h
+++ b/be/src/kudu/rpc/acceptor_pool.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_RPC_ACCEPTOR_POOL_H
 #define KUDU_RPC_ACCEPTOR_POOL_H
 
+#include <stdint.h>
 #include <vector>
 
 #include "kudu/gutil/atomicops.h"
@@ -59,6 +60,9 @@ class AcceptorPool {
   // actual port that was bound.
   Status GetBoundAddress(Sockaddr* addr) const;
 
+  // Return the number of connections accepted by this messenger. Thread-safe.
+  int64_t num_rpc_connections_accepted() const;
+
  private:
   void RunThread();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/92c1a485/be/src/kudu/rpc/service_if.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_if.h b/be/src/kudu/rpc/service_if.h
index a3722c6..7846c0c 100644
--- a/be/src/kudu/rpc/service_if.h
+++ b/be/src/kudu/rpc/service_if.h
@@ -121,12 +121,16 @@ class GeneratedServiceIf : public ServiceIf {
 
   RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
 
+  // Returns the mapping from method names to method infos.
+  typedef std::unordered_map<std::string, scoped_refptr<RpcMethodInfo>> MethodInfoMap;
+  const MethodInfoMap& methods_by_name() const { return methods_by_name_; }
+
  protected:
   // For each method, stores the relevant information about how to handle the
   // call. Methods are inserted by the constructor of the generated subclass.
   // After construction, this map is accessed by multiple threads and therefore
   // must not be modified.
-  std::unordered_map<std::string, scoped_refptr<RpcMethodInfo>> methods_by_name_;
+  MethodInfoMap methods_by_name_;
 
   // The result tracker for this service's methods.
   scoped_refptr<ResultTracker> result_tracker_;

http://git-wip-us.apache.org/repos/asf/impala/blob/92c1a485/be/src/kudu/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/metrics.h b/be/src/kudu/util/metrics.h
index 8aeaf93..0db08ee 100644
--- a/be/src/kudu/util/metrics.h
+++ b/be/src/kudu/util/metrics.h
@@ -987,6 +987,10 @@ class Histogram : public Metric {
   Status GetHistogramSnapshotPB(HistogramSnapshotPB* snapshot,
                                 const MetricJsonOptions& opts) const;
 
+  // Returns a pointer to the underlying histogram. The implementation of HdrHistogram
+  // is thread safe.
+  const HdrHistogram* histogram() const { return histogram_.get(); }
+
   uint64_t CountInBucketForValueForTests(uint64_t value) const;
   uint64_t MinValueForTests() const;
   uint64_t MaxValueForTests() const;