Posted to commits@impala.apache.org by jo...@apache.org on 2020/06/05 23:01:51 UTC

[impala] branch master updated (6a1c448 -> 03f2b55)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 6a1c448  IMPALA-9782: fix Kudu DML with mt_dop
     new c62a680  IMPALA-3741 [part 2]: Push runtime bloom filter to Kudu
     new 03f2b55  Filter out "Checksum validation failed" messages during the maven build

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/CMakeLists.txt                                  |   3 +-
 be/src/benchmarks/bloom-filter-benchmark.cc        |  26 +-
 be/src/codegen/gen_ir_descriptions.py              |   7 +-
 be/src/exec/filter-context.cc                      |  40 +-
 be/src/exec/kudu-scanner.cc                        | 134 ++++---
 be/src/runtime/raw-value-ir.cc                     |  77 +++-
 be/src/runtime/raw-value.h                         |  17 +
 be/src/runtime/raw-value.inline.h                  | 125 +++++++
 be/src/runtime/runtime-filter-bank.cc              |   6 +-
 be/src/runtime/runtime-filter-ir.cc                |   4 +-
 be/src/runtime/runtime-filter.h                    |   1 +
 be/src/service/query-options-test.cc               |   4 +
 be/src/service/query-options.cc                    |   8 +
 be/src/service/query-options.h                     |   6 +-
 be/src/util/bloom-filter-ir.cc                     |  13 +-
 be/src/util/bloom-filter-test.cc                   |  65 ++--
 be/src/util/bloom-filter.cc                        | 248 ++++---------
 be/src/util/bloom-filter.h                         | 201 ++++------
 be/src/util/debug-util.cc                          |   1 +
 be/src/util/debug-util.h                           |   1 +
 bin/impala-config.sh                               |   6 +-
 bin/mvn-quiet.sh                                   |   8 +-
 common/thrift/ImpalaInternalService.thrift         |   4 +
 common/thrift/ImpalaService.thrift                 |   8 +
 common/thrift/PlanNodes.thrift                     |   7 +
 .../impala/planner/RuntimeFilterGenerator.java     |  63 +++-
 .../org/apache/impala/planner/PlannerTest.java     |  24 +-
 .../PlannerTest/bloom-filter-assignment.test       | 408 +++++++++++++++++++++
 .../queries/PlannerTest/kudu-update.test           |  20 +-
 .../queries/PlannerTest/kudu.test                  |   4 +-
 .../PlannerTest/runtime-filter-query-options.test  | 117 ++++++
 .../queries/PlannerTest/tpch-kudu.test             | 381 ++++++++++---------
 ...n_max_filters.test => all_runtime_filters.test} | 188 ++++++----
 .../QueryTest/diff_runtime_filter_types.test       | 151 ++++++++
 .../queries/QueryTest/runtime_filters.test         |   5 +
 tests/query_test/test_runtime_filters.py           |  33 +-
 tests/query_test/test_spilling.py                  |   6 +-
 37 files changed, 1696 insertions(+), 724 deletions(-)
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
 copy testdata/workloads/functional-query/queries/QueryTest/{min_max_filters.test => all_runtime_filters.test} (67%)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/diff_runtime_filter_types.test


[impala] 02/02: Filter out "Checksum validation failed" messages during the maven build

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 03f2b559c31af7fc11165cf3b00876900e234663
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Fri Apr 17 19:20:53 2020 -0700

    Filter out "Checksum validation failed" messages during the maven build
    
    Some Impala dependencies come from repositories that don't have
    checksums available. During the build, this produces a large
    number of messages like:
    [WARNING] Checksum validation failed, no checksums available from the repository for ...
    or:
    [WARNING] Checksum validation failed, could not read expected checksum ...
    These messages are not very useful, and they make it harder to search
    the console output for failed tests. This filters them out of the maven
    output. Different versions of maven structure the messages differently,
    so this filters all the "Checksum validation failed" messages that happen
    at WARNING level.
    
    Testing:
     - Ran core tests, verified the messages are gone
    
    Change-Id: I19afbd157533e52ef3157730c7ec5159241749bc
    Reviewed-on: http://gerrit.cloudera.org:8080/15775
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Anurag Mantripragada <an...@cloudera.com>
---
 bin/mvn-quiet.sh | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/bin/mvn-quiet.sh b/bin/mvn-quiet.sh
index f782ff4..c7c557e 100755
--- a/bin/mvn-quiet.sh
+++ b/bin/mvn-quiet.sh
@@ -34,10 +34,16 @@ EOF
 LOGGING_OPTIONS="-Dorg.slf4j.simpleLogger.showDateTime \
   -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss"
 
+# Filter out "Checksum validation failed" messages, as they are mostly harmless and
+# make it harder to search for failed tests in the console output. Limit the filtering
+# to WARNING messages.
+CHECKSUM_VALIDATION_FAILED_REGEX="[WARNING].*Checksum validation failed"
+
 # Always use maven's batch mode (-B), as it produces output that is easier to parse.
 if ! mvn -B $IMPALA_MAVEN_OPTIONS $LOGGING_OPTIONS "$@" | \
   tee -a "$LOG_FILE" | \
-  grep -E -e WARNING -e ERROR -e SUCCESS -e FAILURE -e Test -e "Found Banned"; then
+  grep -E -e WARNING -e ERROR -e SUCCESS -e FAILURE -e Test -e "Found Banned" | \
+  grep -v -i "${CHECKSUM_VALIDATION_FAILED_REGEX}"; then
   echo "mvn $IMPALA_MAVEN_OPTIONS $@ exited with code $?"
   exit 1
 fi
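
A note on the pattern above: in a grep regular expression, "[WARNING]" is a
bracket expression that matches a single character from the set W, A, R, N, I, G
rather than the literal tag. The filter still hides the intended lines, because
the tag itself contains those letters, but it is broader than the comment
suggests. The sketch below illustrates this with std::regex as a stand-in for
grep (an assumption: both treat "[...]" as a character set here). The names
IsFilteredOut and kLiteralTagPattern are hypothetical and not part of the commit.

```cpp
#include <regex>
#include <string>

// Pattern as committed in bin/mvn-quiet.sh. "[WARNING]" is a bracket expression:
// it matches ONE character from the set {W, A, R, N, I, G}, not the literal tag.
const char kCommittedPattern[] = "[WARNING].*Checksum validation failed";

// Hypothetical stricter variant (not part of the commit): escaping the brackets
// makes the pattern require the literal "[WARNING]" tag.
const char kLiteralTagPattern[] = "\\[WARNING\\].*Checksum validation failed";

// Returns true if `line` contains a case-insensitive match for `pattern`,
// i.e. if a `grep -v -i` stage using that pattern would drop the line.
bool IsFilteredOut(const std::string& line, const std::string& pattern) {
  const std::regex re(pattern, std::regex::ECMAScript | std::regex::icase);
  return std::regex_search(line, re);
}
```

With these definitions, a "[WARNING] Checksum validation failed ..." line is
dropped by both patterns, while a line such as "[ERROR] Checksum validation
failed" is dropped only by the committed pattern, since the "R" in "ERROR"
satisfies the bracket expression.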


[impala] 01/02: IMPALA-3741 [part 2]: Push runtime bloom filter to Kudu

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c62a6808fc379f812a2eaa2363d3c3c18a1836b0
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Thu Apr 30 15:46:16 2020 -0700

    IMPALA-3741 [part 2]: Push runtime bloom filter to Kudu
    
    Defined the BloomFilter class as a wrapper around kudu::BlockBloomFilter.
    impala::BloomFilter builds runtime bloom filters through the
    kudu::BlockBloomFilter APIs, with FastHash as the default hash algorithm.
    Removed the duplicated functions from the impala::BloomFilter class.
    Pushed bloom filters down to Kudu through the Kudu client API.
    
    Added a new query option, ENABLED_RUNTIME_FILTER_TYPES, to set the enabled
    runtime filter types. It currently affects only Kudu scan nodes. By
    default, only the min-max filter is enabled for Kudu and the bloom filter
    is not. With this option, a user can enable the bloom filter, the min-max
    filter, or both.
    
    Added new test cases in PlannerTest and the end-to-end runtime_filters
    test for pushing bloom filters down to Kudu.
    Added test cases that compare the number of rows returned from the Kudu
    scan when applying different types of runtime filters to the same queries.
    Updated bloom-filter-benchmark to match the bloom-filter implementation
    change.
    
    Bump Kudu version to d652cab17.
    
    Testing:
     - Passed all exhaustive tests.
    
    Performance benchmark:
     - Ran single_node_perf_run.py on TPC-H at scale 30 for Parquet
       and Kudu. Verified that the new hash function and bloom-filter
       implementation don't cause regressions for HDFS bloom filters.
       For Kudu, there is one regression, on query TPCH-Q9, and there
       are improvements for about 8 queries when applying both bloom and
       min-max filters. The bloom filter reduces the number of rows
       returned from the Kudu scan, which reduces the cost of aggregation
       and hash join. However, bloom filter evaluation adds extra cost to
       the Kudu scan, which offsets the gain on aggregation and join.
       The Kudu scan needs to be optimized for bloom filters in follow-up
       tasks.
     - Ran bloom-filter microbenchmarks and verified that there is no
       regression for the Insert/Find/Union functions, with or without
       AVX2, due to the bloom-filter implementation changes. There is a
       small performance degradation for the Init function, but this
       function is not in the hot path.
    
    Change-Id: I9100076f68ea299ddb6ec8bc027cac7a47f5d754
    Reviewed-on: http://gerrit.cloudera.org:8080/15683
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/CMakeLists.txt                                  |   3 +-
 be/src/benchmarks/bloom-filter-benchmark.cc        |  26 +-
 be/src/codegen/gen_ir_descriptions.py              |   7 +-
 be/src/exec/filter-context.cc                      |  40 +-
 be/src/exec/kudu-scanner.cc                        | 134 +++---
 be/src/runtime/raw-value-ir.cc                     |  77 +++-
 be/src/runtime/raw-value.h                         |  17 +
 be/src/runtime/raw-value.inline.h                  | 125 ++++++
 be/src/runtime/runtime-filter-bank.cc              |   6 +-
 be/src/runtime/runtime-filter-ir.cc                |   4 +-
 be/src/runtime/runtime-filter.h                    |   1 +
 be/src/service/query-options-test.cc               |   4 +
 be/src/service/query-options.cc                    |   8 +
 be/src/service/query-options.h                     |   6 +-
 be/src/util/bloom-filter-ir.cc                     |  13 +-
 be/src/util/bloom-filter-test.cc                   |  65 +--
 be/src/util/bloom-filter.cc                        | 248 ++++-------
 be/src/util/bloom-filter.h                         | 201 ++++-----
 be/src/util/debug-util.cc                          |   1 +
 be/src/util/debug-util.h                           |   1 +
 bin/impala-config.sh                               |   6 +-
 common/thrift/ImpalaInternalService.thrift         |   4 +
 common/thrift/ImpalaService.thrift                 |   8 +
 common/thrift/PlanNodes.thrift                     |   7 +
 .../impala/planner/RuntimeFilterGenerator.java     |  63 ++-
 .../org/apache/impala/planner/PlannerTest.java     |  24 +-
 .../PlannerTest/bloom-filter-assignment.test       | 408 ++++++++++++++++++
 .../queries/PlannerTest/kudu-update.test           |  20 +-
 .../queries/PlannerTest/kudu.test                  |   4 +-
 .../PlannerTest/runtime-filter-query-options.test  | 117 ++++++
 .../queries/PlannerTest/tpch-kudu.test             | 381 +++++++++--------
 .../queries/QueryTest/all_runtime_filters.test     | 461 +++++++++++++++++++++
 .../QueryTest/diff_runtime_filter_types.test       | 151 +++++++
 .../queries/QueryTest/runtime_filters.test         |   5 +
 tests/query_test/test_runtime_filters.py           |  33 +-
 tests/query_test/test_spilling.py                  |   6 +-
 36 files changed, 2024 insertions(+), 661 deletions(-)
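
For readers unfamiliar with the data structure named in the commit message, the
following is a minimal sketch of a block (split) bloom filter: each 32-bit hash
selects one 256-bit bucket and sets one bit in each of the bucket's eight words.
It is an illustration only and not the kudu::BlockBloomFilter implementation.
The class name ToyBlockBloomFilter, the salt constants, and the bucket-index
scheme are all assumptions made for this example.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

// Arbitrary multipliers, one per 32-bit word of a bucket. These values are
// assumptions for the sketch, not constants taken from Kudu or Impala.
constexpr uint32_t kSalts[8] = {0x9e3779b1U, 0x85ebca6bU, 0xc2b2ae35U, 0x27d4eb2fU,
                                0x165667b1U, 0xd3a2646dU, 0xfd7046c5U, 0xb55a4f09U};

class ToyBlockBloomFilter {
 private:
  // One bucket is 8 x 32 bits = 256 bits.
  using Bucket = std::array<uint32_t, 8>;

 public:
  // Allocates 2^log_num_buckets zero-initialized buckets.
  explicit ToyBlockBloomFilter(int log_num_buckets)
    : buckets_(std::size_t{1} << log_num_buckets) {}

  // Sets one bit in each of the eight words of the bucket selected by `hash`.
  void Insert(uint32_t hash) {
    Bucket& bucket = buckets_[BucketIndex(hash)];
    for (int i = 0; i < 8; ++i) bucket[i] |= BitMask(hash, i);
  }

  // Returns false if `hash` was definitely never inserted; true means "maybe".
  bool Find(uint32_t hash) const {
    const Bucket& bucket = buckets_[BucketIndex(hash)];
    for (int i = 0; i < 8; ++i) {
      if ((bucket[i] & BitMask(hash, i)) == 0) return false;
    }
    return true;
  }

 private:
  // The bucket count is a power of two, so masking yields a valid index.
  std::size_t BucketIndex(uint32_t hash) const { return hash & (buckets_.size() - 1); }

  // Multiplies by the word's salt (wrapping mod 2^32) and uses the top five
  // bits of the product to pick one of the 32 bit positions in word `i`.
  static uint32_t BitMask(uint32_t hash, int i) {
    return uint32_t{1} << ((hash * kSalts[i]) >> 27);
  }

  std::vector<Bucket> buckets_;
};
```

A filter of this shape never reports a false negative: every inserted hash is
found again. False positives are possible, which is why a pushed-down bloom
filter can reduce, but not exactly determine, the rows a scan returns.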

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 63cd34c..f87e7d0 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -231,7 +231,8 @@ add_definitions(-DKUDU_HEADERS_USE_RICH_SLICE -DKUDU_HEADERS_NO_STUBS)
 #  -DBOOST_NO_EXCEPTIONS: call a custom error handler for exceptions in codegen'd code.
 set(CLANG_IR_CXX_FLAGS "-emit-llvm" "-c" "-std=c++14" "-DIR_COMPILE" "-DHAVE_INTTYPES_H"
   "-DHAVE_NETINET_IN_H" "-DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG" "-DBOOST_NO_EXCEPTIONS"
-  "-fcolor-diagnostics" "-Wno-deprecated" "-Wno-return-type-c-linkage" "-O1")
+  "-DKUDU_HEADERS_NO_STUBS" "-fcolor-diagnostics" "-Wno-deprecated"
+  "-Wno-return-type-c-linkage" "-O1")
 
 if (CMAKE_SYSTEM_NAME MATCHES "Linux" AND CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
   set(CLANG_IR_CXX_FLAGS "${CLANG_IR_CXX_FLAGS}" "-DCACHELINESIZE_AARCH64=${CACHELINESIZE_AARCH64}")
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index a3aa2f7..c33f831 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -35,6 +35,12 @@
 
 #include "common/names.h"
 
+// kudu::BlockBloomFilter::kCpu is a static variable and is initialized once.
+// To temporarily disable AVX2 for Bloom Filter in runtime testing, set flag
+// disable_blockbloomfilter_avx2 as true. See kudu::BlockBloomFilter::has_avx2().
+// This flag has no effect if the target CPU doesn't support AVX2.
+DECLARE_bool(disable_blockbloomfilter_avx2);
+
 using namespace std;
 using namespace impala;
 
@@ -195,7 +201,7 @@ void Benchmark(int batch_size, void* data) {
   CHECK(client.IncreaseReservation(BloomFilter::GetExpectedMemoryUsed(*d)));
   for (int i = 0; i < batch_size; ++i) {
     BloomFilter bf(&client);
-    CHECK(bf.Init(*d).ok());
+    CHECK(bf.Init(*d, 0).ok());
     bf.Close();
   }
   env->buffer_pool()->DeregisterClient(&client);
@@ -210,7 +216,7 @@ namespace insert {
 struct TestData {
   explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client)
     : bf(client), data(1ull << 20) {
-    CHECK(bf.Init(log_bufferpool_size).ok());
+    CHECK(bf.Init(log_bufferpool_size, 0).ok());
     for (size_t i = 0; i < data.size(); ++i) {
       data[i] = MakeRand();
     }
@@ -244,7 +250,7 @@ struct TestData {
       present(size),
       absent(size),
       result(0) {
-    CHECK(bf.Init(log_bufferpool_size).ok());
+    CHECK(bf.Init(log_bufferpool_size, 0).ok());
     for (size_t i = 0; i < size; ++i) {
       present[i] = MakeRand();
       absent[i] = MakeRand();
@@ -286,7 +292,7 @@ namespace either {
 struct TestData {
   explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
     BloomFilter bf(client);
-    CHECK(bf.Init(log_bufferpool_size).ok());
+    CHECK(bf.Init(log_bufferpool_size, 0).ok());
 
     RpcController controller1;
     RpcController controller2;
@@ -306,8 +312,10 @@ struct TestData {
     pbf2.set_always_false(false);
 
     int64_t directory_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_size);
-    string d1(reinterpret_cast<const char*>(bf.directory_), directory_size);
-    string d2(reinterpret_cast<const char*>(bf.directory_), directory_size);
+    string d1(reinterpret_cast<const char*>(bf.GetBlockBloomFilter()->directory().data()),
+        directory_size);
+    string d2(reinterpret_cast<const char*>(bf.GetBlockBloomFilter()->directory().data()),
+        directory_size);
 
     directory1 = d1;
     directory2 = d2;
@@ -369,7 +377,7 @@ void RunBenchmarks() {
         CHECK(client.IncreaseReservation(
             BloomFilter::GetExpectedMemoryUsed(log_required_size)));
         testdata.emplace_back(
-            new find::TestData(BloomFilter::MinLogSpace(ndv, fpp), &client , ndv));
+            new find::TestData(BloomFilter::MinLogSpace(ndv, fpp), &client, ndv));
         snprintf(name, sizeof(name), "present ndv %7dk fpp %6.1f%%", ndv/1000, fpp*100);
         suite.AddBenchmark(name, find::Present, testdata.back().get());
 
@@ -431,9 +439,9 @@ int main(int argc, char **argv) {
   }
 
   cout << "With AVX2:" << endl << endl;
+  FLAGS_disable_blockbloomfilter_avx2 = false;
   RunBenchmarks();
   cout << endl << "Without AVX or AVX2:" << endl << endl;
-  CpuInfo::TempDisable t1(CpuInfo::AVX);
-  CpuInfo::TempDisable t2(CpuInfo::AVX2);
+  FLAGS_disable_blockbloomfilter_avx2 = true;
   RunBenchmarks();
 }
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 2905de3..49d5c4e 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -201,10 +201,10 @@ ir_functions = [
   ["GENERIC_IS_NULL_STRING", "IrGenericIsNullString"],
   ["RAW_VALUE_COMPARE",
    "_ZN6impala8RawValue7CompareEPKvS2_RKNS_10ColumnTypeE"],
-  ["RAW_VALUE_GET_HASH_VALUE",
-   "_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj"],
   ["RAW_VALUE_GET_HASH_VALUE_FAST_HASH",
    "_ZN6impala8RawValue20GetHashValueFastHashEPKvRKNS_10ColumnTypeEm"],
+  ["RAW_VALUE_GET_HASH_VALUE_FAST_HASH32",
+   "_ZN6impala8RawValue22GetHashValueFastHash32EPKvRKNS_10ColumnTypeEj"],
   ["TOPN_NODE_INSERT_BATCH",
    "_ZN6impala8TopNNode11InsertBatchEPNS_8RowBatchE"],
   ["MEMPOOL_ALLOCATE",
@@ -217,8 +217,7 @@ ir_functions = [
    "_ZN6impala5Tuple11CopyStringsEPKcPNS_12RuntimeStateEPKNS_11SlotOffsetsEiPNS_7MemPoolEPNS_6StatusE"],
   ["UNION_MATERIALIZE_BATCH",
   "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"],
-  ["BLOOM_FILTER_INSERT_NO_AVX2", "_ZN6impala11BloomFilter12InsertNoAvx2Ej"],
-  ["BLOOM_FILTER_INSERT_AVX2", "_ZN6impala11BloomFilter10InsertAvx2Ej"],
+  ["BLOOM_FILTER_INSERT", "_ZN6impala11BloomFilter8IrInsertEj"],
   ["SELECT_NODE_COPY_ROWS", "_ZN6impala10SelectNode8CopyRowsEPNS_8RowBatchE"],
   ["BOOL_MIN_MAX_FILTER_INSERT", "_ZN6impala16BoolMinMaxFilter6InsertEPKv"],
   ["TINYINT_MIN_MAX_FILTER_INSERT", "_ZN6impala19TinyIntMinMaxFilter6InsertEPKv"],
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 32d78cc..28af515 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -82,7 +82,7 @@ void FilterContext::Insert(TupleRow* row) const noexcept {
   if (filter->is_bloom_filter()) {
     if (local_bloom_filter == nullptr) return;
     void* val = expr_eval->GetValue(row);
-    uint32_t filter_hash = RawValue::GetHashValue(
+    uint32_t filter_hash = RawValue::GetHashValueFastHash32(
         val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
     local_bloom_filter->Insert(filter_hash);
   } else {
@@ -107,14 +107,14 @@ void FilterContext::MaterializeValues() const {
 //
 // ; Function Attrs: alwaysinline
 // define i1 @FilterContextEval(%"struct.impala::FilterContext"* %this,
-//                              %"class.impala::TupleRow"* %row) #34 {
+//                              %"class.impala::TupleRow"* %row) #41 {
 // entry:
 //   %0 = alloca i16
 //   %expr_eval_ptr = getelementptr inbounds %"struct.impala::FilterContext",
 //       %"struct.impala::FilterContext"* %this, i32 0, i32 0
-//   %expr_eval_arg = load %"class.impala::ExprContext"*,
-//       %"class.impala::ExprContext"** %expr_eval_ptr
-//   %result = call i32 @GetSlotRef(%"class.impala::ExprContext"* %expr_eval_arg,
+//   %expr_eval_arg = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %expr_eval_ptr
+//   %result = call i32 @GetSlotRef(%"class.impala::ScalarExprEvaluator"* %expr_eval_arg,
 //       %"class.impala::TupleRow"* %row)
 //   %is_null1 = trunc i32 %result to i1
 //   br i1 %is_null1, label %is_null, label %not_null
@@ -135,7 +135,7 @@ void FilterContext::MaterializeValues() const {
 //       %"struct.impala::FilterContext"* %this, i32 0, i32 1
 //   %filter_arg = load %"class.impala::RuntimeFilter"*,
 //       %"class.impala::RuntimeFilter"** %filter_ptr
-//   %passed_filter = call i1 @_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE.3(
+//   %passed_filter = call i1 @_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE(
 //       %"class.impala::RuntimeFilter"* %filter_arg, i8* %val_ptr_phi,
 //       %"struct.impala::ColumnType"* @expr_type_arg)
 //   ret i1 %passed_filter
@@ -221,7 +221,7 @@ Status FilterContext::CodegenEval(
   builder.CreateRet(passed_filter);
 
   *fn = codegen->FinalizeFunction(eval_filter_fn);
-  if (*fn == NULL) {
+  if (*fn == nullptr) {
     return Status("Codegen'ed FilterContext::Eval() fails verification, see log");
   }
   return Status::OK();
@@ -234,7 +234,7 @@ Status FilterContext::CodegenEval(
 //     %"class.std::vector.101" zeroinitializer }
 //
 // define void @FilterContextInsert(%"struct.impala::FilterContext"* %this,
-//     %"class.impala::TupleRow"* %row) #37 {
+//     %"class.impala::TupleRow"* %row) #47 {
 // entry:
 //   %0 = alloca i16
 //   %local_bloom_filter_ptr = getelementptr inbounds %"struct.impala::FilterContext",
@@ -249,7 +249,7 @@ Status FilterContext::CodegenEval(
 //       %"struct.impala::FilterContext"* %this, i32 0, i32 0
 //   %expr_eval_arg = load %"class.impala::ScalarExprEvaluator"*,
 //       %"class.impala::ScalarExprEvaluator"** %expr_eval_ptr
-//   %result = call i32 @GetSlotRef.46(
+//   %result = call i32 @GetSlotRef.26(
 //       %"class.impala::ScalarExprEvaluator"* %expr_eval_arg,
 //       %"class.impala::TupleRow"* %row)
 //   %is_null = trunc i32 %result to i1
@@ -270,9 +270,10 @@ Status FilterContext::CodegenEval(
 //
 // insert_filter:                                    ; preds = %val_not_null, %val_is_null
 //   %val_ptr_phi = phi i8* [ %native_ptr, %val_not_null ], [ null, %val_is_null ]
-//   %hash_value = call i32 @_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj(
-//       i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg, i32 1234)
-//   call void @_ZN6impala11BloomFilter10InsertAvx2Ej(
+//   %hash_value = call i32
+//       @_ZN6impala8RawValue22GetHashValueFastHash32EPKvRKNS_10ColumnTypeEj(
+//       i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg.29, i32 1234)
+//   call void @_ZN6impala11BloomFilter8IrInsertEj(
 //       %"class.impala::BloomFilter"* %local_bloom_filter_arg, i32 %hash_value)
 //   ret void
 // }
@@ -382,20 +383,15 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
         codegen->GetI32Constant(RuntimeFilterBank::DefaultHashSeed());
     llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
     llvm::Function* get_hash_value_fn =
-        codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);
+        codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE_FAST_HASH32, false);
     DCHECK(get_hash_value_fn != nullptr);
     llvm::Value* hash_value =
         builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_value");
 
     // Call Insert() on the bloom filter.
-    llvm::Function* insert_bloom_filter_fn;
-    if (LlvmCodeGen::IsCPUFeatureEnabled(CpuInfo::AVX2)) {
-      insert_bloom_filter_fn =
-          codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
-    } else {
-      insert_bloom_filter_fn =
-          codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_NO_AVX2, false);
-    }
+    llvm::Function* insert_bloom_filter_fn =
+        codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT, false);
+
     DCHECK(insert_bloom_filter_fn != nullptr);
 
     llvm::Value* insert_args[] = {local_filter_arg, hash_value};
@@ -414,7 +410,7 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
   builder.CreateRetVoid();
 
   *fn = codegen->FinalizeFunction(insert_filter_fn);
-  if (*fn == NULL) {
+  if (*fn == nullptr) {
     return Status("Codegen'ed FilterContext::Insert() fails verification, see log");
   }
   return Status::OK();
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index f2e9eea..bf72c48 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -25,20 +25,23 @@
 
 #include "exec/exec-node.inline.h"
 #include "exec/kudu-util.h"
-#include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
 #include "exprs/slot-ref.h"
+#include "gutil/gscoped_ptr.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/util/block_bloom_filter.h"
+#include "kudu/util/slice.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
 #include "runtime/string-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tuple-row.h"
-#include "gutil/gscoped_ptr.h"
-#include "gutil/strings/substitute.h"
+#include "util/bloom-filter.h"
 #include "util/debug-util.h"
 #include "util/jni-util.h"
 #include "util/min-max-filter.h"
@@ -91,7 +94,7 @@ Status KuduScanner::Open() {
 }
 
 void KuduScanner::KeepKuduScannerAlive() {
-  if (scanner_ == NULL) return;
+  if (scanner_ == nullptr) return;
   int64_t now = MonotonicMicros();
   int64_t keepalive_us = FLAGS_kudu_scanner_keep_alive_period_sec * 1e6;
   if (now < last_alive_time_micros_ + keepalive_us) {
@@ -182,7 +185,7 @@ void KuduScanner::Close() {
 }
 
 Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
-  DCHECK(scanner_ == NULL);
+  DCHECK(scanner_ == nullptr);
   kudu::client::KuduScanner* scanner;
   KUDU_RETURN_IF_ERROR(kudu::client::KuduScanToken::DeserializeIntoScanner(
                            scan_node_->kudu_client(), scan_token, &scanner),
@@ -221,56 +224,75 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
 
   if (scan_node_->filter_ctxs_.size() > 0) {
     for (const FilterContext& ctx : scan_node_->filter_ctxs_) {
-      MinMaxFilter* filter = ctx.filter->get_min_max();
-      if (filter != nullptr && !filter->AlwaysTrue()) {
-        if (filter->AlwaysFalse()) {
-          // We can skip this entire scan.
-          CloseCurrentClientScanner();
-          *eos = true;
-          return Status::OK();
-        } else {
-          auto it = ctx.filter->filter_desc().planid_to_target_ndx.find(scan_node_->id());
-          const TRuntimeFilterTargetDesc& target_desc =
-              ctx.filter->filter_desc().targets[it->second];
-          const string& col_name = target_desc.kudu_col_name;
-          DCHECK(col_name != "");
-          const ColumnType& col_type = ColumnType::FromThrift(target_desc.kudu_col_type);
-
-          const void* min = filter->GetMin();
-          const void* max = filter->GetMax();
-          // If the type of the filter is not the same as the type of the target column,
-          // there must be an implicit integer cast and we need to ensure the min/max we
-          // pass to Kudu are within the range of the target column.
-          int64_t int_min;
-          int64_t int_max;
-          if (col_type.type != filter->type()) {
-            DCHECK(col_type.IsIntegerType());
-
-            if (!filter->GetCastIntMinMax(col_type, &int_min, &int_max)) {
-              // The min/max for this filter is outside the range for the target column,
-              // so all rows are filtered out and we can skip the scan.
-              CloseCurrentClientScanner();
-              *eos = true;
-              return Status::OK();
-            }
-            min = &int_min;
-            max = &int_max;
-          }
+      if (!ctx.filter->HasFilter() || ctx.filter->AlwaysTrue()) {
+        // If it's always true, the filter won't actually remove any rows so we
+        // don't need to push it down to Kudu.
+        continue;
+      } else if (ctx.filter->AlwaysFalse()) {
+        // We can skip this entire scan if it's always false.
+        CloseCurrentClientScanner();
+        *eos = true;
+        return Status::OK();
+      }
+
+      auto it = ctx.filter->filter_desc().planid_to_target_ndx.find(scan_node_->id());
+      const TRuntimeFilterTargetDesc& target_desc =
+          ctx.filter->filter_desc().targets[it->second];
+      const string& col_name = target_desc.kudu_col_name;
+      DCHECK(col_name != "");
+
+      if (ctx.filter->is_bloom_filter()) {
+        BloomFilter* filter = ctx.filter->get_bloom_filter();
+        DCHECK(filter != nullptr);
 
-          KuduValue* min_value;
-          RETURN_IF_ERROR(CreateKuduValue(col_type, min, &min_value));
-          KUDU_RETURN_IF_ERROR(
-              scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
-                  col_name, KuduPredicate::ComparisonOp::GREATER_EQUAL, min_value)),
-              BuildErrorString("Failed to add min predicate"));
-
-          KuduValue* max_value;
-          RETURN_IF_ERROR(CreateKuduValue(col_type, max, &max_value));
-          KUDU_RETURN_IF_ERROR(
-              scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
-                  col_name, KuduPredicate::ComparisonOp::LESS_EQUAL, max_value)),
-              BuildErrorString("Failed to add max predicate"));
+        kudu::BlockBloomFilter* bbf = filter->GetBlockBloomFilter();
+        vector<kudu::Slice> bbf_vec = {
+            kudu::Slice(reinterpret_cast<const uint8_t*>(bbf), sizeof(*bbf))};
+
+        KUDU_RETURN_IF_ERROR(
+            scanner_->AddConjunctPredicate(
+                scan_node_->table_->NewInBloomFilterPredicate(col_name, bbf_vec)),
+            BuildErrorString("Failed to add bloom filter predicate"));
+      } else {
+        DCHECK(ctx.filter->is_min_max_filter());
+        MinMaxFilter* filter = ctx.filter->get_min_max();
+        DCHECK(filter != nullptr);
+
+        const void* min = filter->GetMin();
+        const void* max = filter->GetMax();
+        // If the type of the filter is not the same as the type of the target column,
+        // there must be an implicit integer cast and we need to ensure the min/max we
+        // pass to Kudu are within the range of the target column.
+        int64_t int_min;
+        int64_t int_max;
+        const ColumnType& col_type = ColumnType::FromThrift(target_desc.kudu_col_type);
+        if (col_type.type != filter->type()) {
+          DCHECK(col_type.IsIntegerType());
+
+          if (!filter->GetCastIntMinMax(col_type, &int_min, &int_max)) {
+            // The min/max for this filter is outside the range for the target column,
+            // so all rows are filtered out and we can skip the scan.
+            CloseCurrentClientScanner();
+            *eos = true;
+            return Status::OK();
+          }
+          min = &int_min;
+          max = &int_max;
         }
+
+        KuduValue* min_value;
+        RETURN_IF_ERROR(CreateKuduValue(col_type, min, &min_value));
+        KUDU_RETURN_IF_ERROR(
+            scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+                col_name, KuduPredicate::ComparisonOp::GREATER_EQUAL, min_value)),
+            BuildErrorString("Failed to add min predicate"));
+
+        KuduValue* max_value;
+        RETURN_IF_ERROR(CreateKuduValue(col_type, max, &max_value));
+        KUDU_RETURN_IF_ERROR(
+            scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+                col_name, KuduPredicate::ComparisonOp::LESS_EQUAL, max_value)),
+            BuildErrorString("Failed to add max predicate"));
       }
     }
   }
@@ -352,7 +374,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
           kudu_tuple->GetSlot(slot->tuple_offset()));
       TimestampValue tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
       if (tv.HasDateAndTime()) {
-        RawValue::Write(&tv, kudu_tuple, slot, NULL);
+        RawValue::Write(&tv, kudu_tuple, slot, nullptr);
       } else {
         kudu_tuple->SetNull(slot->null_indicator_offset());
         RETURN_IF_ERROR(state_->LogOrReturnError(
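
The reworked loop in KuduScanner::OpenNextScanToken() above first decides, per
filter, whether to skip it, end the scan, or push a predicate, and only then
branches on the filter type. The sketch below models that control flow as a
pure function, together with one plausible reading of the integer range check
performed before min/max values are handed to Kudu. All names here (FilterState,
DecidePushdown, ClampToColumnRange) are hypothetical, and the clamping behavior
is an assumption based on the comments in the diff, not a copy of
MinMaxFilter::GetCastIntMinMax().

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>

enum class FilterKind { kBloom, kMinMax };

enum class PushdownAction {
  kSkipFilter,            // Filter removes no rows (or has not arrived): do nothing.
  kEndScan,               // Filter rejects every row: the scan can stop immediately.
  kPushBloomPredicate,    // Add an "in bloom filter" predicate to the scanner.
  kPushMinMaxPredicates,  // Add >= min and <= max comparison predicates.
};

// Hypothetical summary of the runtime filter state the loop inspects.
struct FilterState {
  bool has_filter;
  bool always_true;
  bool always_false;
  FilterKind kind;
};

// Mirrors the order of checks in the diff: skip first, then end-of-scan,
// then dispatch on the filter type.
PushdownAction DecidePushdown(const FilterState& f) {
  if (!f.has_filter || f.always_true) return PushdownAction::kSkipFilter;
  if (f.always_false) return PushdownAction::kEndScan;
  return f.kind == FilterKind::kBloom ? PushdownAction::kPushBloomPredicate
                                      : PushdownAction::kPushMinMaxPredicates;
}

// Assumed semantics of the implicit integer cast check: if [*min, *max] does
// not overlap the range of the narrower column type ColT, return false (no row
// can match); otherwise clamp the bounds into ColT's range and return true.
template <typename ColT>
bool ClampToColumnRange(int64_t* min, int64_t* max) {
  const int64_t lo = std::numeric_limits<ColT>::min();
  const int64_t hi = std::numeric_limits<ColT>::max();
  if (*max < lo || *min > hi) return false;
  *min = std::max(*min, lo);
  *max = std::min(*max, hi);
  return true;
}
```

Under these assumptions, a BIGINT filter spanning [-1000, 50] applied to a
TINYINT column would be pushed as [-128, 50], while a filter spanning
[200, 300] would end the scan because no TINYINT value can fall in that range.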
diff --git a/be/src/runtime/raw-value-ir.cc b/be/src/runtime/raw-value-ir.cc
index 916d96c..50500b3 100644
--- a/be/src/runtime/raw-value-ir.cc
+++ b/be/src/runtime/raw-value-ir.cc
@@ -176,27 +176,68 @@ uint32_t IR_ALWAYS_INLINE RawValue::GetHashValue(
 uint64_t IR_ALWAYS_INLINE RawValue::GetHashValueFastHash(const void* v,
     const ColumnType& type, uint64_t seed) {
   // Hash with an arbitrary constant to ensure we don't return seed.
-  if (v == nullptr) {
+  if (UNLIKELY(v == nullptr)) {
     return HashUtil::FastHash64(&HASH_VAL_NULL, sizeof(HASH_VAL_NULL), seed);
   }
   switch (type.type) {
+    case TYPE_CHAR:
     case TYPE_STRING:
-    case TYPE_VARCHAR: {
-      const StringValue* string_value = reinterpret_cast<const StringValue*>(v);
-      return HashUtil::FastHash64(string_value->ptr,
-          static_cast<size_t>(string_value->len), seed);
-    }
-    case TYPE_BOOLEAN: return HashUtil::FastHash64(v, 1, seed);
-    case TYPE_TINYINT: return HashUtil::FastHash64(v, 1, seed);
-    case TYPE_SMALLINT: return HashUtil::FastHash64(v, 2, seed);
-    case TYPE_INT: return HashUtil::FastHash64(v, 4, seed);
-    case TYPE_BIGINT: return HashUtil::FastHash64(v, 8, seed);
-    case TYPE_FLOAT: return HashUtil::FastHash64(v, 4, seed);
-    case TYPE_DOUBLE: return HashUtil::FastHash64(v, 8, seed);
-    case TYPE_TIMESTAMP: return HashUtil::FastHash64(v, 12, seed);
-    case TYPE_CHAR: return HashUtil::FastHash64(v, type.len, seed);
-    case TYPE_DECIMAL: return HashUtil::FastHash64(v, type.GetByteSize(), seed);
-    case TYPE_DATE: return HashUtil::FastHash64(v, 4, seed);
-    default: DCHECK(false); return 0;
+    case TYPE_VARCHAR:
+      return RawValue::GetHashValueFastHashNonNull<impala::StringValue>(
+          reinterpret_cast<const StringValue*>(v), type, seed);
+    case TYPE_BOOLEAN:
+      return RawValue::GetHashValueFastHashNonNull<bool>(
+          reinterpret_cast<const bool*>(v), type, seed);
+    case TYPE_TINYINT:
+      return RawValue::GetHashValueFastHashNonNull<int8_t>(
+          reinterpret_cast<const int8_t*>(v), type, seed);
+    case TYPE_SMALLINT:
+      return RawValue::GetHashValueFastHashNonNull<int16_t>(
+          reinterpret_cast<const int16_t*>(v), type, seed);
+    case TYPE_INT:
+      return RawValue::GetHashValueFastHashNonNull<int32_t>(
+          reinterpret_cast<const int32_t*>(v), type, seed);
+    case TYPE_DATE:
+      return RawValue::GetHashValueFastHashNonNull<DateValue>(
+          reinterpret_cast<const DateValue*>(v), type, seed);
+    case TYPE_BIGINT:
+      return RawValue::GetHashValueFastHashNonNull<int64_t>(
+          reinterpret_cast<const int64_t*>(v), type, seed);
+    case TYPE_FLOAT:
+      return RawValue::GetHashValueFastHashNonNull<float>(
+          reinterpret_cast<const float*>(v), type, seed);
+    case TYPE_DOUBLE:
+      return RawValue::GetHashValueFastHashNonNull<double>(
+          reinterpret_cast<const double*>(v), type, seed);
+    case TYPE_TIMESTAMP:
+      return RawValue::GetHashValueFastHashNonNull<TimestampValue>(
+          reinterpret_cast<const TimestampValue*>(v), type, seed);
+    case TYPE_DECIMAL:
+      switch (type.GetByteSize()) {
+        case 4:
+          return RawValue::GetHashValueFastHashNonNull<Decimal4Value>(
+              reinterpret_cast<const impala::Decimal4Value*>(v), type, seed);
+        case 8:
+          return RawValue::GetHashValueFastHashNonNull<Decimal8Value>(
+              reinterpret_cast<const Decimal8Value*>(v), type, seed);
+        case 16:
+          return RawValue::GetHashValueFastHashNonNull<Decimal16Value>(
+              reinterpret_cast<const Decimal16Value*>(v), type, seed);
+        default:
+          DCHECK(false);
+          return 0;
+      }
+    default:
+      DCHECK(false);
+      return 0;
   }
 }
+
+uint32_t IR_ALWAYS_INLINE RawValue::GetHashValueFastHash32(
+    const void* v, const ColumnType& type, uint32_t seed) noexcept {
+  // The following trick converts the 64-bit hash code to a Fermat
+  // residue, which retains information from both the upper and
+  // lower halves of the hash code.
+  uint64_t h = GetHashValueFastHash(v, type, seed);
+  return h - (h >> 32);
+}
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index b036ace..81250bf 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -83,6 +83,23 @@ class RawValue {
   static uint64_t GetHashValueFastHash(const void* v, const ColumnType& type,
       uint64_t seed);
 
+  /// Templatized version of GetHashValueFastHash; use if the type is known in advance.
+  /// GetHashValueFastHash handles nulls. Inlined in IR so that the constant
+  /// 'type' can be propagated.
+  template <typename T>
+  static inline uint64_t IR_ALWAYS_INLINE GetHashValueFastHash(
+      const T* v, const ColumnType& type, uint64_t seed);
+
+  /// Returns hash value for non-nullable 'v' for type T. GetHashValueFastHashNonNull
+  /// doesn't handle nulls.
+  template <typename T>
+  static inline uint64_t GetHashValueFastHashNonNull(
+      const T* v, const ColumnType& type, uint64_t seed);
+
+  // Get a 32-bit hash value using the FastHash algorithm.
+  static uint32_t IR_ALWAYS_INLINE GetHashValueFastHash32(
+      const void* v, const ColumnType& type, uint32_t seed = 0) noexcept;
+
   /// Compares both values.
   /// Return value is < 0  if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2.
   /// Inlined in IR so that the constant 'type' can be propagated.
diff --git a/be/src/runtime/raw-value.inline.h b/be/src/runtime/raw-value.inline.h
index a9fc3b3..f6b6e45 100644
--- a/be/src/runtime/raw-value.inline.h
+++ b/be/src/runtime/raw-value.inline.h
@@ -279,6 +279,131 @@ inline uint32_t RawValue::GetHashValue(const T* v, const ColumnType& type,
   return RawValue::GetHashValueNonNull<T>(v, type, seed);
 }
 
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<bool>(
+    const bool* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_BOOLEAN);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 1, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<int8_t>(
+    const int8_t* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_TINYINT);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 1, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<int16_t>(
+    const int16_t* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_SMALLINT);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 2, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<int32_t>(
+    const int32_t* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_INT);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 4, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<int64_t>(
+    const int64_t* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_BIGINT);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 8, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<float>(
+    const float* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_FLOAT);
+  DCHECK(v != NULL);
+  if (std::isnan(*v)) v = &RawValue::CANONICAL_FLOAT_NAN;
+  return HashUtil::FastHash64(v, 4, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<double>(
+    const double* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_DOUBLE);
+  DCHECK(v != NULL);
+  if (std::isnan(*v)) v = &RawValue::CANONICAL_DOUBLE_NAN;
+  return HashUtil::FastHash64(v, 8, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<impala::StringValue>(
+    const impala::StringValue* v, const ColumnType& type, uint64_t seed) {
+  DCHECK(v != NULL);
+  if (type.type == TYPE_CHAR) {
+    // This is an inlined CHAR(n) slot.
+    // TODO: this is a bit wonky since it's not really a StringValue*. Handle CHAR(n)
+    // in a separate function.
+    return HashUtil::FastHash64(v, type.len, seed);
+  } else {
+    DCHECK(type.type == TYPE_STRING || type.type == TYPE_VARCHAR);
+    if (v->len == 0) {
+      return HashUtil::FastHash64(&HASH_VAL_EMPTY, sizeof(HASH_VAL_EMPTY), seed);
+    }
+    return HashUtil::FastHash64(v->ptr, static_cast<size_t>(v->len), seed);
+  }
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<TimestampValue>(
+    const TimestampValue* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_TIMESTAMP);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 12, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<DateValue>(
+    const DateValue* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_DATE);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 4, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<Decimal4Value>(
+    const Decimal4Value* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_DECIMAL);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 4, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<Decimal8Value>(
+    const Decimal8Value* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_DECIMAL);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 8, seed);
+}
+
+template <>
+inline uint64_t RawValue::GetHashValueFastHashNonNull<Decimal16Value>(
+    const Decimal16Value* v, const ColumnType& type, uint64_t seed) {
+  DCHECK_EQ(type.type, TYPE_DECIMAL);
+  DCHECK(v != NULL);
+  return HashUtil::FastHash64(v, 16, seed);
+}
+
+template <typename T>
+inline uint64_t RawValue::GetHashValueFastHash(
+    const T* v, const ColumnType& type, uint64_t seed) {
+  // Hash with an arbitrary constant to ensure we don't return seed.
+  if (UNLIKELY(v == NULL)) {
+    return HashUtil::FastHash64(&HASH_VAL_NULL, sizeof(HASH_VAL_NULL), seed);
+  }
+  return RawValue::GetHashValueFastHashNonNull<T>(v, type, seed);
+}
 }
 
 #endif
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 00ca877..f199ffd 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -342,8 +342,8 @@ void RuntimeFilterBank::PublishGlobalFilter(
       }
 
       if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
-        Status status = bloom_filter->Init(
-            params.bloom_filter(), sidecar_slice.data(), sidecar_slice.size());
+        Status status = bloom_filter->Init(params.bloom_filter(), sidecar_slice.data(),
+            sidecar_slice.size(), DefaultHashSeed());
         if (!status.ok()) {
           LOG(ERROR) << "Unable to allocate memory for bloom filter: "
                      << status.GetDetail();
@@ -383,7 +383,7 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
       << "BufferPool Client should have enough reservation to fulfill bloom filter "
          "allocation";
   BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
-  Status status = bloom_filter->Init(log_filter_size);
+  Status status = bloom_filter->Init(log_filter_size, DefaultHashSeed());
   if (!status.ok()) {
     LOG(ERROR) << "Unable to allocate memory for bloom filter: " << status.GetDetail();
     return nullptr;
diff --git a/be/src/runtime/runtime-filter-ir.cc b/be/src/runtime/runtime-filter-ir.cc
index 6436213..dbf58fb 100644
--- a/be/src/runtime/runtime-filter-ir.cc
+++ b/be/src/runtime/runtime-filter-ir.cc
@@ -23,7 +23,7 @@ bool IR_ALWAYS_INLINE RuntimeFilter::Eval(
     void* val, const ColumnType& col_type) const noexcept {
   DCHECK(is_bloom_filter());
   if (bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER) return true;
-  uint32_t h = RawValue::GetHashValue(val, col_type,
-      RuntimeFilterBank::DefaultHashSeed());
+  uint32_t h = RawValue::GetHashValueFastHash32(
+      val, col_type, RuntimeFilterBank::DefaultHashSeed());
   return bloom_filter_.Load()->Find(h);
 }
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index e2a4a67..8f494cf 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -64,6 +64,7 @@ class RuntimeFilter {
     return filter_desc().type == TRuntimeFilterType::MIN_MAX;
   }
 
+  BloomFilter* get_bloom_filter() const { return bloom_filter_.Load(); }
   MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
 
   /// Sets the internal filter bloom_filter to 'bloom_filter' or 'min_max_filter'
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 9720dc4..83b7833 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -226,6 +226,10 @@ TEST(QueryOptions, SetEnumOptions) {
       (OFF, LOCAL, GLOBAL)), true);
   TestEnumCase(options, CASE(kudu_read_mode, TKuduReadMode,
       (DEFAULT, READ_LATEST, READ_AT_SNAPSHOT)), true);
+  TestEnumCase(options,
+      CASE(enabled_runtime_filter_types, TEnabledRuntimeFilterTypes,
+          (BLOOM, MIN_MAX, ALL)),
+      true);
 #undef CASE
 #undef ENTRIES
 #undef ENTRY
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 510c694..9880ff7 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -898,6 +898,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_kudu_snapshot_read_timestamp_micros(timestamp);
         break;
       }
+      case TImpalaQueryOptions::ENABLED_RUNTIME_FILTER_TYPES: {
+        // Parse the enabled runtime filter types and validate it.
+        TEnabledRuntimeFilterTypes::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "enabled runtime filter types",
+            _TEnabledRuntimeFilterTypes_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_enabled_runtime_filter_types(enum_type);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index dd35a67..196bb20 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::RETRY_FAILED_QUERIES + 1);\
+      TImpalaQueryOptions::ENABLED_RUNTIME_FILTER_TYPES + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -199,7 +199,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(kudu_snapshot_read_timestamp_micros, KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS,\
       TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(retry_failed_queries, RETRY_FAILED_QUERIES, TQueryOptionLevel::REGULAR)
+  QUERY_OPT_FN(retry_failed_queries, RETRY_FAILED_QUERIES, TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(enabled_runtime_filter_types, ENABLED_RUNTIME_FILTER_TYPES,\
+      TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/bloom-filter-ir.cc b/be/src/util/bloom-filter-ir.cc
index 0042c44..d2376a7 100644
--- a/be/src/util/bloom-filter-ir.cc
+++ b/be/src/util/bloom-filter-ir.cc
@@ -16,18 +16,9 @@
 // under the License.
 
 #include "util/bloom-filter.h"
-#include "util/hash-util.h"
 
 using namespace impala;
 
-void BloomFilter::InsertNoAvx2(const uint32_t hash) noexcept {
-  always_false_ = false;
-  const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
-  BucketInsert(bucket_idx, hash);
-}
-
-void BloomFilter::InsertAvx2(const uint32_t hash) noexcept {
-  always_false_ = false;
-  const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
-  BucketInsertAVX2(bucket_idx, hash);
+void IR_ALWAYS_INLINE BloomFilter::IrInsert(const uint32_t hash) noexcept {
+  block_bloom_filter_.Insert(hash);
 }
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index 26d972b..7f97ac8 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -31,6 +31,9 @@
 
 #include "gen-cpp/data_stream_service.pb.h"
 
+// This flag is used in Kudu to temporarily disable AVX2 support for testing purposes.
+DECLARE_bool(disable_blockbloomfilter_avx2);
+
 using namespace std;
 
 using namespace impala;
@@ -47,57 +50,53 @@ uint64_t MakeRand() {
   return result;
 }
 
-// BfInsert() and BfFind() are like BloomFilter::{Insert,Find}, except they randomly
-// disable AVX2 instructions half of the time. These are used for testing that AVX2
-// machines and non-AVX2 machines produce compatible BloomFilters.
+// BfInsert() and BfFind() call BloomFilter::{Insert,Find} respectively.
 
 void BfInsert(BloomFilter& bf, uint32_t h) {
-  if (MakeRand() & 0x1) {
-    bf.Insert(h);
-  } else {
-    CpuInfo::TempDisable t1(CpuInfo::AVX2);
-    bf.Insert(h);
-  }
+  bf.Insert(h);
 }
 
 bool BfFind(BloomFilter& bf, uint32_t h) {
-  if (MakeRand() & 0x1) {
-    return bf.Find(h);
-  } else {
-    CpuInfo::TempDisable t1(CpuInfo::AVX2);
-    return bf.Find(h);
-  }
+  return bf.Find(h);
 }
 
 // Computes union of 'x' and 'y'. Computes twice with AVX enabled and disabled and
 // verifies both produce the same result. 'success' is set to true if both union
 // computations returned the same result and set to false otherwise.
-void BfUnion(const BloomFilter& x, const BloomFilter& y, int64_t directory_size,
-    bool* success, BloomFilterPB* protobuf, std::string* directory) {
+void BfUnion(BloomFilter& x, BloomFilter& y, int64_t directory_size, bool* success,
+    BloomFilterPB* protobuf, std::string* directory) {
   BloomFilterPB protobuf_x, protobuf_y;
   RpcController controller_x;
   RpcController controller_y;
   BloomFilter::ToProtobuf(&x, &controller_x, &protobuf_x);
   BloomFilter::ToProtobuf(&y, &controller_y, &protobuf_y);
 
-  string directory_x(reinterpret_cast<const char*>(x.directory_), directory_size);
-  string directory_y(reinterpret_cast<const char*>(y.directory_), directory_size);
+  FLAGS_disable_blockbloomfilter_avx2 = false;
+  string directory_x(
+      reinterpret_cast<const char*>(x.GetBlockBloomFilter()->directory().data()),
+      directory_size);
+  string directory_y(
+      reinterpret_cast<const char*>(y.GetBlockBloomFilter()->directory().data()),
+      directory_size);
 
   BloomFilter::Or(protobuf_x, reinterpret_cast<const uint8_t*>(directory_x.data()),
       &protobuf_y, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y.data())),
       directory_size);
 
   {
-    CpuInfo::TempDisable t1(CpuInfo::AVX);
-    CpuInfo::TempDisable t2(CpuInfo::AVX2);
+    FLAGS_disable_blockbloomfilter_avx2 = true;
     BloomFilterPB protobuf_x2, protobuf_y2;
     RpcController controller_x2;
     RpcController controller_y2;
     BloomFilter::ToProtobuf(&x, &controller_x2, &protobuf_x2);
     BloomFilter::ToProtobuf(&y, &controller_y2, &protobuf_y2);
 
-    string directory_x2(reinterpret_cast<const char*>(x.directory_), directory_size);
-    string directory_y2(reinterpret_cast<const char*>(y.directory_), directory_size);
+    string directory_x2(
+        reinterpret_cast<const char*>(x.GetBlockBloomFilter()->directory().data()),
+        directory_size);
+    string directory_y2(
+        reinterpret_cast<const char*>(y.GetBlockBloomFilter()->directory().data()),
+        directory_size);
 
     BloomFilter::Or(protobuf_x2, reinterpret_cast<const uint8_t*>(directory_x2.data()),
         &protobuf_y2, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y2.data())),
@@ -224,8 +223,11 @@ class BloomFilterTest : public testing::Test {
   BloomFilter* CreateBloomFilter(int log_bufferpool_space) {
     int64_t filter_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_space);
     EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
+    // Randomly disable AVX2 instructions half of the time to test that AVX2 machines
+    // and non-AVX2 machines produce compatible BloomFilters.
+    FLAGS_disable_blockbloomfilter_avx2 = (MakeRand() & 0x1) == 0;
     BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
-    EXPECT_OK(bloom_filter->Init(log_bufferpool_space));
+    EXPECT_OK(bloom_filter->Init(log_bufferpool_space, 0));
     bloom_filters_.push_back(bloom_filter);
     EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
     return bloom_filter;
@@ -235,10 +237,13 @@ class BloomFilterTest : public testing::Test {
     int64_t filter_size =
         BloomFilter::GetExpectedMemoryUsed(filter_pb.log_bufferpool_space());
     EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
+    // Randomly disable AVX2 instructions half of the time to test that AVX2 machines
+    // and non-AVX2 machines produce compatible BloomFilters.
+    FLAGS_disable_blockbloomfilter_avx2 = (MakeRand() & 0x1) == 0;
     BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
 
-    EXPECT_OK(bloom_filter->Init(
-        filter_pb, reinterpret_cast<const uint8_t*>(directory.data()), directory.size()));
+    EXPECT_OK(bloom_filter->Init(filter_pb,
+        reinterpret_cast<const uint8_t*>(directory.data()), directory.size(), 0));
 
     bloom_filters_.push_back(bloom_filter);
     EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
@@ -248,7 +253,10 @@ class BloomFilterTest : public testing::Test {
 
 // We can construct (and destruct) Bloom filters with different spaces.
 TEST_F(BloomFilterTest, Constructor) {
-  for (int i = 1; i < 30; ++i) {
+  // The minimum log_bufferpool_space for a Bloom filter is 5, which is defined
+  // as BlockBloomFilter::kLogBucketWordBits in kudu/util/block_bloom_filter.h
+  // and is checked in BlockBloomFilter::GetExpectedMemoryUsed().
+  for (int i = 5; i < 30; ++i) {
     CreateBloomFilter(i);
   }
 }
@@ -357,7 +365,8 @@ TEST_F(BloomFilterTest, Protobuf) {
 
   EXPECT_EQ(to_protobuf.always_true(), false);
 
-  std::string directory(reinterpret_cast<const char*>(bf->directory_),
+  std::string directory(
+      reinterpret_cast<const char*>(bf->GetBlockBloomFilter()->directory().data()),
       BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01)));
 
   BloomFilter* from_protobuf = CreateBloomFilter(to_protobuf, directory);
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index 10f0987..0698349 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -29,63 +29,49 @@
 #include "gen-cpp/data_stream_service.pb.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "runtime/exec-env.h"
+#include "util/kudu-status-util.h"
 
 using namespace std;
 
 namespace impala {
 
-constexpr uint32_t BloomFilter::REHASH[8] __attribute__((aligned(32)));
 constexpr BloomFilter* const BloomFilter::ALWAYS_TRUE_FILTER;
 
 BloomFilter::BloomFilter(BufferPool::ClientHandle* client)
-  : buffer_pool_client_(client) {}
+  : buffer_allocator_(client), block_bloom_filter_(&buffer_allocator_) {}
 
-BloomFilter::~BloomFilter() {
-  DCHECK(directory_ == nullptr)
-      << "Close() should have been called before the object is destroyed.";
-}
+BloomFilter::~BloomFilter() {}
 
-Status BloomFilter::Init(const int log_bufferpool_space) {
-  // Since log_bufferpool_space is in bytes, we need to convert it to the number of tiny
-  // Bloom filters we will use.
-  log_num_buckets_ = std::max(1, log_bufferpool_space - LOG_BUCKET_BYTE_SIZE);
-  // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
-  // that is too large.
-  directory_mask_ = (1ull << std::min(63, log_num_buckets_)) - 1;
-  // Since we use 32 bits in the arguments of Insert() and Find(), log_num_buckets_
-  // must be limited.
-  DCHECK(log_num_buckets_ <= 32) << "Bloom filter too large. log_bufferpool_space: "
-                                 << log_bufferpool_space;
-  const size_t alloc_size = directory_size();
-  BufferPool* buffer_pool_ = ExecEnv::GetInstance()->buffer_pool();
-  Close(); // Ensure that any previously allocated memory for directory_ is released.
-  RETURN_IF_ERROR(
-      buffer_pool_->AllocateBuffer(buffer_pool_client_, alloc_size, &buffer_handle_));
-  directory_ = reinterpret_cast<Bucket*>(buffer_handle_.data());
-  memset(directory_, 0, alloc_size);
+Status BloomFilter::Init(const int log_bufferpool_space, uint32_t hash_seed) {
+  KUDU_RETURN_IF_ERROR(
+      block_bloom_filter_.Init(log_bufferpool_space, kudu::FAST_HASH, hash_seed),
+      "Failed to init Block Bloom Filter");
   return Status::OK();
 }
 
 Status BloomFilter::Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
-    size_t directory_in_size) {
-  RETURN_IF_ERROR(Init(protobuf.log_bufferpool_space()));
-  if (directory_ != nullptr && !protobuf.always_false()) {
-    always_false_ = false;
-    DCHECK_EQ(directory_in_size, directory_size());
-    memcpy(directory_, directory_in, directory_in_size);
+    size_t directory_in_size, uint32_t hash_seed) {
+  if (protobuf.always_false() || directory_in_size == 0) {
+    // The directory size equals 0 only when the filter is always false.
+    KUDU_RETURN_IF_ERROR(block_bloom_filter_.Init(
+                             protobuf.log_bufferpool_space(), kudu::FAST_HASH, hash_seed),
+        "Failed to init Block Bloom Filter");
+  } else {
+    kudu::Slice slice(directory_in, directory_in_size);
+    KUDU_RETURN_IF_ERROR(
+        block_bloom_filter_.InitFromDirectory(
+            protobuf.log_bufferpool_space(), slice, false, kudu::FAST_HASH, hash_seed),
+        "Failed to init Block Bloom Filter");
   }
   return Status::OK();
 }
 
 void BloomFilter::Close() {
-  if (directory_ != nullptr) {
-    BufferPool* buffer_pool_ = ExecEnv::GetInstance()->buffer_pool();
-    buffer_pool_->FreeBuffer(buffer_pool_client_, &buffer_handle_);
-    directory_ = nullptr;
-  }
+  block_bloom_filter_.Close();
 }
 
 void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
@@ -116,22 +102,23 @@ void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
 
 void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
     kudu::rpc::RpcController* controller, const string& directory) {
-      AddDirectorySidecar(rpc_params, controller,
+  AddDirectorySidecar(rpc_params, controller,
       reinterpret_cast<const char*>(&(directory[0])),
       static_cast<unsigned long>(directory.size()));
 }
 
 void BloomFilter::ToProtobuf(
     BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const {
-  protobuf->set_log_bufferpool_space(log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
-  if (always_false_) {
+  protobuf->set_log_bufferpool_space(block_bloom_filter_.log_space_bytes());
+  if (block_bloom_filter_.always_false()) {
     protobuf->set_always_false(true);
     protobuf->set_always_true(false);
     return;
   }
+  kudu::Slice directory = block_bloom_filter_.directory();
   BloomFilter::AddDirectorySidecar(protobuf, controller,
-      reinterpret_cast<const char*>(directory_),
-      static_cast<unsigned long>(directory_size()));
+      reinterpret_cast<const char*>(directory.data()),
+      static_cast<unsigned long>(directory.size()));
 }
 
 void BloomFilter::ToProtobuf(const BloomFilter* filter,
@@ -146,126 +133,17 @@ void BloomFilter::ToProtobuf(const BloomFilter* filter,
   filter->ToProtobuf(protobuf, controller);
 }
 
-// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
-// compile with -fno-strict-aliasing.
-void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
-  // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
-  // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
-  // part of this method.
-  uint32_t new_bucket[8] __attribute__((aligned(16)));
-  for (int i = 0; i < 8; ++i) {
-    // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following Dietzfelbinger.
-    new_bucket[i] =
-        (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS);
-    new_bucket[i] = 1U << new_bucket[i];
-  }
-  for (int i = 0; i < 2; ++i) {
-    __m128i new_bucket_sse =
-        _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i));
-    __m128i* existing_bucket = reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]);
-    *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse);
-  }
-}
-
-__m256i BloomFilter::MakeMask(const uint32_t hash) {
-   const __m256i ones = _mm256_set1_epi32(1);
-   const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS);
-  // Load hash into a YMM register, repeated eight times
-  __m256i hash_data = _mm256_set1_epi32(hash);
-  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
-  // odd constants, then keep the 5 most significant bits from each product.
-  hash_data = _mm256_mullo_epi32(rehash, hash_data);
-  hash_data = _mm256_srli_epi32(hash_data, 27);
-  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
-  return _mm256_sllv_epi32(ones, hash_data);
-}
-
-void BloomFilter::BucketInsertAVX2(
-    const uint32_t bucket_idx, const uint32_t hash) noexcept {
-  const __m256i mask = MakeMask(hash);
-  __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx];
-  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
-  // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
-  // dont have to save them off before using XMM registers.
-  _mm256_zeroupper();
-}
-
-bool BloomFilter::BucketFindAVX2(
-    const uint32_t bucket_idx, const uint32_t hash) const noexcept {
-  const __m256i mask = MakeMask(hash);
-  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
-  // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
-  // takes the negation of its first argument and ands that with its second argument. In
-  // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
-  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
-  const bool result = _mm256_testc_si256(bucket, mask);
-  _mm256_zeroupper();
-  return result;
-}
-
-bool BloomFilter::BucketFind(
-    const uint32_t bucket_idx, const uint32_t hash) const noexcept {
-  for (int i = 0; i < BUCKET_WORDS; ++i) {
-    BucketWord hval =
-        (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS);
-    hval = 1U << hval;
-    if (!(directory_[bucket_idx][i] & hval)) {
-      return false;
-    }
-  }
-  return true;
+int64_t BloomFilter::GetBufferPoolSpaceUsed() {
+  return buffer_allocator_.IsAllocated() ? block_bloom_filter_.GetSpaceUsed() : -1;
 }
 
-namespace {
-// Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' using AVX
-// instructions. 'n' must be a multiple of 32.
-void __attribute__((target("avx")))
-OrEqualArrayAvx(size_t n, const uint8_t* __restrict__ in, uint8_t* __restrict__ out) {
-  constexpr size_t AVX_REGISTER_BYTES = sizeof(__m256d);
-  DCHECK_EQ(n % AVX_REGISTER_BYTES, 0) << "Invalid Bloom Filter directory size";
-  const uint8_t* const in_end = in + n;
-  for (; in != in_end; (in += AVX_REGISTER_BYTES), (out += AVX_REGISTER_BYTES)) {
-    const double* double_in = reinterpret_cast<const double*>(in);
-    double* double_out = reinterpret_cast<double*>(out);
-    _mm256_storeu_pd(double_out,
-        _mm256_or_pd(_mm256_loadu_pd(double_out), _mm256_loadu_pd(double_in)));
-  }
-}
-
-void OrEqualArray(size_t n, const uint8_t* __restrict__ in, uint8_t* __restrict__ out) {
-  // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
-  // written in a way that is very friendly to auto-vectorization. Instead, we manually
-  // vectorize, increasing the speed by up to 56x.
-  //
-  // TODO: Tune gcc flags to auto-vectorize the trivial loop instead of hand-vectorizing
-  // it. This might not be possible.
-  if (CpuInfo::IsSupported(CpuInfo::AVX)) {
-    OrEqualArrayAvx(n, in, out);
-  } else {
-    const __m128i* simd_in = reinterpret_cast<const __m128i*>(in);
-    const __m128i* const simd_in_end = reinterpret_cast<const __m128i*>(in + n);
-    __m128i* simd_out = reinterpret_cast<__m128i*>(out);
-    // in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
-    // == 16, we can do two _mm_or_si128's in each iteration without checking array
-    // bounds.
-    while (simd_in != simd_in_end) {
-      for (int i = 0; i < 2; ++i, ++simd_in, ++simd_out) {
-        _mm_storeu_si128(
-            simd_out, _mm_or_si128(_mm_loadu_si128(simd_out), _mm_loadu_si128(simd_in)));
-      }
-    }
-  }
-}
-} // namespace
-
 void BloomFilter::Or(const BloomFilter& other) {
   DCHECK_NE(this, &other);
   DCHECK_NE(&other, ALWAYS_TRUE_FILTER);
   if (other.AlwaysFalse()) return;
-  DCHECK_EQ(directory_size(), other.directory_size());
-  OrEqualArray(directory_size(), reinterpret_cast<uint8_t*>(other.directory_),
-               reinterpret_cast<uint8_t*>(directory_));
-  always_false_ = false;
+  DCHECK_EQ(
+      block_bloom_filter_.log_space_bytes(), other.block_bloom_filter_.log_space_bytes());
+  block_bloom_filter_.Or(other.block_bloom_filter_);
 }
 
 void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
@@ -279,37 +157,55 @@ void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
   DCHECK(!in.always_true());
   if (in.always_false()) return;
   DCHECK_EQ(in.log_bufferpool_space(), out->log_bufferpool_space());
-  OrEqualArray(directory_size, directory_in, directory_out);
+  kudu::BlockBloomFilter::OrEqualArray(directory_size, directory_in, directory_out);
 }
 
+ImpalaBloomFilterBufferAllocator::ImpalaBloomFilterBufferAllocator()
+  : buffer_pool_client_(nullptr), is_allocated_(false) {
+  // Default constructor, which is defined to support the virtual function Clone().
+  // Impala code should not hit this function.
+  LOG(DFATAL) << "Unsupported code path.";
+}
 
-// The following three methods are derived from
-//
-// fpp = (1 - exp(-BUCKET_WORDS * ndv/space))^BUCKET_WORDS
-//
-// where space is in bits.
+ImpalaBloomFilterBufferAllocator::ImpalaBloomFilterBufferAllocator(
+    BufferPool::ClientHandle* client)
+  : buffer_pool_client_(DCHECK_NOTNULL(client)), is_allocated_(false) {}
 
-size_t BloomFilter::MaxNdv(const int log_bufferpool_space, const double fpp) {
-  DCHECK(log_bufferpool_space > 0 && log_bufferpool_space < 61);
-  DCHECK(0 < fpp && fpp < 1);
-  static const double ik = 1.0 / BUCKET_WORDS;
-  return -1 * ik * (1ull << (log_bufferpool_space + 3)) * log(1 - pow(fpp, ik));
+ImpalaBloomFilterBufferAllocator::~ImpalaBloomFilterBufferAllocator() {
+  if (is_allocated_) {
+    LOG(DFATAL) << "Close() should have been called before the object is destroyed.";
+    Close();
+  }
 }
 
-int BloomFilter::MinLogSpace(const size_t ndv, const double fpp) {
-  static const double k = BUCKET_WORDS;
-  if (0 == ndv) return 0;
-  // m is the number of bits we would need to get the fpp specified
-  const double m = -k * ndv / log(1 - pow(fpp, 1.0 / k));
+void ImpalaBloomFilterBufferAllocator::Close() {
+  if (!is_allocated_) return;
+  DCHECK(buffer_pool_client_ != nullptr);
+  BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
+  buffer_pool->FreeBuffer(buffer_pool_client_, &buffer_handle_);
+  is_allocated_ = false;
+}
+
+kudu::Status ImpalaBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
+  Close(); // Ensure that any previously allocated memory is released.
 
-  // Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
-  return max(0, static_cast<int>(ceil(log2(m / 8))));
+  BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
+  DCHECK(buffer_pool_client_ != nullptr);
+  impala::Status status =
+      buffer_pool->AllocateBuffer(buffer_pool_client_, bytes, &buffer_handle_);
+  if (!status.ok()) {
+    return kudu::Status::RuntimeError(
+        strings::Substitute("BufferPool bad_alloc, bytes: $0", bytes));
+  }
+  *ptr = reinterpret_cast<void*>(buffer_handle_.data());
+  is_allocated_ = true;
+  return kudu::Status::OK();
 }
 
-double BloomFilter::FalsePositiveProb(const size_t ndv, const int log_bufferpool_space) {
-  return pow(1 - exp((-1.0 * static_cast<double>(BUCKET_WORDS) * static_cast<double>(ndv))
-                     / static_cast<double>(1ull << (log_bufferpool_space + 3))),
-      BUCKET_WORDS);
+void ImpalaBloomFilterBufferAllocator::FreeBuffer(void* ptr) {
+  if (ptr == nullptr) return;
+  DCHECK_EQ(ptr, buffer_handle_.data());
+  Close();
 }
 
 } // namespace impala
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index c628e12..ab85377 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -25,10 +25,12 @@
 
 #include <immintrin.h>
 
+#include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "gutil/macros.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
@@ -59,28 +61,55 @@ struct TestData;
 
 namespace impala {
 
-/// A BloomFilter stores sets of items and offers a query operation indicating whether or
-/// not that item is in the set.  BloomFilters use much less space than other compact data
-/// structures, but they are less accurate: for a small percentage of elements, the query
-/// operation incorrectly returns true even when the item is not in the set.
-///
-/// When talking about Bloom filter size, rather than talking about 'size', which might be
-/// ambiguous, we distinguish two different quantities:
-///
-/// 1. Space: the amount of buffer pool memory used
-///
-/// 2. NDV: the number of unique items that have been inserted
-///
-/// BloomFilter is implemented using block Bloom filters from Putze et al.'s "Cache-,
-/// Hash- and Space-Efficient Bloom Filters". The basic idea is to hash the item to a tiny
-/// Bloom filter the size of a single cache line or smaller. This implementation sets 8
-/// bits in each tiny Bloom filter. This provides a false positive rate near optimal for
-/// between 5 and 15 bits per distinct value, which corresponds to false positive
-/// probabilities between 0.1% (for 15 bits) and 10% (for 5 bits).
-///
-/// Our tiny BloomFilters are 32 bytes to take advantage of 32-byte SIMD in newer Intel
-/// machines. 'noexcept' is added to various functions called from the cross-compiled code
-/// so LLVM will not generate exception related code at their call sites.
+// Buffer allocator that allocates and de-allocates memory for the BlockBloomFilter
+// from the buffer pool.
+class ImpalaBloomFilterBufferAllocator : public kudu::BlockBloomFilterBufferAllocatorIf {
+ public:
+  // Default constructor, which is defined to support the virtual function Clone().
+  // It uses kudu::DefaultBlockBloomFilterBufferAllocator to allocate/de-allocate
+  // memory. Since Clone() is only used for internal testing, memory allocation
+  // does not need to be tracked.
+  ImpalaBloomFilterBufferAllocator();
+
+  // Constructor with client handle of the buffer pool, which is created for
+  // runtime filters in runtime-filter-bank.
+  explicit ImpalaBloomFilterBufferAllocator(BufferPool::ClientHandle* client);
+
+  ~ImpalaBloomFilterBufferAllocator() override;
+
+  kudu::Status AllocateBuffer(size_t bytes, void** ptr) override;
+  void FreeBuffer(void* ptr) override;
+
+  // This virtual function is only defined for Kudu internal testing.
+  // Impala code should not hit this function.
+  std::shared_ptr<kudu::BlockBloomFilterBufferAllocatorIf> Clone() const override {
+    LOG(DFATAL) << "Unsupported code path.";
+    return std::make_shared<ImpalaBloomFilterBufferAllocator>();
+  }
+
+  bool IsAllocated() { return is_allocated_; }
+
+ private:
+  void Close();
+
+  /// Bufferpool client and handle used for allocating and freeing directory memory.
+  /// Client is not owned by the buffer allocator.
+  BufferPool::ClientHandle* buffer_pool_client_;
+  BufferPool::BufferHandle buffer_handle_;
+  bool is_allocated_;
+
+  DISALLOW_COPY_AND_ASSIGN(ImpalaBloomFilterBufferAllocator);
+};
+
+/// A BloomFilter stores a set of items and offers a query operation indicating whether
+/// or not an item is in the set. The BloomFilter functionality is implemented in the
+/// kudu::BlockBloomFilter class (see source at be/src/kudu/util/block_bloom_filter.h),
+/// which uses block Bloom filters from Putze et al.'s "Cache-, Hash- and
+/// Space-Efficient Bloom Filters".
+/// This class is defined as a thin wrapper around kudu::BlockBloomFilter.
+/// Note: Kudu only supports FastHash for BlockBloomFilter.
+///       Since FastHash is strictly better than Murmur Hash2, we do not
+///       support the Murmur Hash2 algorithm for Bloom filters.
 class BloomFilter {
  public:
   /// Consumes at most (1 << log_bufferpool_space) bytes from the buffer pool client.
@@ -92,9 +121,9 @@ class BloomFilter {
   /// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
   /// calls to Insert() and Find() should only be done between the calls to Init() and
   /// Close(). Init and Close are safe to call multiple times.
-  Status Init(const int log_bufferpool_space);
+  Status Init(const int log_bufferpool_space, uint32_t hash_seed);
   Status Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
-      size_t directory_in_size);
+      size_t directory_in_size, uint32_t hash_seed);
   void Close();
 
   /// Representation of a filter which allows all elements to pass.
@@ -107,7 +136,7 @@ class BloomFilter {
   static void ToProtobuf(const BloomFilter* filter, kudu::rpc::RpcController* controller,
       BloomFilterPB* protobuf);
 
-  bool AlwaysFalse() const { return always_false_; }
+  bool AlwaysFalse() const { return block_bloom_filter_.always_false(); }
 
   /// Adds an element to the BloomFilter. The function used to generate 'hash' need not
   /// have good uniformity, but it should have low collision probability. For instance, if
@@ -115,6 +144,8 @@ class BloomFilter {
   /// this Bloom filter, since the collision probability (the probability that two
   /// non-equal values will have the same hash value) is 0.
   void Insert(const uint32_t hash) noexcept;
+  // Same as Insert() above, but for codegen.
+  void IR_ALWAYS_INLINE IrInsert(const uint32_t hash) noexcept;
 
   /// Finds an element in the BloomFilter, returning true if it is found and false (with
   /// high probabilty) if it is not.
@@ -136,25 +167,29 @@ class BloomFilter {
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
   /// constructed with (1 << log_bufferpool_space) bytes of heap space hits false positive
   /// probabilty fpp.
-  static size_t MaxNdv(const int log_bufferpool_space, const double fpp);
+  static size_t MaxNdv(const int log_bufferpool_space, const double fpp) {
+    return kudu::BlockBloomFilter::MaxNdv(log_bufferpool_space, fpp);
+  }
 
   /// If we expect to fill a Bloom filter with 'ndv' different unique elements and we
   /// want a false positive probabilty of less than 'fpp', then this is the log (base 2)
   /// of the minimum number of bytes we need.
-  static int MinLogSpace(const size_t ndv, const double fpp);
+  static int MinLogSpace(const size_t ndv, const double fpp) {
+    return kudu::BlockBloomFilter::MinLogSpace(ndv, fpp);
+  }
 
   /// Returns the expected false positive rate for the given ndv and log_bufferpool_space
-  static double FalsePositiveProb(const size_t ndv, const int log_bufferpool_space);
+  static double FalsePositiveProb(const size_t ndv, const int log_bufferpool_space) {
+    return kudu::BlockBloomFilter::FalsePositiveProb(ndv, log_bufferpool_space);
+  }
 
   /// Returns the amount of buffer pool space used (in bytes). A value of -1 means that
   /// 'directory_' has not been allocated which can happen if the object was just created
   /// and Init() hasn't been called or Init() failed or Close() was called on the object.
-  int64_t GetBufferPoolSpaceUsed() const {
-    return directory_ == nullptr ? -1 : sizeof(Bucket) * (1LL << log_num_buckets_);
-  }
+  int64_t GetBufferPoolSpaceUsed();
 
   static int64_t GetExpectedMemoryUsed(int log_heap_size) {
-    return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
+    return kudu::BlockBloomFilter::GetExpectedMemoryUsed(log_heap_size);
   }
 
   /// The following two functions set a sidecar on 'controller' containing the Bloom
@@ -171,82 +206,19 @@ class BloomFilter {
   static void AddDirectorySidecar(BloomFilterPB* rpc_params,
       kudu::rpc::RpcController* controller, const string& directory);
 
- private:
-  // always_false_ is true when the bloom filter hasn't had any elements inserted.
-  bool always_false_ = true;
-
-  /// The BloomFilter is divided up into Buckets
-  static const uint64_t BUCKET_WORDS = 8;
-  typedef uint32_t BucketWord;
-
-  // log2(number of bits in a BucketWord)
-  static const int LOG_BUCKET_WORD_BITS = 5;
-  static const BucketWord BUCKET_WORD_MASK = (1 << LOG_BUCKET_WORD_BITS) - 1;
-
-  /// log2(number of bytes in a bucket)
-  static const int LOG_BUCKET_BYTE_SIZE = 5;
-
-  static_assert((1 << LOG_BUCKET_WORD_BITS) == std::numeric_limits<BucketWord>::digits,
-      "BucketWord must have a bit-width that is be a power of 2, like 64 for uint64_t.");
+  kudu::BlockBloomFilter* GetBlockBloomFilter() { return &block_bloom_filter_; }
 
-  typedef BucketWord Bucket[BUCKET_WORDS];
-
-  /// log_num_buckets_ is the log (base 2) of the number of buckets in the directory.
-  int log_num_buckets_ = 0;
-
-  /// directory_mask_ is (1 << log_num_buckets_) - 1. It is precomputed for
-  /// efficiency reasons.
-  uint32_t directory_mask_ = 0;
-
-  Bucket* directory_ = nullptr;
-
-  /// Bufferpool client and handle used for allocating and freeing directory memory.
-  /// Client is not owned by the filter.
-  BufferPool::ClientHandle* buffer_pool_client_;
-  BufferPool::BufferHandle buffer_handle_;
-
-  // Same as Insert(), but skips the CPU check and assumes that AVX is not available.
-  void InsertNoAvx2(const uint32_t hash) noexcept;
-
-  // Same as Insert(), but skips the CPU check and assumes that AVX is available.
-  void InsertAvx2(const uint32_t hash) noexcept;
-
-  /// Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
-  /// into and 'hash' is the value passed to Insert().
-  void BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept;
-
-  /// A faster SIMD version of BucketInsert().
-  void BucketInsertAVX2(const uint32_t bucket_idx, const uint32_t hash) noexcept
-      __attribute__((__target__("avx2")));
-
-  /// BucketFind() and BucketFindAVX2() are just like BucketInsert() and
-  /// BucketInsertAVX2(), but for Find().
-  bool BucketFind(const uint32_t bucket_idx, const uint32_t hash) const noexcept;
-  bool BucketFindAVX2(const uint32_t bucket_idx, const uint32_t hash) const noexcept
-      __attribute__((__target__("avx2")));
+ private:
+  /// Buffer allocator used by kudu::BlockBloomFilter to allocate memory for
+  /// kudu::BlockBloomFilter::directory_.
+  ImpalaBloomFilterBufferAllocator buffer_allocator_;
 
-  /// A helper function for the AVX2 methods. Turns a 32-bit hash into a 256-bit Bucket
-  /// with 1 single 1-bit set in each 32-bit lane.
-  static inline ALWAYS_INLINE __m256i MakeMask(const uint32_t hash)
-      __attribute__((__target__("avx2")));
-
-  int64_t directory_size() const {
-    return 1uLL << (log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
-  }
+  /// Embedded Kudu BlockBloomFilter object
+  kudu::BlockBloomFilter block_bloom_filter_;
 
   /// Serializes this filter as Protobuf.
   void ToProtobuf(BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const;
 
-/// Some constants used in hashing. #defined for efficiency reasons.
-#define IMPALA_BLOOM_HASH_CONSTANTS                                             \
-  0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU, 0x705495c7U, 0x2df1424bU, \
-      0x9efc4947U, 0x5c6bfb31U
-
-  /// REHASH is used as 8 odd 32-bit unsigned ints.  See Dietzfelbinger et al.'s "A
-  /// reliable randomized algorithm for the closest-pair problem".
-  static constexpr uint32_t REHASH[8]
-      __attribute__((aligned(32))) = {IMPALA_BLOOM_HASH_CONSTANTS};
-
   DISALLOW_COPY_AND_ASSIGN(BloomFilter);
 
   /// List 'BloomFilterTest_Protobuf_Test' as a friend class to run the backend
@@ -267,31 +239,12 @@ class BloomFilter {
   friend struct either::TestData;
 };
 
-// To set 8 bits in an 32-byte Bloom filter, we set one bit in each 32-bit uint32_t. This
-// is a "split Bloom filter", and it has approximately the same false positive probability
-// as standard a Bloom filter; See Mitzenmacher's "Bloom Filters and Such". It also has
-// the advantage of requiring fewer random bits: log2(32) * 8 = 5 * 8 = 40 random bits for
-// a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter.
-
 inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) noexcept {
-  DCHECK(directory_ != nullptr);
-  always_false_ = false;
-  const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
-  if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
-    BucketInsertAVX2(bucket_idx, hash);
-  } else {
-    BucketInsert(bucket_idx, hash);
-  }
+  block_bloom_filter_.Insert(hash);
 }
 
 inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const noexcept {
-  if (always_false_) return false;
-  DCHECK(directory_ != nullptr);
-  const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
-  if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
-    return BucketFindAVX2(bucket_idx, hash);
-  } else {
-    return BucketFind(bucket_idx, hash);
-  }
+  return block_bloom_filter_.Find(hash);
 }
+
 } // namespace impala
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 2e9f14a..40e0f28 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -100,6 +100,7 @@ PRINT_THRIFT_ENUM_IMPL(TStmtType)
 PRINT_THRIFT_ENUM_IMPL(TUnit)
 PRINT_THRIFT_ENUM_IMPL(TParquetTimestampType)
 PRINT_THRIFT_ENUM_IMPL(TTransactionalType)
+PRINT_THRIFT_ENUM_IMPL(TEnabledRuntimeFilterTypes)
 
 string PrintId(const TUniqueId& id, const string& separator) {
   stringstream out;
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 5126299..a9e89c1 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -77,6 +77,7 @@ std::string PrintThriftEnum(const TStmtType::type& value);
 std::string PrintThriftEnum(const TUnit::type& value);
 std::string PrintThriftEnum(const TParquetTimestampType::type& value);
 std::string PrintThriftEnum(const TTransactionalType::type& value);
+std::string PrintThriftEnum(const TEnabledRuntimeFilterTypes::type& value);
 
 std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
 std::string PrintRow(TupleRow* row, const RowDescriptor& d);
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index ec67fee..481ea4e 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=22-f3824b7d15
+export IMPALA_TOOLCHAIN_BUILD_ID=29-34813f22eb
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -99,7 +99,7 @@ export IMPALA_GFLAGS_VERSION=2.2.0-p2
 unset IMPALA_GFLAGS_URL
 export IMPALA_GLOG_VERSION=0.3.4-p3
 unset IMPALA_GLOG_URL
-export IMPALA_GPERFTOOLS_VERSION=2.5
+export IMPALA_GPERFTOOLS_VERSION=2.5-p1
 unset IMPALA_GPERFTOOLS_URL
 export IMPALA_GTEST_VERSION=1.6.0
 unset IMPALA_GTEST_URL
@@ -661,7 +661,7 @@ if [[ -z "${KUDU_IS_SUPPORTED-}" ]]; then
 fi
 export KUDU_IS_SUPPORTED
 
-export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"389d4f1e1"}
+export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"d652cab17"}
 export IMPALA_KUDU_JAVA_VERSION=${IMPALA_KUDU_JAVA_VERSION-"1.13.0-SNAPSHOT"}
 export IMPALA_KUDU_HOME=${IMPALA_TOOLCHAIN_PACKAGES_HOME}/kudu-$IMPALA_KUDU_VERSION
 export IMPALA_KUDU_JAVA_HOME=\
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 4728c91..097525a 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -422,6 +422,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   103: optional bool retry_failed_queries = false;
+
+  // See comment in ImpalaService.thrift
+  104: optional PlanNodes.TEnabledRuntimeFilterTypes enabled_runtime_filter_types =
+      PlanNodes.TEnabledRuntimeFilterTypes.MIN_MAX;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 7302ce0..02e32ac 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -527,6 +527,14 @@ enum TImpalaQueryOptions {
   // retried query is a brand new query. From the client perspective, requests for the
   // failed query are transparently re-routed to the new query.
   RETRY_FAILED_QUERIES = 102
+
+  // Enabled runtime filter types to be applied to the scanner.
+  // This option only applies to Kudu for now; it will apply to HDFS once we
+  // support min-max filters for HDFS.
+  //     BLOOM   - apply bloom filter only.
+  //     MIN_MAX - apply min-max filter only (default).
+  //     ALL     - apply both bloom filter and min-max filter.
+  ENABLED_RUNTIME_FILTER_TYPES = 103
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index e0a765c..413b849 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -119,6 +119,13 @@ enum TRuntimeFilterType {
   MIN_MAX = 1
 }
 
+// Enabled runtime filter types to be applied to scan nodes.
+enum TEnabledRuntimeFilterTypes {
+  BLOOM = 1
+  MIN_MAX = 2
+  ALL = 3
+}
+
 // Specification of a runtime filter.
 struct TRuntimeFilterDesc {
   // Filter unique id (within a query)
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index e892dcb..9c959d9 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -49,6 +49,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
 import org.apache.impala.thrift.TRuntimeFilterMode;
@@ -675,8 +676,9 @@ public final class RuntimeFilterGenerator {
    *    scan node.
    * 3. Only Hdfs and Kudu scan nodes are supported:
    *     a. If the target is an HdfsScanNode, the filter must be type BLOOM.
-   *     b. If the target is a KuduScanNode, the filter must be type MIN_MAX, the target
-   *         must be a slot ref on a column, and the comp op cannot be 'not distinct'.
+   *     b. If the target is a KuduScanNode, the filter can be of type MIN_MAX and/or
+   *        BLOOM, the target must be a slot ref on a column, and the comp op cannot
+   *        be 'not distinct'.
    * A scan node may be used as a destination node for multiple runtime filters.
    */
   private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
@@ -687,6 +689,8 @@ public final class RuntimeFilterGenerator {
     boolean disableRowRuntimeFiltering =
         ctx.getQueryOptions().isDisable_row_runtime_filtering();
     TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode();
+    TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes =
+        ctx.getQueryOptions().getEnabled_runtime_filter_types();
     for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
       if (filter.isFinalized()) continue;
       Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
@@ -702,21 +706,46 @@ public final class RuntimeFilterGenerator {
           && filter.getType() != TRuntimeFilterType.BLOOM) {
         continue;
       } else if (scanNode instanceof KuduScanNode) {
-        if (filter.getType() != TRuntimeFilterType.MIN_MAX) continue;
-        // TODO: IMPALA-9294: Support Kudu Date Min/Max Filters
-        if (targetExpr.getType().isDate()) continue;
-        // TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
-        if (targetExpr.getType().isVarchar()) continue;
-        SlotRef slotRef = targetExpr.unwrapSlotRef(true);
-        // Kudu only supports targeting a single column, not general exprs, so the target
-        // must be a SlotRef pointing to a column. We can allow implicit integer casts
-        // by casting the min/max values before sending them to Kudu.
-        // Kudu also cannot currently return nulls if a filter is applied, so it does not
-        // work with "is not distinct".
-        if (slotRef == null || slotRef.getDesc().getColumn() == null
-            || (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
-            || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
-          continue;
+        if (filter.getType() == TRuntimeFilterType.BLOOM) {
+          if (enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.BLOOM
+              && enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
+            continue;
+          }
+          // TODO: IMPALA-9691 Support Kudu Timestamp and Date Bloom Filters
+          if (targetExpr.getType().isTimestamp() || targetExpr.getType().isDate()) {
+            continue;
+          }
+          // TODO: Support Kudu VARCHAR Bloom Filter
+          if (targetExpr.getType().isVarchar()) continue;
+          // Kudu only supports targeting a single column, not general exprs, so the
+          // target must be a SlotRef pointing to a column, without any cast.
+          if (!(targetExpr instanceof SlotRef)
+              || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
+            continue;
+          }
+          SlotRef slotRef = (SlotRef) targetExpr;
+          if (slotRef.getDesc().getColumn() == null) continue;
+        } else {
+          Preconditions.checkState(filter.getType() == TRuntimeFilterType.MIN_MAX);
+          if (enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.MIN_MAX
+              && enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
+            continue;
+          }
+          // TODO: IMPALA-9294: Support Kudu Date Min/Max Filters
+          if (targetExpr.getType().isDate()) continue;
+          // TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
+          if (targetExpr.getType().isVarchar()) continue;
+          SlotRef slotRef = targetExpr.unwrapSlotRef(true);
+          // Kudu only supports targeting a single column, not general exprs, so the
+          // target must be a SlotRef pointing to a column. We can allow implicit
+          // integer casts by casting the min/max values before sending them to Kudu.
+          // Kudu also cannot currently return nulls if a filter is applied, so it
+          // does not work with "is not distinct".
+          if (slotRef == null || slotRef.getDesc().getColumn() == null
+              || (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
+              || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
+            continue;
+          }
         }
       }
 
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b2cea00..f94f0a8 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -34,6 +34,7 @@ import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.testutil.TestUtils.IgnoreValueFilter;
+import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TJoinDistributionMode;
@@ -567,6 +568,11 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testBloomFilterAssignment() {
+    runPlannerTestFile("bloom-filter-assignment");
+  }
+
+  @Test
   public void testConjunctOrdering() {
     runPlannerTestFile("conjunct-ordering");
   }
@@ -594,10 +600,12 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKudu() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
+    TQueryOptions options = defaultQueryOptions();
+    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
     addTestDb("kudu_planner_test", "Test DB for Kudu Planner.");
     addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " +
         "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');");
-    runPlannerTestFile("kudu");
+    runPlannerTestFile("kudu", options);
   }
 
   @Test
@@ -609,7 +617,9 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKuduUpdate() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
-    runPlannerTestFile("kudu-update");
+    TQueryOptions options = defaultQueryOptions();
+    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
+    runPlannerTestFile("kudu-update", options);
   }
 
   @Test
@@ -629,9 +639,11 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKuduTpch() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
-    runPlannerTestFile("tpch-kudu", ImmutableSet.of(
-        PlannerTestOption.INCLUDE_RESOURCE_HEADER,
-        PlannerTestOption.VALIDATE_RESOURCES));
+    TQueryOptions options = defaultQueryOptions();
+    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
+    runPlannerTestFile("tpch-kudu", options,
+        ImmutableSet.of(PlannerTestOption.INCLUDE_RESOURCE_HEADER,
+            PlannerTestOption.VALIDATE_RESOURCES));
   }
 
   @Test
@@ -813,6 +825,7 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(false);
+    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
     runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options);
   }
 
@@ -821,6 +834,7 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(true);
+    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
     runPlannerTestFile("min-max-runtime-filters", options);
   }
 
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
new file mode 100644
index 0000000..5cc9a21
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
@@ -0,0 +1,408 @@
+# Target a slot ref:
+#   Join two HDFS tables; a bloom filter is assigned to the HDFS scanner.
+select /* +straight_join */ count(*) from functional_parquet.alltypes a
+  join functional_parquet.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=34.94MB mem-reservation=2.97MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_parquet.alltypes b]
+|     HDFS partitions=24/24 files=24 size=201.11KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=12.79K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes a]
+   HDFS partitions=24/24 files=24 size=201.11KB
+   runtime filters: RF000[bloom] -> a.id
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.79K
+   in pipelines: 00(GETNEXT)
+====
+# Target is a slot ref:
+#   Join two Kudu tables; a bloom filter is assigned to the Kudu scanner.
+select /* +straight_join */ count(*) from functional_kudu.alltypes a
+  join functional_kudu.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=13.69MB mem-reservation=2.94MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: a.id = b.id
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   runtime filters: RF000[bloom] -> a.id
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
+# Target is a slot ref:
+#   Join HDFS and Kudu tables; a bloom filter is assigned to the HDFS scanner.
+select /* +straight_join */ count(*) from functional_parquet.alltypes a
+  join functional_kudu.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=28.94MB mem-reservation=2.95MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes a]
+   HDFS partitions=24/24 files=24 size=201.11KB
+   runtime filters: RF000[bloom] -> a.id
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.79K
+   in pipelines: 00(GETNEXT)
+====
+# Target is a slot ref:
+#   Join Kudu and HDFS tables; a bloom filter is assigned to the Kudu scanner.
+select /* +straight_join */ count(*) from functional_kudu.alltypes a
+  join functional_parquet.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=19.69MB mem-reservation=2.95MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_parquet.alltypes b]
+|     HDFS partitions=24/24 files=24 size=201.11KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=12.79K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   runtime filters: RF000[bloom] -> a.id
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
+# Target is not a slot ref:
+#   Join HDFS and Kudu tables; a bloom filter is assigned to the HDFS scanner.
+select /* +straight_join */ count(*) from functional_parquet.alltypes a
+  join functional_kudu.alltypes b on a.id+1 = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=28.94MB mem-reservation=2.95MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id + 1 = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes a]
+   HDFS partitions=24/24 files=24 size=201.11KB
+   runtime filters: RF000[bloom] -> a.id + 1
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.79K
+   in pipelines: 00(GETNEXT)
+====
+# Target is not a slot ref:
+#   Join Kudu and Parquet tables; a bloom filter is NOT assigned to the Kudu scanner.
+select /* +straight_join */ count(*) from functional_kudu.alltypes a
+  join functional_parquet.alltypes b on a.id+1 = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=18.69MB mem-reservation=1.95MB thread-reservation=3
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id + 1 = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_parquet.alltypes b]
+|     HDFS partitions=24/24 files=24 size=201.11KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=12.79K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
+# Targets are slot refs:
+#   Join three tables; bloom filters are assigned to the HDFS and Kudu scanners.
+select straight_join count(*)
+  from functional_parquet.alltypes a join [BROADCAST] functional_kudu.alltypes b
+    join [BROADCAST] functional_parquet.alltypes c
+  where a.int_col = b.int_col and a.int_col = c.smallint_col * 2 and c.id < 100
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=54.62MB mem-reservation=5.91MB thread-reservation=4 runtime-filters-memory=2.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=3 row-size=8B cardinality=1
+|  in pipelines: 05(GETNEXT), 00(OPEN)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = c.smallint_col * 2
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- c.smallint_col * 2
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1,2 row-size=14B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [functional_parquet.alltypes c]
+|     HDFS partitions=24/24 files=24 size=201.11KB
+|     predicates: c.id < CAST(100 AS INT)
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     parquet statistics predicates: c.id < CAST(100 AS INT)
+|     parquet dictionary predicates: c.id < CAST(100 AS INT)
+|     mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=2 row-size=6B cardinality=1.28K
+|     in pipelines: 02(GETNEXT)
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = b.int_col
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF002[bloom] <- b.int_col
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     runtime filters: RF000[bloom] -> b.int_col
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes a]
+   HDFS partitions=24/24 files=24 size=201.11KB
+   runtime filters: RF000[bloom] -> a.int_col, RF002[bloom] -> a.int_col
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.79K
+   in pipelines: 00(GETNEXT)
+====
+# Target is not a slot ref:
+#   Join three tables; a bloom filter is assigned to the HDFS scanner but
+#   not to the Kudu scanner.
+select straight_join count(*)
+  from functional_parquet.alltypes a join [BROADCAST] functional_kudu.alltypes b
+    join [BROADCAST] functional_parquet.alltypes c
+  where a.int_col = b.int_col and a.int_col+1 = c.smallint_col * 2 and c.id < 100
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=54.62MB mem-reservation=5.91MB thread-reservation=4 runtime-filters-memory=2.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=3 row-size=8B cardinality=1
+|  in pipelines: 05(GETNEXT), 00(OPEN)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col + 1 = c.smallint_col * 2
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- c.smallint_col * 2
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1,2 row-size=14B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [functional_parquet.alltypes c]
+|     HDFS partitions=24/24 files=24 size=201.11KB
+|     predicates: c.id < CAST(100 AS INT)
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     parquet statistics predicates: c.id < CAST(100 AS INT)
+|     parquet dictionary predicates: c.id < CAST(100 AS INT)
+|     mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=2 row-size=6B cardinality=1.28K
+|     in pipelines: 02(GETNEXT)
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = b.int_col
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF002[bloom] <- b.int_col
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=12.79K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes a]
+   HDFS partitions=24/24 files=24 size=201.11KB
+   runtime filters: RF000[bloom] -> a.int_col + 1, RF002[bloom] -> a.int_col
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.79K
+   in pipelines: 00(GETNEXT)
+====
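The expected plans in bloom-filter-assignment.test suggest a simple rule: an HDFS scan receives a bloom filter even when the target is an expression such as `a.id + 1`, while a Kudu scan receives one only when the target is a bare slot ref such as `a.id`. The sketch below models that observed behaviour only. It is not Impala's planner code, and the names `is_slot_ref` and `bloom_filter_assigned` are hypothetical, introduced here for illustration.

```python
import re

# Assumption: a "slot ref" is modelled as a bare alias.column reference.
_SLOT_REF = re.compile(r"[A-Za-z_]\w*\.[A-Za-z_]\w*")


def is_slot_ref(expr):
    """Return True if expr is a plain column reference such as 'a.id'."""
    return _SLOT_REF.fullmatch(expr.strip()) is not None


def bloom_filter_assigned(scan_type, target_expr):
    """Model whether a bloom filter lands on a scan, per the expected plans."""
    if scan_type == "HDFS":
        # HDFS scans in the plans accept expression targets such as 'a.id + 1'.
        return True
    if scan_type == "KUDU":
        # Kudu scans in the plans only receive filters on bare slot refs.
        return is_slot_ref(target_expr)
    raise ValueError("unknown scan type: " + scan_type)
```

For example, `bloom_filter_assigned("KUDU", "a.id + 1")` is false in this model, matching the plan in which the Kudu scan carries no `runtime filters` line.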
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
index 2c70157..7d2f845 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
@@ -51,7 +51,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
-|  runtime filters: RF001 <- b.id
+|  runtime filters: RF000 <- b.id, RF001 <- b.id
 |  row-size=28B cardinality=0
 |
 |--01:SCAN HDFS [functional.testtbl b]
@@ -61,14 +61,14 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
    kudu predicates: a.id = 10
-   runtime filters: RF001 -> a.id
+   runtime filters: RF000 -> a.id, RF001 -> a.id
    row-size=8B cardinality=0
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id
-|  runtime filters: RF001 <- b.id
+|  runtime filters: RF000 <- b.id, RF001 <- b.id
 |  row-size=28B cardinality=0
 |
 |--03:EXCHANGE [BROADCAST]
@@ -80,7 +80,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
    kudu predicates: a.id = 10
-   runtime filters: RF001 -> a.id
+   runtime filters: RF000 -> a.id, RF001 -> a.id
    row-size=8B cardinality=0
 ====
 update a
@@ -104,7 +104,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: a.id = ids
-|  runtime filters: RF001 <- ids
+|  runtime filters: RF000 <- ids, RF001 <- ids
 |  row-size=9B cardinality=1
 |
 |--04:EXCHANGE [HASH(ids)]
@@ -116,7 +116,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 03:EXCHANGE [HASH(a.id)]
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
-   runtime filters: RF001 -> a.id
+   runtime filters: RF000 -> a.id, RF001 -> a.id
    row-size=8B cardinality=0
 ====
 update a
@@ -128,7 +128,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.zip = zip
-|  runtime filters: RF001 <- zip
+|  runtime filters: RF000 <- zip, RF001 <- zip
 |  row-size=12B cardinality=0
 |
 |--01:SCAN HDFS [functional.testtbl]
@@ -137,14 +137,14 @@ UPDATE KUDU [functional_kudu.testtbl]
 |     row-size=4B cardinality=0
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
-   runtime filters: RF001 -> a.zip
+   runtime filters: RF000 -> a.zip, RF001 -> a.zip
    row-size=12B cardinality=0
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: a.zip = zip
-|  runtime filters: RF001 <- zip
+|  runtime filters: RF000 <- zip, RF001 <- zip
 |  row-size=12B cardinality=0
 |
 |--04:EXCHANGE [BROADCAST]
@@ -158,7 +158,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |     row-size=4B cardinality=0
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
-   runtime filters: RF001 -> a.zip
+   runtime filters: RF000 -> a.zip, RF001 -> a.zip
    row-size=12B cardinality=0
 ====
 update functional_kudu.testtbl set zip = 94546 where false
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index dcfdc05..824188a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -392,7 +392,7 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
-|  runtime filters: RF001 <- b.id
+|  runtime filters: RF000 <- b.id, RF001 <- b.id
 |  row-size=12B cardinality=1
 |
 |--01:SCAN KUDU [functional_kudu.alltypessmall b]
@@ -403,7 +403,7 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypes a]
    predicates: CAST(a.id AS STRING) > '123'
    kudu predicates: a.id > 10
-   runtime filters: RF001 -> a.id
+   runtime filters: RF000 -> a.id, RF001 -> a.id
    row-size=4B cardinality=730
 ====
 # IMPALA-4662: Kudu analysis failure for NULL literal in IN list
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
index b8e5b4f..8094095 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
@@ -664,3 +664,120 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypes a]
    row-size=4B cardinality=7.30K
 ====
+# ENABLED_RUNTIME_FILTER_TYPES is set to BLOOM; a bloom filter is assigned
+# to the Kudu scan.
+select /* +straight_join */ count(*) from functional_kudu.alltypes a
+  join functional_kudu.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=BLOOM
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=13.69MB mem-reservation=2.94MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: a.id = b.id
+|  runtime filters: RF000[bloom] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   runtime filters: RF000[bloom] -> a.id
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
+# ENABLED_RUNTIME_FILTER_TYPES is set to MIN_MAX; a min-max filter is assigned
+# to the Kudu scan.
+select /* +straight_join */ count(*) from functional_kudu.alltypes a
+  join functional_kudu.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=12.69MB mem-reservation=1.94MB thread-reservation=3
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: a.id = b.id
+|  runtime filters: RF001[min_max] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   runtime filters: RF001[min_max] -> a.id
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
+# ENABLED_RUNTIME_FILTER_TYPES is set to ALL; both a bloom filter
+# and a min-max filter are assigned to the Kudu scan.
+select /* +straight_join */ count(*) from functional_kudu.alltypes a
+  join functional_kudu.alltypes b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=ALL
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=13.69MB mem-reservation=2.94MB thread-reservation=3 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: a.id = b.id
+|  runtime filters: RF000[bloom] <- b.id, RF001[min_max] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.alltypes b]
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.alltypes a]
+   runtime filters: RF000[bloom] -> a.id, RF001[min_max] -> a.id
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
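When comparing expected plans like the ones above, it can help to pull the `runtime filters:` line apart into filter id, kind, direction and expression. The following is a minimal sketch that assumes the line format seen in these test files (`RF000[bloom] <- b.id, RF001[min_max] <- b.id`, with the `[kind]` suffix absent at lower explain levels). The function name `parse_runtime_filters` and the returned dictionary layout are hypothetical and not part of Impala's test framework.

```python
import re

# Split only on commas that are followed by another filter id, so that
# commas inside an expression are left alone.
_SPLIT = re.compile(r",\s*(?=RF\d+)")
_ENTRY = re.compile(r"RF(\d+)(?:\[(\w+)\])?\s*(<-|->)\s*(.+)")


def parse_runtime_filters(line):
    """Parse one 'runtime filters:' plan line into a list of dicts."""
    _, _, body = line.partition("runtime filters:")
    filters = []
    for entry in _SPLIT.split(body.strip()):
        match = _ENTRY.fullmatch(entry.strip())
        if match is None:
            raise ValueError("unrecognized runtime filter entry: %r" % entry)
        filter_id, kind, arrow, expr = match.groups()
        filters.append({
            "id": int(filter_id),
            "kind": kind,  # None when the plan omits the [kind] suffix
            # '<-' appears on the join that builds the filter,
            # '->' on the scan that applies it.
            "role": "source" if arrow == "<-" else "target",
            "expr": expr,
        })
    return filters
```

Applied to the Kudu scan line of the `ENABLED_RUNTIME_FILTER_TYPES=ALL` case, this yields two target entries, one of kind `bloom` and one of kind `min_max`, both on `a.id`.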
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
index 32fbf0f..83f8bea 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
@@ -83,96 +83,147 @@ order by
   s_name,
   p_partkey
 limit 100
+---- QUERYOPTIONS
+EXPLAIN_LEVEL=2
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=18.31MB Threads=10
-Per-Host Resource Estimates: Memory=49MB
+Max Per-Host Resource Reservation: Memory=26.31MB Threads=10
+Per-Host Resource Estimates: Memory=57MB
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=57.00MB mem-reservation=26.31MB thread-reservation=10 runtime-filters-memory=8.00MB
 PLAN-ROOT SINK
+|  output exprs: round(s_acctbal, CAST(2 AS TINYINT)), s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 18:TOP-N [LIMIT=100]
 |  order by: s_acctbal DESC, n_name ASC, s_name ASC, p_partkey ASC
-|  row-size=230B cardinality=100
+|  mem-estimate=22.42KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=11 row-size=230B cardinality=100
+|  in pipelines: 18(GETNEXT), 12(OPEN)
 |
 17:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: ps_partkey = p_partkey, min(ps_supplycost) = ps_supplycost
-|  runtime filters: RF002 <- p_partkey
-|  row-size=330B cardinality=1.01K
+|  runtime filters: RF000[bloom] <- p_partkey, RF002[min_max] <- p_partkey
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=1,2,0,3,4 row-size=330B cardinality=1.01K
+|  in pipelines: 12(GETNEXT), 01(OPEN)
 |
 |--16:HASH JOIN [INNER JOIN]
 |  |  hash predicates: n_regionkey = r_regionkey
-|  |  runtime filters: RF011 <- r_regionkey
-|  |  row-size=330B cardinality=1.01K
+|  |  fk/pk conjuncts: n_regionkey = r_regionkey
+|  |  runtime filters: RF010[bloom] <- r_regionkey, RF011[min_max] <- r_regionkey
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=1,2,0,3,4 row-size=330B cardinality=1.01K
+|  |  in pipelines: 01(GETNEXT), 04(OPEN)
 |  |
 |  |--04:SCAN KUDU [tpch_kudu.region]
 |  |     kudu predicates: r_name = 'EUROPE'
-|  |     row-size=2B cardinality=1
+|  |     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|  |     tuple-ids=4 row-size=2B cardinality=1
+|  |     in pipelines: 04(GETNEXT)
 |  |
 |  15:HASH JOIN [INNER JOIN]
 |  |  hash predicates: s_nationkey = n_nationkey
-|  |  runtime filters: RF013 <- n_nationkey
-|  |  row-size=328B cardinality=5.05K
+|  |  fk/pk conjuncts: s_nationkey = n_nationkey
+|  |  runtime filters: RF012[bloom] <- n_nationkey, RF013[min_max] <- n_nationkey
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=1,2,0,3 row-size=328B cardinality=5.05K
+|  |  in pipelines: 01(GETNEXT), 03(OPEN)
 |  |
 |  |--03:SCAN KUDU [tpch_kudu.nation]
-|  |     runtime filters: RF011 -> n_regionkey
-|  |     row-size=27B cardinality=25
+|  |     runtime filters: RF010[bloom] -> n_regionkey, RF011[min_max] -> n_regionkey
+|  |     mem-estimate=1.12MB mem-reservation=0B thread-reservation=1
+|  |     tuple-ids=3 row-size=27B cardinality=25
+|  |     in pipelines: 03(GETNEXT)
 |  |
 |  14:HASH JOIN [INNER JOIN]
 |  |  hash predicates: s_suppkey = ps_suppkey
-|  |  runtime filters: RF015 <- ps_suppkey
-|  |  row-size=301B cardinality=5.05K
+|  |  fk/pk conjuncts: s_suppkey = ps_suppkey
+|  |  runtime filters: RF014[bloom] <- ps_suppkey, RF015[min_max] <- ps_suppkey
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=1,2,0 row-size=301B cardinality=5.05K
+|  |  in pipelines: 01(GETNEXT), 02(OPEN)
 |  |
 |  |--13:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: ps_partkey = p_partkey
-|  |  |  runtime filters: RF017 <- p_partkey
-|  |  |  row-size=99B cardinality=5.05K
+|  |  |  fk/pk conjuncts: ps_partkey = p_partkey
+|  |  |  runtime filters: RF016[bloom] <- p_partkey, RF017[min_max] <- p_partkey
+|  |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  |  tuple-ids=2,0 row-size=99B cardinality=5.05K
+|  |  |  in pipelines: 02(GETNEXT), 00(OPEN)
 |  |  |
 |  |  |--00:SCAN KUDU [tpch_kudu.part]
 |  |  |     predicates: p_type LIKE '%BRASS'
-|  |  |     kudu predicates: p_size = 15
-|  |  |     row-size=75B cardinality=1.26K
+|  |  |     kudu predicates: p_size = CAST(15 AS INT)
+|  |  |     mem-estimate=6.00MB mem-reservation=0B thread-reservation=1
+|  |  |     tuple-ids=0 row-size=75B cardinality=1.26K
+|  |  |     in pipelines: 00(GETNEXT)
 |  |  |
 |  |  02:SCAN KUDU [tpch_kudu.partsupp]
-|  |     runtime filters: RF017 -> ps_partkey
-|  |     row-size=24B cardinality=800.00K
+|  |     runtime filters: RF016[bloom] -> ps_partkey, RF017[min_max] -> ps_partkey
+|  |     mem-estimate=4.50MB mem-reservation=0B thread-reservation=1
+|  |     tuple-ids=2 row-size=24B cardinality=800.00K
+|  |     in pipelines: 02(GETNEXT)
 |  |
 |  01:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF013 -> s_nationkey, RF015 -> s_suppkey
-|     row-size=203B cardinality=10.00K
+|     runtime filters: RF014[bloom] -> s_suppkey, RF015[min_max] -> s_suppkey, RF012[bloom] -> s_nationkey, RF013[min_max] -> s_nationkey
+|     mem-estimate=10.50MB mem-reservation=0B thread-reservation=1
+|     tuple-ids=1 row-size=203B cardinality=10.00K
+|     in pipelines: 01(GETNEXT)
 |
 12:AGGREGATE [FINALIZE]
 |  output: min(ps_supplycost)
 |  group by: ps_partkey
-|  row-size=16B cardinality=160.00K
+|  mem-estimate=10.00MB mem-reservation=4.75MB spill-buffer=256.00KB thread-reservation=0
+|  tuple-ids=9 row-size=16B cardinality=160.00K
+|  in pipelines: 12(GETNEXT), 05(OPEN)
 |
 11:HASH JOIN [INNER JOIN]
 |  hash predicates: n_regionkey = r_regionkey
-|  runtime filters: RF005 <- r_regionkey
-|  row-size=40B cardinality=160.00K
+|  fk/pk conjuncts: n_regionkey = r_regionkey
+|  runtime filters: RF004[bloom] <- r_regionkey, RF005[min_max] <- r_regionkey
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=5,6,7,8 row-size=40B cardinality=160.00K
+|  in pipelines: 05(GETNEXT), 08(OPEN)
 |
 |--08:SCAN KUDU [tpch_kudu.region]
 |     kudu predicates: r_name = 'EUROPE'
-|     row-size=2B cardinality=1
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=8 row-size=2B cardinality=1
+|     in pipelines: 08(GETNEXT)
 |
 10:HASH JOIN [INNER JOIN]
 |  hash predicates: s_nationkey = n_nationkey
-|  runtime filters: RF007 <- n_nationkey
-|  row-size=38B cardinality=800.00K
+|  fk/pk conjuncts: s_nationkey = n_nationkey
+|  runtime filters: RF006[bloom] <- n_nationkey, RF007[min_max] <- n_nationkey
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=5,6,7 row-size=38B cardinality=800.00K
+|  in pipelines: 05(GETNEXT), 07(OPEN)
 |
 |--07:SCAN KUDU [tpch_kudu.nation]
-|     runtime filters: RF005 -> n_regionkey
-|     row-size=4B cardinality=25
+|     runtime filters: RF004[bloom] -> n_regionkey, RF005[min_max] -> n_regionkey
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     tuple-ids=7 row-size=4B cardinality=25
+|     in pipelines: 07(GETNEXT)
 |
 09:HASH JOIN [INNER JOIN]
 |  hash predicates: ps_suppkey = s_suppkey
-|  runtime filters: RF009 <- s_suppkey
-|  row-size=34B cardinality=800.00K
+|  fk/pk conjuncts: ps_suppkey = s_suppkey
+|  runtime filters: RF008[bloom] <- s_suppkey, RF009[min_max] <- s_suppkey
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=5,6 row-size=34B cardinality=800.00K
+|  in pipelines: 05(GETNEXT), 06(OPEN)
 |
 |--06:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF007 -> s_nationkey
-|     row-size=10B cardinality=10.00K
+|     runtime filters: RF006[bloom] -> s_nationkey, RF007[min_max] -> s_nationkey
+|     mem-estimate=3.00MB mem-reservation=0B thread-reservation=1
+|     tuple-ids=6 row-size=10B cardinality=10.00K
+|     in pipelines: 06(GETNEXT)
 |
 05:SCAN KUDU [tpch_kudu.partsupp]
-   runtime filters: RF002 -> tpch_kudu.partsupp.ps_partkey, RF009 -> ps_suppkey
-   row-size=24B cardinality=800.00K
+   runtime filters: RF000[bloom] -> tpch_kudu.partsupp.ps_partkey, RF002[min_max] -> tpch_kudu.partsupp.ps_partkey, RF008[bloom] -> ps_suppkey, RF009[min_max] -> ps_suppkey
+   mem-estimate=4.50MB mem-reservation=0B thread-reservation=1
+   tuple-ids=5 row-size=24B cardinality=800.00K
+   in pipelines: 05(GETNEXT)
 ====
 # Q3 - Shipping Priority Query
 select
@@ -199,8 +250,8 @@ order by
   o_orderdate
 limit 10
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=12.38MB Threads=4
-Per-Host Resource Estimates: Memory=26MB
+Max Per-Host Resource Reservation: Memory=14.38MB Threads=4
+Per-Host Resource Estimates: Memory=28MB
 PLAN-ROOT SINK
 |
 06:TOP-N [LIMIT=10]
@@ -214,7 +265,7 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: o_custkey = c_custkey
-|  runtime filters: RF001 <- c_custkey
+|  runtime filters: RF000 <- c_custkey, RF001 <- c_custkey
 |  row-size=78B cardinality=17.56K
 |
 |--00:SCAN KUDU [tpch_kudu.customer]
@@ -223,17 +274,17 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: l_orderkey = o_orderkey
-|  runtime filters: RF003 <- o_orderkey
+|  runtime filters: RF002 <- o_orderkey, RF003 <- o_orderkey
 |  row-size=70B cardinality=57.58K
 |
 |--01:SCAN KUDU [tpch_kudu.orders]
 |     kudu predicates: o_orderdate < '1995-03-15'
-|     runtime filters: RF001 -> o_custkey
+|     runtime filters: RF000 -> o_custkey, RF001 -> o_custkey
 |     row-size=46B cardinality=150.00K
 |
 02:SCAN KUDU [tpch_kudu.lineitem]
    kudu predicates: l_shipdate > '1995-03-15'
-   runtime filters: RF003 -> l_orderkey
+   runtime filters: RF002 -> l_orderkey, RF003 -> l_orderkey
    row-size=24B cardinality=600.12K
 ====
 # Q4 - Order Priority Checking Query
@@ -259,8 +310,8 @@ group by
 order by
   o_orderpriority
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=13.94MB Threads=3
-Per-Host Resource Estimates: Memory=42MB
+Max Per-Host Resource Reservation: Memory=14.94MB Threads=3
+Per-Host Resource Estimates: Memory=44MB
 PLAN-ROOT SINK
 |
 04:SORT
@@ -274,7 +325,7 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: l_orderkey = o_orderkey
-|  runtime filters: RF001 <- o_orderkey
+|  runtime filters: RF000 <- o_orderkey, RF001 <- o_orderkey
 |  row-size=32B cardinality=150.00K
 |
 |--00:SCAN KUDU [tpch_kudu.orders]
@@ -283,7 +334,7 @@ PLAN-ROOT SINK
 |
 01:SCAN KUDU [tpch_kudu.lineitem]
    predicates: l_commitdate < l_receiptdate
-   runtime filters: RF001 -> l_orderkey
+   runtime filters: RF000 -> l_orderkey, RF001 -> l_orderkey
    row-size=60B cardinality=600.12K
 ====
 # Q5 - Local Supplier Volume Query
@@ -312,8 +363,8 @@ group by
 order by
   revenue desc
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=15.38MB Threads=7
-Per-Host Resource Estimates: Memory=32MB
+Max Per-Host Resource Reservation: Memory=21.38MB Threads=7
+Per-Host Resource Estimates: Memory=38MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -327,7 +378,7 @@ PLAN-ROOT SINK
 |
 10:HASH JOIN [INNER JOIN]
 |  hash predicates: n_regionkey = r_regionkey
-|  runtime filters: RF001 <- r_regionkey
+|  runtime filters: RF000 <- r_regionkey, RF001 <- r_regionkey
 |  row-size=97B cardinality=115.16K
 |
 |--05:SCAN KUDU [tpch_kudu.region]
@@ -336,43 +387,43 @@ PLAN-ROOT SINK
 |
 09:HASH JOIN [INNER JOIN]
 |  hash predicates: s_nationkey = n_nationkey
-|  runtime filters: RF003 <- n_nationkey
+|  runtime filters: RF002 <- n_nationkey, RF003 <- n_nationkey
 |  row-size=95B cardinality=575.77K
 |
 |--04:SCAN KUDU [tpch_kudu.nation]
-|     runtime filters: RF001 -> n_regionkey
+|     runtime filters: RF000 -> n_regionkey, RF001 -> n_regionkey
 |     row-size=27B cardinality=25
 |
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: c_nationkey = s_nationkey, l_suppkey = s_suppkey
-|  runtime filters: RF006 <- s_nationkey, RF007 <- s_suppkey
+|  runtime filters: RF004 <- s_nationkey, RF005 <- s_suppkey, RF006 <- s_nationkey, RF007 <- s_suppkey
 |  row-size=68B cardinality=575.77K
 |
 |--03:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF003 -> s_nationkey
+|     runtime filters: RF002 -> s_nationkey, RF003 -> s_nationkey
 |     row-size=10B cardinality=10.00K
 |
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: o_custkey = c_custkey
-|  runtime filters: RF009 <- c_custkey
+|  runtime filters: RF008 <- c_custkey, RF009 <- c_custkey
 |  row-size=58B cardinality=575.77K
 |
 |--00:SCAN KUDU [tpch_kudu.customer]
-|     runtime filters: RF003 -> tpch_kudu.customer.c_nationkey, RF006 -> c_nationkey
+|     runtime filters: RF002 -> tpch_kudu.customer.c_nationkey, RF003 -> tpch_kudu.customer.c_nationkey, RF004 -> c_nationkey, RF006 -> c_nationkey
 |     row-size=10B cardinality=150.00K
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: l_orderkey = o_orderkey
-|  runtime filters: RF011 <- o_orderkey
+|  runtime filters: RF010 <- o_orderkey, RF011 <- o_orderkey
 |  row-size=48B cardinality=575.77K
 |
 |--01:SCAN KUDU [tpch_kudu.orders]
 |     kudu predicates: o_orderdate < '1995-01-01', o_orderdate >= '1994-01-01'
-|     runtime filters: RF009 -> o_custkey
+|     runtime filters: RF008 -> o_custkey, RF009 -> o_custkey
 |     row-size=16B cardinality=150.00K
 |
 02:SCAN KUDU [tpch_kudu.lineitem]
-   runtime filters: RF007 -> l_suppkey, RF011 -> l_orderkey
+   runtime filters: RF010 -> l_orderkey, RF011 -> l_orderkey, RF005 -> l_suppkey, RF007 -> l_suppkey
    row-size=32B cardinality=6.00M
 ====
 # Q6 - Forecasting Revenue Change Query
@@ -438,8 +489,8 @@ order by
   cust_nation,
   l_year
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=44.62MB Threads=7
-Per-Host Resource Estimates: Memory=61MB
+Max Per-Host Resource Reservation: Memory=49.62MB Threads=7
+Per-Host Resource Estimates: Memory=66MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -454,7 +505,7 @@ PLAN-ROOT SINK
 10:HASH JOIN [INNER JOIN]
 |  hash predicates: c_nationkey = n2.n_nationkey
 |  other predicates: n1.n_name = 'FRANCE' OR n2.n_name = 'FRANCE', n2.n_name = 'GERMANY' OR n1.n_name = 'GERMANY'
-|  runtime filters: RF001 <- n2.n_nationkey
+|  runtime filters: RF000 <- n2.n_nationkey, RF001 <- n2.n_nationkey
 |  row-size=144B cardinality=3.69K
 |
 |--05:SCAN KUDU [tpch_kudu.nation n2]
@@ -463,16 +514,16 @@ PLAN-ROOT SINK
 |
 09:HASH JOIN [INNER JOIN]
 |  hash predicates: o_custkey = c_custkey
-|  runtime filters: RF003 <- c_custkey
+|  runtime filters: RF002 <- c_custkey, RF003 <- c_custkey
 |  row-size=119B cardinality=46.06K
 |
 |--03:SCAN KUDU [tpch_kudu.customer]
-|     runtime filters: RF001 -> c_nationkey
+|     runtime filters: RF000 -> c_nationkey, RF001 -> c_nationkey
 |     row-size=10B cardinality=150.00K
 |
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: s_nationkey = n1.n_nationkey
-|  runtime filters: RF005 <- n1.n_nationkey
+|  runtime filters: RF004 <- n1.n_nationkey, RF005 <- n1.n_nationkey
 |  row-size=109B cardinality=46.06K
 |
 |--04:SCAN KUDU [tpch_kudu.nation n1]
@@ -481,25 +532,25 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: l_suppkey = s_suppkey
-|  runtime filters: RF007 <- s_suppkey
+|  runtime filters: RF006 <- s_suppkey, RF007 <- s_suppkey
 |  row-size=84B cardinality=575.77K
 |
 |--00:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF005 -> s_nationkey
+|     runtime filters: RF004 -> s_nationkey, RF005 -> s_nationkey
 |     row-size=10B cardinality=10.00K
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: l_orderkey = o_orderkey
-|  runtime filters: RF009 <- o_orderkey
+|  runtime filters: RF008 <- o_orderkey, RF009 <- o_orderkey
 |  row-size=74B cardinality=575.77K
 |
 |--02:SCAN KUDU [tpch_kudu.orders]
-|     runtime filters: RF003 -> o_custkey
+|     runtime filters: RF002 -> o_custkey, RF003 -> o_custkey
 |     row-size=16B cardinality=1.50M
 |
 01:SCAN KUDU [tpch_kudu.lineitem]
    kudu predicates: l_shipdate <= '1996-12-31', l_shipdate >= '1995-01-01'
-   runtime filters: RF007 -> l_suppkey, RF009 -> l_orderkey
+   runtime filters: RF006 -> l_suppkey, RF007 -> l_suppkey, RF008 -> l_orderkey, RF009 -> l_orderkey
    row-size=58B cardinality=600.12K
 ====
 # Q8 - National Market Share Query
@@ -541,8 +592,8 @@ group by
 order by
   o_year
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=12.56MB Threads=9
-Per-Host Resource Estimates: Memory=33MB
+Max Per-Host Resource Reservation: Memory=19.56MB Threads=9
+Per-Host Resource Estimates: Memory=40MB
 PLAN-ROOT SINK
 |
 16:SORT
@@ -556,7 +607,7 @@ PLAN-ROOT SINK
 |
 14:HASH JOIN [INNER JOIN]
 |  hash predicates: s_nationkey = n2.n_nationkey
-|  runtime filters: RF001 <- n2.n_nationkey
+|  runtime filters: RF000 <- n2.n_nationkey, RF001 <- n2.n_nationkey
 |  row-size=141B cardinality=761
 |
 |--06:SCAN KUDU [tpch_kudu.nation n2]
@@ -564,7 +615,7 @@ PLAN-ROOT SINK
 |
 13:HASH JOIN [INNER JOIN]
 |  hash predicates: n1.n_regionkey = r_regionkey
-|  runtime filters: RF003 <- r_regionkey
+|  runtime filters: RF002 <- r_regionkey, RF003 <- r_regionkey
 |  row-size=116B cardinality=761
 |
 |--07:SCAN KUDU [tpch_kudu.region]
@@ -573,35 +624,35 @@ PLAN-ROOT SINK
 |
 12:HASH JOIN [INNER JOIN]
 |  hash predicates: c_nationkey = n1.n_nationkey
-|  runtime filters: RF005 <- n1.n_nationkey
+|  runtime filters: RF004 <- n1.n_nationkey, RF005 <- n1.n_nationkey
 |  row-size=114B cardinality=3.81K
 |
 |--05:SCAN KUDU [tpch_kudu.nation n1]
-|     runtime filters: RF003 -> n1.n_regionkey
+|     runtime filters: RF002 -> n1.n_regionkey, RF003 -> n1.n_regionkey
 |     row-size=4B cardinality=25
 |
 11:HASH JOIN [INNER JOIN]
 |  hash predicates: c_custkey = o_custkey
-|  runtime filters: RF007 <- o_custkey
+|  runtime filters: RF006 <- o_custkey, RF007 <- o_custkey
 |  row-size=110B cardinality=3.81K
 |
 |--10:HASH JOIN [INNER JOIN]
 |  |  hash predicates: l_suppkey = s_suppkey
-|  |  runtime filters: RF009 <- s_suppkey
+|  |  runtime filters: RF008 <- s_suppkey, RF009 <- s_suppkey
 |  |  row-size=100B cardinality=3.81K
 |  |
 |  |--01:SCAN KUDU [tpch_kudu.supplier]
-|  |     runtime filters: RF001 -> s_nationkey
+|  |     runtime filters: RF000 -> s_nationkey, RF001 -> s_nationkey
 |  |     row-size=10B cardinality=10.00K
 |  |
 |  09:HASH JOIN [INNER JOIN]
 |  |  hash predicates: o_orderkey = l_orderkey
-|  |  runtime filters: RF011 <- l_orderkey
+|  |  runtime filters: RF010 <- l_orderkey, RF011 <- l_orderkey
 |  |  row-size=90B cardinality=3.81K
 |  |
 |  |--08:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: l_partkey = p_partkey
-|  |  |  runtime filters: RF013 <- p_partkey
+|  |  |  runtime filters: RF012 <- p_partkey, RF013 <- p_partkey
 |  |  |  row-size=48B cardinality=39.66K
 |  |  |
 |  |  |--00:SCAN KUDU [tpch_kudu.part]
@@ -609,16 +660,16 @@ PLAN-ROOT SINK
 |  |  |     row-size=8B cardinality=1.32K
 |  |  |
 |  |  02:SCAN KUDU [tpch_kudu.lineitem]
-|  |     runtime filters: RF009 -> l_suppkey, RF013 -> l_partkey
+|  |     runtime filters: RF012 -> l_partkey, RF013 -> l_partkey, RF008 -> l_suppkey, RF009 -> l_suppkey
 |  |     row-size=40B cardinality=6.00M
 |  |
 |  03:SCAN KUDU [tpch_kudu.orders]
 |     kudu predicates: o_orderdate <= '1996-12-31', o_orderdate >= '1995-01-01'
-|     runtime filters: RF011 -> o_orderkey
+|     runtime filters: RF010 -> o_orderkey, RF011 -> o_orderkey
 |     row-size=42B cardinality=150.00K
 |
 04:SCAN KUDU [tpch_kudu.customer]
-   runtime filters: RF005 -> c_nationkey, RF007 -> c_custkey
+   runtime filters: RF006 -> c_custkey, RF007 -> c_custkey, RF004 -> c_nationkey, RF005 -> c_nationkey
    row-size=10B cardinality=150.00K
 ====
 # Q9 - Product Type Measure Query
@@ -654,8 +705,8 @@ order by
   nation,
   o_year desc
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=79.50MB Threads=7
-Per-Host Resource Estimates: Memory=118MB
+Max Per-Host Resource Reservation: Memory=85.50MB Threads=7
+Per-Host Resource Estimates: Memory=124MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -669,7 +720,7 @@ PLAN-ROOT SINK
 |
 10:HASH JOIN [INNER JOIN]
 |  hash predicates: s_nationkey = n_nationkey
-|  runtime filters: RF001 <- n_nationkey
+|  runtime filters: RF000 <- n_nationkey, RF001 <- n_nationkey
 |  row-size=198B cardinality=574.29K
 |
 |--05:SCAN KUDU [tpch_kudu.nation]
@@ -677,7 +728,7 @@ PLAN-ROOT SINK
 |
 09:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = ps_partkey, l_suppkey = ps_suppkey
-|  runtime filters: RF004 <- ps_partkey, RF005 <- ps_suppkey
+|  runtime filters: RF002 <- ps_partkey, RF003 <- ps_suppkey, RF004 <- ps_partkey, RF005 <- ps_suppkey
 |  row-size=173B cardinality=574.29K
 |
 |--03:SCAN KUDU [tpch_kudu.partsupp]
@@ -685,16 +736,16 @@ PLAN-ROOT SINK
 |
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: l_suppkey = s_suppkey
-|  runtime filters: RF007 <- s_suppkey
+|  runtime filters: RF006 <- s_suppkey, RF007 <- s_suppkey
 |  row-size=149B cardinality=574.29K
 |
 |--01:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF001 -> s_nationkey, RF005 -> tpch_kudu.supplier.s_suppkey
+|     runtime filters: RF000 -> s_nationkey, RF001 -> s_nationkey, RF003 -> tpch_kudu.supplier.s_suppkey, RF005 -> tpch_kudu.supplier.s_suppkey
 |     row-size=10B cardinality=10.00K
 |
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: l_orderkey = o_orderkey
-|  runtime filters: RF009 <- o_orderkey
+|  runtime filters: RF008 <- o_orderkey, RF009 <- o_orderkey
 |  row-size=139B cardinality=574.29K
 |
 |--04:SCAN KUDU [tpch_kudu.orders]
@@ -702,16 +753,16 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = p_partkey
-|  runtime filters: RF011 <- p_partkey
+|  runtime filters: RF010 <- p_partkey, RF011 <- p_partkey
 |  row-size=105B cardinality=598.58K
 |
 |--00:SCAN KUDU [tpch_kudu.part]
 |     predicates: p_name LIKE '%green%'
-|     runtime filters: RF004 -> tpch_kudu.part.p_partkey
+|     runtime filters: RF002 -> tpch_kudu.part.p_partkey, RF004 -> tpch_kudu.part.p_partkey
 |     row-size=57B cardinality=20.00K
 |
 02:SCAN KUDU [tpch_kudu.lineitem]
-   runtime filters: RF004 -> l_partkey, RF005 -> l_suppkey, RF007 -> l_suppkey, RF009 -> l_orderkey, RF011 -> l_partkey
+   runtime filters: RF010 -> l_partkey, RF011 -> l_partkey, RF008 -> l_orderkey, RF009 -> l_orderkey, RF002 -> l_partkey, RF003 -> l_suppkey, RF004 -> l_partkey, RF005 -> l_suppkey, RF006 -> l_suppkey, RF007 -> l_suppkey
    row-size=48B cardinality=6.00M
 ====
 # Q10 - Returned Item Reporting Query
@@ -749,8 +800,8 @@ order by
   revenue desc
 limit 20
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=44.44MB Threads=5
-Per-Host Resource Estimates: Memory=67MB
+Max Per-Host Resource Reservation: Memory=47.44MB Threads=5
+Per-Host Resource Estimates: Memory=70MB
 PLAN-ROOT SINK
 |
 08:TOP-N [LIMIT=20]
@@ -764,7 +815,7 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: c_nationkey = n_nationkey
-|  runtime filters: RF001 <- n_nationkey
+|  runtime filters: RF000 <- n_nationkey, RF001 <- n_nationkey
 |  row-size=278B cardinality=191.92K
 |
 |--03:SCAN KUDU [tpch_kudu.nation]
@@ -772,12 +823,12 @@ PLAN-ROOT SINK
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: c_custkey = o_custkey
-|  runtime filters: RF003 <- o_custkey
+|  runtime filters: RF002 <- o_custkey, RF003 <- o_custkey
 |  row-size=253B cardinality=191.92K
 |
 |--04:HASH JOIN [INNER JOIN]
 |  |  hash predicates: l_orderkey = o_orderkey
-|  |  runtime filters: RF005 <- o_orderkey
+|  |  runtime filters: RF004 <- o_orderkey, RF005 <- o_orderkey
 |  |  row-size=40B cardinality=191.92K
 |  |
 |  |--01:SCAN KUDU [tpch_kudu.orders]
@@ -786,11 +837,11 @@ PLAN-ROOT SINK
 |  |
 |  02:SCAN KUDU [tpch_kudu.lineitem]
 |     kudu predicates: l_returnflag = 'R'
-|     runtime filters: RF005 -> l_orderkey
+|     runtime filters: RF004 -> l_orderkey, RF005 -> l_orderkey
 |     row-size=24B cardinality=2.00M
 |
 00:SCAN KUDU [tpch_kudu.customer]
-   runtime filters: RF001 -> c_nationkey, RF003 -> c_custkey
+   runtime filters: RF000 -> c_nationkey, RF001 -> c_nationkey, RF002 -> c_custkey, RF003 -> c_custkey
    row-size=213B cardinality=150.00K
 ====
 # Q11 - Important Stock Identification
@@ -828,8 +879,8 @@ where
 order by
   value desc
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=9.69MB Threads=7
-Per-Host Resource Estimates: Memory=38MB
+Max Per-Host Resource Reservation: Memory=13.69MB Threads=7
+Per-Host Resource Estimates: Memory=42MB
 PLAN-ROOT SINK
 |
 13:SORT
@@ -846,7 +897,7 @@ PLAN-ROOT SINK
 |  |
 |  10:HASH JOIN [INNER JOIN]
 |  |  hash predicates: s_nationkey = n_nationkey
-|  |  runtime filters: RF005 <- n_nationkey
+|  |  runtime filters: RF004 <- n_nationkey, RF005 <- n_nationkey
 |  |  row-size=36B cardinality=32.00K
 |  |
 |  |--08:SCAN KUDU [tpch_kudu.nation]
@@ -855,15 +906,15 @@ PLAN-ROOT SINK
 |  |
 |  09:HASH JOIN [INNER JOIN]
 |  |  hash predicates: ps_suppkey = s_suppkey
-|  |  runtime filters: RF007 <- s_suppkey
+|  |  runtime filters: RF006 <- s_suppkey, RF007 <- s_suppkey
 |  |  row-size=34B cardinality=800.00K
 |  |
 |  |--07:SCAN KUDU [tpch_kudu.supplier]
-|  |     runtime filters: RF005 -> s_nationkey
+|  |     runtime filters: RF004 -> s_nationkey, RF005 -> s_nationkey
 |  |     row-size=10B cardinality=10.00K
 |  |
 |  06:SCAN KUDU [tpch_kudu.partsupp]
-|     runtime filters: RF007 -> ps_suppkey
+|     runtime filters: RF006 -> ps_suppkey, RF007 -> ps_suppkey
 |     row-size=24B cardinality=800.00K
 |
 05:AGGREGATE [FINALIZE]
@@ -873,7 +924,7 @@ PLAN-ROOT SINK
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: s_nationkey = n_nationkey
-|  runtime filters: RF001 <- n_nationkey
+|  runtime filters: RF000 <- n_nationkey, RF001 <- n_nationkey
 |  row-size=44B cardinality=32.00K
 |
 |--02:SCAN KUDU [tpch_kudu.nation]
@@ -882,15 +933,15 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: ps_suppkey = s_suppkey
-|  runtime filters: RF003 <- s_suppkey
+|  runtime filters: RF002 <- s_suppkey, RF003 <- s_suppkey
 |  row-size=42B cardinality=800.00K
 |
 |--01:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF001 -> s_nationkey
+|     runtime filters: RF000 -> s_nationkey, RF001 -> s_nationkey
 |     row-size=10B cardinality=10.00K
 |
 00:SCAN KUDU [tpch_kudu.partsupp]
-   runtime filters: RF003 -> ps_suppkey
+   runtime filters: RF002 -> ps_suppkey, RF003 -> ps_suppkey
    row-size=32B cardinality=800.00K
 ====
 # Q12 - Shipping Mode and Order Priority Query
@@ -923,8 +974,8 @@ group by
 order by
   l_shipmode
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=35.94MB Threads=3
-Per-Host Resource Estimates: Memory=49MB
+Max Per-Host Resource Reservation: Memory=36.94MB Threads=3
+Per-Host Resource Estimates: Memory=50MB
 PLAN-ROOT SINK
 |
 04:SORT
@@ -938,7 +989,7 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: o_orderkey = l_orderkey
-|  runtime filters: RF001 <- l_orderkey
+|  runtime filters: RF000 <- l_orderkey, RF001 <- l_orderkey
 |  row-size=139B cardinality=320.78K
 |
 |--01:SCAN KUDU [tpch_kudu.lineitem]
@@ -947,7 +998,7 @@ PLAN-ROOT SINK
 |     row-size=106B cardinality=320.78K
 |
 00:SCAN KUDU [tpch_kudu.orders]
-   runtime filters: RF001 -> o_orderkey
+   runtime filters: RF000 -> o_orderkey, RF001 -> o_orderkey
    row-size=32B cardinality=1.50M
 ====
 # Q13 - Customer Distribution Query
@@ -972,8 +1023,8 @@ order by
   custdist desc,
   c_count desc
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=10.75MB Threads=3
-Per-Host Resource Estimates: Memory=20MB
+Max Per-Host Resource Reservation: Memory=11.75MB Threads=3
+Per-Host Resource Estimates: Memory=21MB
 PLAN-ROOT SINK
 |
 05:SORT
@@ -992,7 +1043,7 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: o_custkey = c_custkey
-|  runtime filters: RF001 <- c_custkey
+|  runtime filters: RF000 <- c_custkey, RF001 <- c_custkey
 |  row-size=89B cardinality=150.00K
 |
 |--00:SCAN KUDU [tpch_kudu.customer]
@@ -1000,7 +1051,7 @@ PLAN-ROOT SINK
 |
 01:SCAN KUDU [tpch_kudu.orders]
    predicates: NOT o_comment LIKE '%special%requests%'
-   runtime filters: RF001 -> o_custkey
+   runtime filters: RF000 -> o_custkey, RF001 -> o_custkey
    row-size=81B cardinality=150.00K
 ====
 # Q14 - Promotion Effect
@@ -1018,8 +1069,8 @@ where
   and l_shipdate >= '1995-09-01'
   and l_shipdate < '1995-10-01'
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=17.00MB Threads=3
-Per-Host Resource Estimates: Memory=33MB
+Max Per-Host Resource Reservation: Memory=18.00MB Threads=3
+Per-Host Resource Estimates: Memory=34MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -1028,7 +1079,7 @@ PLAN-ROOT SINK
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = p_partkey
-|  runtime filters: RF001 <- p_partkey
+|  runtime filters: RF000 <- p_partkey, RF001 <- p_partkey
 |  row-size=69B cardinality=598.58K
 |
 |--01:SCAN KUDU [tpch_kudu.part]
@@ -1036,7 +1087,7 @@ PLAN-ROOT SINK
 |
 00:SCAN KUDU [tpch_kudu.lineitem]
    kudu predicates: l_shipdate < '1995-10-01', l_shipdate >= '1995-09-01'
-   runtime filters: RF001 -> l_partkey
+   runtime filters: RF000 -> l_partkey, RF001 -> l_partkey
    row-size=24B cardinality=600.12K
 ====
 # Q15 - Top Supplier Query
@@ -1071,8 +1122,8 @@ where
 order by
   s_suppkey
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=15.88MB Threads=4
-Per-Host Resource Estimates: Memory=42MB
+Max Per-Host Resource Reservation: Memory=16.88MB Threads=4
+Per-Host Resource Estimates: Memory=43MB
 PLAN-ROOT SINK
 |
 08:SORT
@@ -1098,7 +1149,7 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: s_suppkey = l_suppkey
-|  runtime filters: RF001 <- l_suppkey
+|  runtime filters: RF000 <- l_suppkey, RF001 <- l_suppkey
 |  row-size=138B cardinality=10.00K
 |
 |--02:AGGREGATE [FINALIZE]
@@ -1111,7 +1162,7 @@ PLAN-ROOT SINK
 |     row-size=24B cardinality=600.12K
 |
 00:SCAN KUDU [tpch_kudu.supplier]
-   runtime filters: RF001 -> s_suppkey
+   runtime filters: RF000 -> s_suppkey, RF001 -> s_suppkey
    row-size=114B cardinality=10.00K
 ====
 # Q16 - Parts/Supplier Relation Query
@@ -1146,8 +1197,8 @@ order by
   p_type,
   p_size
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=14.88MB Threads=4
-Per-Host Resource Estimates: Memory=22MB
+Max Per-Host Resource Reservation: Memory=15.88MB Threads=4
+Per-Host Resource Estimates: Memory=23MB
 PLAN-ROOT SINK
 |
 07:SORT
@@ -1173,7 +1224,7 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: ps_partkey = p_partkey
-|  runtime filters: RF001 <- p_partkey
+|  runtime filters: RF000 <- p_partkey, RF001 <- p_partkey
 |  row-size=89B cardinality=31.92K
 |
 |--01:SCAN KUDU [tpch_kudu.part]
@@ -1182,7 +1233,7 @@ PLAN-ROOT SINK
 |     row-size=73B cardinality=8.00K
 |
 00:SCAN KUDU [tpch_kudu.partsupp]
-   runtime filters: RF001 -> ps_partkey
+   runtime filters: RF000 -> ps_partkey, RF001 -> ps_partkey
    row-size=16B cardinality=800.00K
 ====
 # Q17 - Small-Quantity-Order Revenue Query
@@ -1204,8 +1255,8 @@ where
       l_partkey = p_partkey
   )
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=8.62MB Threads=4
-Per-Host Resource Estimates: Memory=24MB
+Max Per-Host Resource Reservation: Memory=10.62MB Threads=4
+Per-Host Resource Estimates: Memory=26MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
@@ -1215,12 +1266,12 @@ PLAN-ROOT SINK
 05:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: l_partkey = p_partkey
 |  other join predicates: l_quantity < round(0.2 * avg(l_quantity), 2)
-|  runtime filters: RF001 <- p_partkey
+|  runtime filters: RF000 <- p_partkey, RF001 <- p_partkey
 |  row-size=32B cardinality=29.93K
 |
 |--04:HASH JOIN [INNER JOIN]
 |  |  hash predicates: l_partkey = p_partkey
-|  |  runtime filters: RF003 <- p_partkey
+|  |  runtime filters: RF002 <- p_partkey, RF003 <- p_partkey
 |  |  row-size=32B cardinality=29.93K
 |  |
 |  |--01:SCAN KUDU [tpch_kudu.part]
@@ -1228,7 +1279,7 @@ PLAN-ROOT SINK
 |  |     row-size=8B cardinality=1.00K
 |  |
 |  00:SCAN KUDU [tpch_kudu.lineitem]
-|     runtime filters: RF003 -> l_partkey
+|     runtime filters: RF002 -> l_partkey, RF003 -> l_partkey
 |     row-size=24B cardinality=6.00M
 |
 03:AGGREGATE [FINALIZE]
@@ -1237,7 +1288,7 @@ PLAN-ROOT SINK
 |  row-size=16B cardinality=200.52K
 |
 02:SCAN KUDU [tpch_kudu.lineitem]
-   runtime filters: RF001 -> tpch_kudu.lineitem.l_partkey
+   runtime filters: RF000 -> tpch_kudu.lineitem.l_partkey, RF001 -> tpch_kudu.lineitem.l_partkey
    row-size=16B cardinality=6.00M
 ====
 # Q18 - Large Value tpch_kudu.customer Query
@@ -1276,8 +1327,8 @@ order by
   o_orderdate
 limit 100
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=81.25MB Threads=5
-Per-Host Resource Estimates: Memory=153MB
+Max Per-Host Resource Reservation: Memory=84.25MB Threads=5
+Per-Host Resource Estimates: Memory=156MB
 PLAN-ROOT SINK
 |
 09:TOP-N [LIMIT=100]
@@ -1291,7 +1342,7 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: o_orderkey = l_orderkey
-|  runtime filters: RF001 <- l_orderkey
+|  runtime filters: RF000 <- l_orderkey, RF001 <- l_orderkey
 |  row-size=108B cardinality=600.12K
 |
 |--04:AGGREGATE [FINALIZE]
@@ -1305,7 +1356,7 @@ PLAN-ROOT SINK
 |
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: o_custkey = c_custkey
-|  runtime filters: RF003 <- c_custkey
+|  runtime filters: RF002 <- c_custkey, RF003 <- c_custkey
 |  row-size=108B cardinality=5.76M
 |
 |--00:SCAN KUDU [tpch_kudu.customer]
@@ -1313,15 +1364,15 @@ PLAN-ROOT SINK
 |
 05:HASH JOIN [INNER JOIN]
 |  hash predicates: l_orderkey = o_orderkey
-|  runtime filters: RF005 <- o_orderkey
+|  runtime filters: RF004 <- o_orderkey, RF005 <- o_orderkey
 |  row-size=66B cardinality=5.76M
 |
 |--01:SCAN KUDU [tpch_kudu.orders]
-|     runtime filters: RF001 -> o_orderkey, RF003 -> o_custkey
+|     runtime filters: RF000 -> o_orderkey, RF001 -> o_orderkey, RF002 -> o_custkey, RF003 -> o_custkey
 |     row-size=50B cardinality=1.50M
 |
 02:SCAN KUDU [tpch_kudu.lineitem]
-   runtime filters: RF001 -> tpch_kudu.lineitem.l_orderkey, RF005 -> l_orderkey
+   runtime filters: RF000 -> tpch_kudu.lineitem.l_orderkey, RF001 -> tpch_kudu.lineitem.l_orderkey, RF004 -> l_orderkey, RF005 -> l_orderkey
    row-size=16B cardinality=6.00M
 ====
 # Q19 - Discounted Revenue Query
@@ -1361,8 +1412,8 @@ where
     )
   )
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=1.94MB Threads=3
-Per-Host Resource Estimates: Memory=21MB
+Max Per-Host Resource Reservation: Memory=2.94MB Threads=3
+Per-Host Resource Estimates: Memory=22MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -1372,7 +1423,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: l_partkey = p_partkey
 |  other predicates: l_quantity <= 11 OR l_quantity <= 20 OR p_size <= 15, l_quantity <= 11 OR l_quantity >= 10 OR p_size <= 15, l_quantity <= 11 OR p_size <= 10 OR l_quantity <= 30, l_quantity <= 11 OR p_size <= 10 OR l_quantity >= 20, l_quantity <= 11 OR p_size <= 10 OR p_size <= 15, l_quantity >= 1 OR l_quantity <= 20 OR p_size <= 15, l_quantity >= 1 OR l_quantity >= 10 OR p_size <= 15, l_quantity >= 1 OR p_size <= 10 OR l_quantity <= 30, l_quantity >= 1 OR p_size <= 10 OR l_quantity  [...]
-|  runtime filters: RF001 <- p_partkey
+|  runtime filters: RF000 <- p_partkey, RF001 <- p_partkey
 |  row-size=92B cardinality=1.41K
 |
 |--01:SCAN KUDU [tpch_kudu.part]
@@ -1383,7 +1434,7 @@ PLAN-ROOT SINK
 00:SCAN KUDU [tpch_kudu.lineitem]
    predicates: l_quantity <= 11 OR l_quantity <= 20 OR l_quantity <= 30, l_quantity <= 11 OR l_quantity <= 20 OR l_quantity >= 20, l_quantity <= 11 OR l_quantity >= 10 OR l_quantity <= 30, l_quantity <= 11 OR l_quantity >= 10 OR l_quantity >= 20, l_quantity >= 1 OR l_quantity <= 20 OR l_quantity <= 30, l_quantity >= 1 OR l_quantity <= 20 OR l_quantity >= 20, l_quantity >= 1 OR l_quantity >= 10 OR l_quantity <= 30, l_quantity >= 1 OR l_quantity >= 10 OR l_quantity >= 20
    kudu predicates: l_shipmode IN ('AIR', 'AIR REG'), l_shipinstruct = 'DELIVER IN PERSON'
-   runtime filters: RF001 -> l_partkey
+   runtime filters: RF000 -> l_partkey, RF001 -> l_partkey
    row-size=32B cardinality=197.63K
 ====
 # Q20 - Potential Part Promotion Query
@@ -1424,8 +1475,8 @@ where
 order by
   s_name
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=50.81MB Threads=6
-Per-Host Resource Estimates: Memory=60MB
+Max Per-Host Resource Reservation: Memory=55.81MB Threads=6
+Per-Host Resource Estimates: Memory=65MB
 PLAN-ROOT SINK
 |
 10:SORT
@@ -1434,12 +1485,12 @@ PLAN-ROOT SINK
 |
 09:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: ps_suppkey = s_suppkey
-|  runtime filters: RF001 <- s_suppkey
+|  runtime filters: RF000 <- s_suppkey, RF001 <- s_suppkey
 |  row-size=87B cardinality=400
 |
 |--08:HASH JOIN [INNER JOIN]
 |  |  hash predicates: s_nationkey = n_nationkey
-|  |  runtime filters: RF009 <- n_nationkey
+|  |  runtime filters: RF008 <- n_nationkey, RF009 <- n_nationkey
 |  |  row-size=87B cardinality=400
 |  |
 |  |--01:SCAN KUDU [tpch_kudu.nation]
@@ -1447,18 +1498,18 @@ PLAN-ROOT SINK
 |  |     row-size=2B cardinality=1
 |  |
 |  00:SCAN KUDU [tpch_kudu.supplier]
-|     runtime filters: RF009 -> s_nationkey
+|     runtime filters: RF008 -> s_nationkey, RF009 -> s_nationkey
 |     row-size=85B cardinality=10.00K
 |
 07:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: l_partkey = ps_partkey, l_suppkey = ps_suppkey
 |  other join predicates: ps_availqty > 0.5 * sum(l_quantity)
-|  runtime filters: RF004 <- ps_partkey, RF005 <- ps_suppkey
+|  runtime filters: RF002 <- ps_partkey, RF003 <- ps_suppkey, RF004 <- ps_partkey, RF005 <- ps_suppkey
 |  row-size=24B cardinality=79.79K
 |
 |--06:HASH JOIN [LEFT SEMI JOIN]
 |  |  hash predicates: ps_partkey = p_partkey
-|  |  runtime filters: RF007 <- p_partkey
+|  |  runtime filters: RF006 <- p_partkey, RF007 <- p_partkey
 |  |  row-size=24B cardinality=79.79K
 |  |
 |  |--03:SCAN KUDU [tpch_kudu.part]
@@ -1466,7 +1517,7 @@ PLAN-ROOT SINK
 |  |     row-size=57B cardinality=20.00K
 |  |
 |  02:SCAN KUDU [tpch_kudu.partsupp]
-|     runtime filters: RF001 -> ps_suppkey, RF007 -> ps_partkey
+|     runtime filters: RF000 -> ps_suppkey, RF001 -> ps_suppkey, RF006 -> ps_partkey, RF007 -> ps_partkey
 |     row-size=24B cardinality=800.00K
 |
 05:AGGREGATE [FINALIZE]
@@ -1476,7 +1527,7 @@ PLAN-ROOT SINK
 |
 04:SCAN KUDU [tpch_kudu.lineitem]
    kudu predicates: l_shipdate < '1995-01-01', l_shipdate >= '1994-01-01'
-   runtime filters: RF001 -> tpch_kudu.lineitem.l_suppkey, RF004 -> tpch_kudu.lineitem.l_partkey, RF005 -> tpch_kudu.lineitem.l_suppkey
+   runtime filters: RF000 -> tpch_kudu.lineitem.l_suppkey, RF001 -> tpch_kudu.lineitem.l_suppkey, RF002 -> tpch_kudu.lineitem.l_partkey, RF003 -> tpch_kudu.lineitem.l_suppkey, RF004 -> tpch_kudu.lineitem.l_partkey, RF005 -> tpch_kudu.lineitem.l_suppkey
    row-size=24B cardinality=600.12K
 ====
 # Q21 - Suppliers Who Kept Orders Waiting Query
@@ -1521,8 +1572,8 @@ order by
   s_name
 limit 100
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=10.56MB Threads=7
-Per-Host Resource Estimates: Memory=71MB
+Max Per-Host Resource Reservation: Memory=14.56MB Threads=7
+Per-Host Resource Estimates: Memory=75MB
 PLAN-ROOT SINK
 |
 12:TOP-N [LIMIT=100]
@@ -1542,12 +1593,12 @@ PLAN-ROOT SINK
 |--09:HASH JOIN [RIGHT SEMI JOIN]
 |  |  hash predicates: l2.l_orderkey = l1.l_orderkey
 |  |  other join predicates: l2.l_suppkey != l1.l_suppkey
-|  |  runtime filters: RF001 <- l1.l_orderkey
+|  |  runtime filters: RF000 <- l1.l_orderkey, RF001 <- l1.l_orderkey
 |  |  row-size=122B cardinality=7.68K
 |  |
 |  |--08:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: s_nationkey = n_nationkey
-|  |  |  runtime filters: RF003 <- n_nationkey
+|  |  |  runtime filters: RF002 <- n_nationkey, RF003 <- n_nationkey
 |  |  |  row-size=122B cardinality=7.68K
 |  |  |
 |  |  |--03:SCAN KUDU [tpch_kudu.nation]
@@ -1556,16 +1607,16 @@ PLAN-ROOT SINK
 |  |  |
 |  |  07:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: l1.l_suppkey = s_suppkey
-|  |  |  runtime filters: RF005 <- s_suppkey
+|  |  |  runtime filters: RF004 <- s_suppkey, RF005 <- s_suppkey
 |  |  |  row-size=120B cardinality=191.92K
 |  |  |
 |  |  |--00:SCAN KUDU [tpch_kudu.supplier]
-|  |  |     runtime filters: RF003 -> s_nationkey
+|  |  |     runtime filters: RF002 -> s_nationkey, RF003 -> s_nationkey
 |  |  |     row-size=44B cardinality=10.00K
 |  |  |
 |  |  06:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: l1.l_orderkey = o_orderkey
-|  |  |  runtime filters: RF007 <- o_orderkey
+|  |  |  runtime filters: RF006 <- o_orderkey, RF007 <- o_orderkey
 |  |  |  row-size=76B cardinality=191.92K
 |  |  |
 |  |  |--02:SCAN KUDU [tpch_kudu.orders]
@@ -1574,11 +1625,11 @@ PLAN-ROOT SINK
 |  |  |
 |  |  01:SCAN KUDU [tpch_kudu.lineitem l1]
 |  |     predicates: l1.l_receiptdate > l1.l_commitdate
-|  |     runtime filters: RF005 -> l1.l_suppkey, RF007 -> l1.l_orderkey
+|  |     runtime filters: RF004 -> l1.l_suppkey, RF005 -> l1.l_suppkey, RF006 -> l1.l_orderkey, RF007 -> l1.l_orderkey
 |  |     row-size=68B cardinality=600.12K
 |  |
 |  04:SCAN KUDU [tpch_kudu.lineitem l2]
-|     runtime filters: RF001 -> l2.l_orderkey
+|     runtime filters: RF000 -> l2.l_orderkey, RF001 -> l2.l_orderkey
 |     row-size=16B cardinality=6.00M
 |
 05:SCAN KUDU [tpch_kudu.lineitem l3]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test
new file mode 100644
index 0000000..a743f13
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test
@@ -0,0 +1,461 @@
+====
+---- QUERY
+####################################################
+# Test case 1: all runtime filters of all possible types.
+####################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.bool_col = (b.bool_col && !b.bool_col)
+---- RESULTS
+29200
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 3650
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.tinyint_col = b.tinyint_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.smallint_col = b.smallint_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.int_col = b.int_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.bigint_col = b.bigint_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.float_col = b.float_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.double_col = b.double_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.string_col = b.string_col
+---- RESULTS
+5840
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+# TODO update the runtime profile for this test case when IMPALA-9691
+#   (Support Kudu Timestamp and Date Bloom Filter) is fixed.
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from alltypes a join [BROADCAST] alltypestiny b
+where a.timestamp_col = b.timestamp_col
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 902
+row_regex: .*1 of 1 Runtime Filter Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d5_0 = b.d5_0
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d5_1 = b.d5_1
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d5_3 = b.d5_3
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d5_5 = b.d5_5 and b.d5_5 != 0
+---- RESULTS
+37
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 37
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d5_5 = b.d5_5 and b.d5_5 = 0
+---- RESULTS
+180
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 180
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d9_0 = b.d9_0
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d9_1 = b.d9_1
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d9_5 = b.d9_5
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d9_9 = b.d9_9 and b.d9_9 != 0
+---- RESULTS
+37
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 37
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d9_9 = b.d9_9 and b.d9_9 = 0
+---- RESULTS
+306
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 306
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d14_0 = b.d14_0
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d14_1 = b.d14_1
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d14_7 = b.d14_7
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d14_14 = b.d14_14 and b.d14_14 != 0
+---- RESULTS
+37
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 37
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d14_14 = b.d14_14 and b.d14_14 = 0
+---- RESULTS
+441
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 441
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d28_0 = b.d28_0
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d28_1 = b.d28_1
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d28_14 = b.d28_14
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d28_28 = b.d28_28 and b.d28_28 != 0
+---- RESULTS
+37
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 37
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d28_28 = b.d28_28 and b.d28_28 = 0
+---- RESULTS
+686
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 686
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d38_0 = b.d38_0
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d38_1 = b.d38_1
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d38_19 = b.d38_19
+---- RESULTS
+38
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 38
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d38_38 = b.d38_38 and b.d38_38 != 0
+---- RESULTS
+37
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 37
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from decimal_rtf_tbl a
+join [BROADCAST] decimal_rtf_tiny_tbl b
+where a.d38_38 = b.d38_38 and b.d38_38 = 0
+---- RESULTS
+732
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 732
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+
+
+---- QUERY
+####################################################
+# Test case 2: filters on a primary key/partition column
+####################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN a.id, a.tinyint_col, b.id, b.tinyint_col
+from alltypes a join [BROADCAST] alltypestiny b
+where a.id = b.tinyint_col * 2;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+0,0,4,0
+0,0,2,0
+0,0,0,0
+0,0,6,0
+2,2,3,1
+2,2,7,1
+2,2,5,1
+2,2,1,1
+----TYPES
+INT,TINYINT,INT,TINYINT
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 2
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+select STRAIGHT_JOIN count(*) from tpch_kudu.orders a
+join [BROADCAST] tpch_kudu.orders b
+where a.o_orderkey = b.o_orderkey and b.o_orderkey = 100000;
+---- RESULTS
+1
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+select STRAIGHT_JOIN count(*) from tpch_kudu.orders a
+join [BROADCAST] tpch_kudu.orders b
+where a.o_orderkey = b.o_orderkey and b.o_orderkey != 100000;
+---- RESULTS
+1499999
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1499999
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+---- QUERY
+select STRAIGHT_JOIN count(*) from tpch_kudu.orders a
+join [BROADCAST] tpch_kudu.orders b
+where a.o_orderkey = b.o_orderkey and b.o_orderkey = 100009;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 0
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
+
+
+---- QUERY
+####################################################
+# Test case 3: Target expr has an implicit integer cast.
+# No bloom filter will be created for a join expr with casting;
+# only a min-max filter will be created.
+####################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*)
+from alltypes a join [BROADCAST] alltypes b
+where a.tinyint_col = b.int_col and b.int_col in (0, 1)
+---- RESULTS
+1065800
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 1460
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*BloomFilterBytes: 0.*
+====
+
+
+---- QUERY
+####################################################
+# Test case 4: Test with a SEMI_JOIN since
+# it's a common pattern.
+###################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select COUNT(*) from alltypes a
+where a.id in (select b.id from alltypes b where b.int_col < 10);
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 7300
+row_regex: .*2 of 2 Runtime Filters Published.*
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/diff_runtime_filter_types.test b/testdata/workloads/functional-query/queries/QueryTest/diff_runtime_filter_types.test
new file mode 100644
index 0000000..66ce73b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/diff_runtime_filter_types.test
@@ -0,0 +1,151 @@
+====
+---- QUERY
+####################################################
+# Test case 1: straight join
+#     apply different types of runtime filters on same
+#     query and compare number of probe rows
+####################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=BLOOM;
+select STRAIGHT_JOIN count(*) from tpch_kudu.lineitem a
+join [BROADCAST] tpch_kudu.supplier b
+where a.l_suppkey = b.s_suppkey and b.s_nationkey = 0;
+---- RESULTS
+253146
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 253146
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX;
+select STRAIGHT_JOIN count(*) from tpch_kudu.lineitem a
+join [BROADCAST] tpch_kudu.supplier b
+where a.l_suppkey = b.s_suppkey and b.s_nationkey = 0;
+---- RESULTS
+253146
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 5983282
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=ALL;
+select STRAIGHT_JOIN count(*) from tpch_kudu.lineitem a
+join [BROADCAST] tpch_kudu.supplier b
+where a.l_suppkey = b.s_suppkey and b.s_nationkey = 0;
+---- RESULTS
+253146
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 253146
+====
+
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=BLOOM;
+select STRAIGHT_JOIN count(*) from tpch_kudu.lineitem a
+join [BROADCAST] tpch_kudu.supplier b
+where a.l_suppkey = b.s_suppkey and b.s_nationkey > 0 and b.s_nationkey < 5;
+---- RESULTS
+984456
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 984456
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX;
+select STRAIGHT_JOIN count(*) from tpch_kudu.lineitem a
+join [BROADCAST] tpch_kudu.supplier b
+where a.l_suppkey = b.s_suppkey and b.s_nationkey > 0 and b.s_nationkey < 5;
+---- RESULTS
+984456
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 5998888
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=ALL;
+select STRAIGHT_JOIN count(*) from tpch_kudu.lineitem a
+join [BROADCAST] tpch_kudu.supplier b
+where a.l_suppkey = b.s_suppkey and b.s_nationkey > 0 and b.s_nationkey < 5;
+---- RESULTS
+984456
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 984456
+====
+
+
+---- QUERY
+####################################################
+# Test case 2: semi join between two tables
+#     apply different types of runtime filters on same
+#     query and compare number of probe rows
+####################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=BLOOM;
+select COUNT(*) from tpch_kudu.lineitem a
+where a.l_suppkey in
+(select b.s_suppkey from tpch_kudu.supplier b where b.s_nationkey = 0);
+---- RESULTS
+253146
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 253146
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX;
+select COUNT(*) from tpch_kudu.lineitem a
+where a.l_suppkey in
+(select b.s_suppkey from tpch_kudu.supplier b where b.s_nationkey = 0);
+---- RESULTS
+253146
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 5983282
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=ALL;
+select COUNT(*) from tpch_kudu.lineitem a
+where a.l_suppkey in
+(select b.s_suppkey from tpch_kudu.supplier b where b.s_nationkey = 0);
+---- RESULTS
+253146
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 253146
+====
+
+
+---- QUERY
+####################################################
+# Test case 3: semi join on same table
+#     apply different types of runtime filters on same
+#     query and compare number of probe rows
+####################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=BLOOM;
+select COUNT(*) from alltypes a
+where a.id in (select b.id from alltypes b where b.int_col < 10);
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 7300
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX;
+select COUNT(*) from alltypes a
+where a.id in (select b.id from alltypes b where b.int_col < 10);
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 7300
+====
+---- QUERY
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+set ENABLED_RUNTIME_FILTER_TYPES=ALL;
+select COUNT(*) from alltypes a
+where a.id in (select b.id from alltypes b where b.int_col < 10);
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 7300
+====
+
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 20ebf57..e99e06e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -183,6 +183,8 @@ row_regex: .*FiltersReceived: 0 .*
 ====
 ---- QUERY
 # Global mode. Coordinator should report 1 filter update per backend.
+# For Kudu, the join expr has an implicit cast, so no bloom filter
+# will be created; only a min-max filter will be created.
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MODE=GLOBAL;
 select STRAIGHT_JOIN count(*) from alltypes a
@@ -193,6 +195,9 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RUNTIME_PROFILE
 row_regex: .*FiltersReceived: 3 .*
 row_regex: .*REMOTE.*ms.*ms.*true
+---- RUNTIME_PROFILE: table_format=kudu
+row_regex: .*FiltersReceived: 3 .*
+row_regex: .*REMOTE.*ms.*ms.*true
 ====
 
 
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 405ab6e..f08f948 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -63,6 +63,8 @@ class TestRuntimeFilters(ImpalaTestSuite):
   def test_basic_filters(self, vector):
     new_vector = deepcopy(vector)
     new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    if 'kudu' in str(vector.get_value('table_format')):
+      self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=ALL")
     self.run_test_case('QueryTest/runtime_filters', vector,
         test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
 
@@ -167,11 +169,13 @@ class TestBloomFilters(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestBloomFilters, cls).add_test_dimensions()
-    # Bloom filters are disabled on HBase, Kudu
+    # Bloom filters are disabled on HBase
     cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('table_format').file_format not in ['hbase', 'kudu'])
+        lambda v: v.get_value('table_format').file_format not in ['hbase'])
 
   def test_bloom_filters(self, vector):
+    if 'kudu' in str(vector.get_value('table_format')):
+      self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
     self.run_test_case('QueryTest/bloom_filters', vector)
 
   def test_bloom_wait_time(self, vector):
@@ -199,6 +203,7 @@ class TestMinMaxFilters(ImpalaTestSuite):
         lambda v: v.get_value('table_format').file_format in ['kudu'])
 
   def test_min_max_filters(self, vector):
+    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX")
     self.run_test_case('QueryTest/min_max_filters', vector,
         test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
@@ -253,6 +258,30 @@ class TestMinMaxFilters(ImpalaTestSuite):
     assert cursor.fetchall() == [(len(matching_vals) + 2,)]
 
 
+# Apply both bloom filters and min-max filters.
+class TestAllRuntimeFilters(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAllRuntimeFilters, cls).add_test_dimensions()
+    # All filters are only implemented for Kudu now.
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format in ['kudu'])
+
+  def test_all_runtime_filters(self, vector):
+    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=ALL")
+    self.run_test_case('QueryTest/all_runtime_filters', vector,
+                       test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+
+  def test_diff_runtime_filter_types(self, vector):
+    # Compare the number of probe rows when applying different runtime filter types.
+    self.run_test_case('QueryTest/diff_runtime_filter_types', vector,
+                       test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+
+
 @SkipIfLocal.multiple_impalad
 class TestRuntimeRowFilters(ImpalaTestSuite):
   @classmethod
diff --git a/tests/query_test/test_spilling.py b/tests/query_test/test_spilling.py
index 818ba45..3c0c04d 100644
--- a/tests/query_test/test_spilling.py
+++ b/tests/query_test/test_spilling.py
@@ -146,7 +146,7 @@ class TestSpillingBroadcastJoins(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestSpillingBroadcastJoins, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.clear_constraints()
-    # Use parquet because it has 9 input splits for lineitem, hence can have a
+    # Use Kudu because it has 9 input splits for lineitem, hence can have a
     # higher effective dop than parquet, which only has 3 splits.
     cls.ImpalaTestMatrix.add_dimension(create_kudu_dimension('tpch'))
     debug_action_dims = CORE_DEBUG_ACTION_DIMS
@@ -158,4 +158,8 @@ class TestSpillingBroadcastJoins(ImpalaTestSuite):
           'debug_action': debug_action_dims, 'mt_dop': [3]}))
 
   def test_spilling_broadcast_joins(self, vector):
+    # Disable bloom filters for Kudu: pushing a runtime bloom filter to Kudu could reduce
+    # the number of probe rows and hence change the spilling behavior.
+    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX")
+
     self.run_test_case('QueryTest/spilling-broadcast-joins', vector)