You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/03/03 05:08:14 UTC

[impala] branch master updated: IMPALA-10898: Add runtime IN-list filters for ORC tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 374783c  IMPALA-10898: Add runtime IN-list filters for ORC tables
374783c is described below

commit 374783c55ebdb07fbf669c2c5d80c4b01eb39d2b
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Dec 1 15:23:03 2021 +0800

    IMPALA-10898: Add runtime IN-list filters for ORC tables
    
    ORC files have optional bloom filter indexes for each column. Since
    ORC-1.7.0, the C++ reader supports pushing down predicates to skip
    unreleated RowGroups. The pushed down predicates will be evaludated on
    file indexes (i.e. statistics and bloom filter indexes). Note that only
    EQUALS and IN-list predicates can leverage bloom filter indexes.
    
    Currently Impala has two kinds of runtime filters: bloom filter and
    min-max filter. Unfortunately they can't be converted into EQUALS or
    IN-list predicates. So they can't leverage the file level bloom filter
    indexes.
    
    This patch adds runtime IN-list filters for this purpose. Currently they
    are generated for the build side of a broadcast join. They will only be
    applied on ORC tables and be pushed down to the ORC reader(i.e. ORC
    lib). To avoid exploding the IN-list, if # of distinct values of the
    build side exceeds a threshold (default to 1024), we set the filter to
    ALWAYS_TRUE and clear its entry. The threshold can be configured by a
    new query option, RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT.
    
    Evaluating runtime IN-list filters is much slower than evaluating
    runtime bloom filters due to the current simple implementation (i.e.
    std::unorder_set) and the lack of codegen. So we disable it at row
    level.
    
    For visibility, this patch addes two counters in the HdfsScanNode:
     - NumPushedDownPredicates
     - NumPushedDownRuntimeFilters
    They reflect the predicates and runtime filters that are pushed down to
    the ORC reader.
    
    Currently, runtime IN-list filters are disabled by default. This patch
    extends the query option, ENABLED_RUNTIME_FILTER_TYPES, to support a
    comma separated list of filter types. It defaults to be "BLOOM,MIN_MAX".
    Add "IN_LIST" in it to enable runtime IN-list filters.
    
    Ran perf tests on a 3 instances cluster on my desktop using TPC-DS with
    scale factor 20. It shows significant improvements in some queries:
    
    +-----------+-------------+--------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+--------+
    | Workload  | Query       | File Format        | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval   |
    +-----------+-------------+--------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+--------+
    | TPCDS(20) | TPCDS-Q67A  | orc / snap / block | 35.07  | 44.01       | I -20.32%  |   0.38%    |   1.38%        | 10    | I -25.69%      | -3.58   | -45.33 |
    | TPCDS(20) | TPCDS-Q37   | orc / snap / block | 1.08   | 1.45        | I -25.23%  |   7.14%    |   3.09%        | 10    | I -34.09%      | -3.58   | -12.94 |
    | TPCDS(20) | TPCDS-Q70A  | orc / snap / block | 6.30   | 8.60        | I -26.81%  |   5.24%    |   4.21%        | 10    | I -36.67%      | -3.58   | -14.88 |
    | TPCDS(20) | TPCDS-Q16   | orc / snap / block | 1.33   | 1.85        | I -28.28%  |   4.98%    |   5.92%        | 10    | I -39.38%      | -3.58   | -12.93 |
    | TPCDS(20) | TPCDS-Q18A  | orc / snap / block | 5.70   | 8.06        | I -29.25%  |   3.00%    |   4.12%        | 10    | I -40.30%      | -3.58   | -19.95 |
    | TPCDS(20) | TPCDS-Q22A  | orc / snap / block | 2.01   | 2.97        | I -32.21%  |   6.12%    |   5.94%        | 10    | I -47.68%      | -3.58   | -14.05 |
    | TPCDS(20) | TPCDS-Q77A  | orc / snap / block | 8.49   | 12.44       | I -31.75%  |   6.44%    |   3.96%        | 10    | I -49.71%      | -3.58   | -16.97 |
    | TPCDS(20) | TPCDS-Q75   | orc / snap / block | 7.76   | 12.27       | I -36.76%  |   5.01%    |   3.87%        | 10    | I -59.56%      | -3.58   | -23.26 |
    | TPCDS(20) | TPCDS-Q21   | orc / snap / block | 0.71   | 1.27        | I -44.26%  |   4.56%    |   4.24%        | 10    | I -77.31%      | -3.58   | -28.31 |
    | TPCDS(20) | TPCDS-Q80A  | orc / snap / block | 9.24   | 20.42       | I -54.77%  |   4.03%    |   3.82%        | 10    | I -123.12%     | -3.58   | -40.90 |
    | TPCDS(20) | TPCDS-Q39-1 | orc / snap / block | 1.07   | 2.26        | I -52.74%  | * 23.83% * |   2.60%        | 10    | I -149.68%     | -3.58   | -14.43 |
    | TPCDS(20) | TPCDS-Q39-2 | orc / snap / block | 1.00   | 2.33        | I -56.95%  | * 19.53% * |   2.07%        | 10    | I -151.89%     | -3.58   | -20.81 |
    +-----------+-------------+--------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+--------+
    "Base Avg" is the avg of the original time. "Avg" is the current time.
    
    However, we also see some regressions due to the suboptimal
    implementation. The follow-up JIRAs will focus on improvements:
     - IMPALA-11140: Codegen InListFilter::Insert() and InListFilter::Find()
     - IMPALA-11141: Use exact data types in IN-list filters instead of
       casting data to a set of int64_t or a set of string.
     - IMPALA-11142: Consider IN-list filters in partitioned joins.
    
    Tests:
     - Test IN-list filter on string, date and all integer types
     - Test IN-list filter with NULL
     - Test IN-list filter on complex exprs targets
    
    Change-Id: I25080628233799aa0b6be18d5a832f1385414501
    Reviewed-on: http://gerrit.cloudera.org:8080/18141
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/codegen/gen_ir_descriptions.py              |   1 +
 be/src/codegen/impala-ir.cc                        |   1 +
 be/src/exec/filter-context.cc                      |  33 +++-
 be/src/exec/filter-context.h                       |   3 +
 be/src/exec/hdfs-orc-scanner.cc                    | 158 ++++++++++++++-
 be/src/exec/hdfs-orc-scanner.h                     |  30 ++-
 be/src/exec/hdfs-scanner-ir.cc                     |   3 +
 be/src/exec/join-builder.cc                        |  10 +-
 be/src/exec/nested-loop-join-builder.h             |   2 +-
 be/src/exec/orc-metadata-utils.cc                  |   4 +-
 be/src/exec/partitioned-hash-join-builder.cc       |  22 ++-
 be/src/exec/partitioned-hash-join-builder.h        |   2 +-
 be/src/exec/scan-node.cc                           |  16 +-
 be/src/runtime/coordinator-filter-state.h          |   9 +-
 be/src/runtime/coordinator.cc                      |  56 +++++-
 be/src/runtime/runtime-filter-bank.cc              |  63 ++++--
 be/src/runtime/runtime-filter-bank.h               |  24 ++-
 be/src/runtime/runtime-filter-ir.cc                |  30 +--
 be/src/runtime/runtime-filter-test.cc              |   4 +-
 be/src/runtime/runtime-filter.cc                   |  20 +-
 be/src/runtime/runtime-filter.h                    |  17 +-
 be/src/runtime/runtime-filter.inline.h             |  28 +--
 be/src/service/child-query.cc                      |   5 +
 be/src/service/data-stream-service.cc              |   6 +-
 be/src/service/query-options-test.cc               |  60 +++++-
 be/src/service/query-options.cc                    |  51 ++++-
 be/src/service/query-options.h                     |   7 +-
 be/src/util/CMakeLists.txt                         |   4 +-
 be/src/util/debug-util.cc                          |   1 -
 be/src/util/debug-util.h                           |   1 -
 be/src/util/in-list-filter-ir.cc                   |  75 ++++++++
 be/src/util/in-list-filter.cc                      | 213 +++++++++++++++++++++
 be/src/util/in-list-filter.h                       |  97 ++++++++++
 common/protobuf/data_stream_service.proto          |  11 ++
 common/thrift/ImpalaService.thrift                 |   5 +-
 common/thrift/PlanNodes.thrift                     |   8 +-
 common/thrift/Query.thrift                         |   7 +-
 .../impala/planner/RuntimeFilterGenerator.java     | 158 ++++++++-------
 .../org/apache/impala/planner/PlannerTest.java     |  27 ++-
 .../functional/functional_schema_template.sql      |  15 ++
 .../datasets/functional/schema_constraints.csv     |   3 +
 .../PlannerTest/runtime-filter-query-options.test  | 149 ++++++++++++++
 .../queries/QueryTest/in_list_filters.test         | 173 +++++++++++++++++
 tests/query_test/test_runtime_filters.py           |  31 ++-
 44 files changed, 1429 insertions(+), 214 deletions(-)

diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index c1513a3..a482a72 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -217,6 +217,7 @@ ir_functions = [
   ["DECIMAL_MIN_MAX_FILTER_INSERT4", "_ZN6impala19DecimalMinMaxFilter7Insert4EPKv"],
   ["DECIMAL_MIN_MAX_FILTER_INSERT8", "_ZN6impala19DecimalMinMaxFilter7Insert8EPKv"],
   ["DECIMAL_MIN_MAX_FILTER_INSERT16", "_ZN6impala19DecimalMinMaxFilter8Insert16EPKv"],
+  ["IN_LIST_FILTER_INSERT", "_ZN6impala12InListFilter6InsertEPKv"],
   ["KRPC_DSS_GET_PART_EXPR_EVAL",
   "_ZN6impala20KrpcDataStreamSender25GetPartitionExprEvaluatorEi"],
   ["KRPC_DSS_HASH_AND_ADD_ROWS",
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 8ad4573..8e21c3f 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -72,6 +72,7 @@
 #include "udf/udf-ir.cc"
 #include "util/bloom-filter-ir.cc"
 #include "util/hash-util-ir.cc"
+#include "util/in-list-filter-ir.cc"
 #include "util/min-max-filter-ir.cc"
 
 #pragma clang diagnostic pop
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index a706377..ae72596 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -86,11 +86,14 @@ void FilterContext::Insert(TupleRow* row) const noexcept {
     uint32_t filter_hash = RawValue::GetHashValueFastHash32(
         val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
     local_bloom_filter->Insert(filter_hash);
-  } else {
-    DCHECK(filter->is_min_max_filter());
+  } else if (filter->is_min_max_filter()) {
     if (local_min_max_filter == nullptr || local_min_max_filter->AlwaysTrue()) return;
     void* val = expr_eval->GetValue(row);
     local_min_max_filter->Insert(val);
+  } else {
+    DCHECK(filter->is_in_list_filter());
+    if (local_in_list_filter == nullptr || local_in_list_filter->AlwaysTrue()) return;
+    local_in_list_filter->Insert(expr_eval->GetValue(row));
   }
 }
 
@@ -391,8 +394,7 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
         builder.CreateStructGEP(nullptr, this_arg, 3, "local_bloom_filter_ptr");
     local_filter_arg =
         builder.CreateLoad(local_bloom_filter_ptr, "local_bloom_filter_arg");
-  } else {
-    DCHECK(filter_desc.type == TRuntimeFilterType::MIN_MAX);
+  } else if (filter_desc.type == TRuntimeFilterType::MIN_MAX) {
     // Load 'local_min_max_filter' from 'this_arg' FilterContext object.
     llvm::Value* local_min_max_filter_ptr =
         builder.CreateStructGEP(nullptr, this_arg, 4, "local_min_max_filter_ptr");
@@ -403,10 +405,17 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
         local_min_max_filter_ptr, min_max_filter_type, "cast_min_max_filter_ptr");
     local_filter_arg =
         builder.CreateLoad(local_min_max_filter_ptr, "local_min_max_filter_arg");
+  } else {
+    DCHECK(filter_desc.type == TRuntimeFilterType::IN_LIST);
+    // Load 'local_in_list_filter' from 'this_arg' FilterContext object.
+    llvm::Value* local_in_list_filter_ptr =
+        builder.CreateStructGEP(nullptr, this_arg, 5, "local_in_list_filter_ptr");
+    local_filter_arg =
+        builder.CreateLoad(local_in_list_filter_ptr, "local_in_list_filter_arg");
   }
 
-  // Check if 'local_bloom_filter' or 'local_min_max_filter' are NULL (depending on
-  // filter desc) and return if so.
+  // Check if 'local_bloom_filter', 'local_min_max_filter' or 'local_in_list_filter' are
+  // NULL (depending on filter desc) and return if so.
   llvm::Value* filter_null = builder.CreateIsNull(local_filter_arg, "filter_is_null");
   llvm::BasicBlock* filter_not_null_block =
       llvm::BasicBlock::Create(context, "filters_not_null", insert_filter_fn);
@@ -515,8 +524,7 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
 
     llvm::Value* insert_args[] = {local_filter_arg, hash_value};
     builder.CreateCall(insert_bloom_filter_fn, insert_args);
-  } else {
-    DCHECK(filter_desc.type == TRuntimeFilterType::MIN_MAX);
+  } else if (filter_desc.type == TRuntimeFilterType::MIN_MAX) {
     // The function for inserting into the min-max filter.
     llvm::Function* min_max_insert_fn = codegen->GetFunction(
         MinMaxFilter::GetInsertIRFunctionType(filter_expr->type()), false);
@@ -524,6 +532,15 @@ Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_exp
 
     llvm::Value* insert_filter_args[] = {local_filter_arg, val_ptr_phi};
     builder.CreateCall(min_max_insert_fn, insert_filter_args);
+  } else {
+    DCHECK(filter_desc.type == TRuntimeFilterType::IN_LIST);
+    // The function for inserting into the in-list filter.
+    llvm::Function* insert_in_list_filter_fn =
+        codegen->GetFunction(IRFunction::IN_LIST_FILTER_INSERT, false);
+    DCHECK(insert_in_list_filter_fn != nullptr);
+
+    llvm::Value* insert_filter_args[] = {local_filter_arg, val_ptr_phi};
+    builder.CreateCall(insert_in_list_filter_fn, insert_filter_args);
   }
 
   builder.CreateRetVoid();
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index aa80406..e94b2d3 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -103,6 +103,9 @@ struct FilterContext {
   /// Working copy of local min-max filter
   MinMaxFilter* local_min_max_filter = nullptr;
 
+  /// Working copy of local in-list filter
+  InListFilter* local_in_list_filter = nullptr;
+
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
 
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 3150a27..f9c368a 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -38,6 +38,7 @@
 
 using namespace impala;
 using namespace impala::io;
+using namespace impala::io;
 
 namespace impala {
 
@@ -311,6 +312,11 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   metadata_range_ = stream_->scan_range();
   num_stripes_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcStripes", TUnit::UNIT);
+  num_pushed_down_predicates_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownPredicates", TUnit::UNIT);
+  num_pushed_down_runtime_filters_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownRuntimeFilters",
+          TUnit::UNIT);
 
   codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::ORC);
   if (codegend_process_scratch_batch_fn_ == nullptr) {
@@ -400,7 +406,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   // blob more efficiently.
   row_reader_options_.setEnableLazyDecoding(true);
 
-  RETURN_IF_ERROR(PrepareSearchArguments());
+  // Clone the statistics conjuncts.
+  RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_perm_pool_.get(),
+      context_->expr_results_pool(), scan_node_->stats_conjunct_evals(),
+      &stats_conjunct_evals_));
 
   // To create OrcColumnReaders, we need the selected orc schema. It's a subset of the
   // file schema: a tree of selected orc types and can only be got from an orc::RowReader
@@ -907,6 +916,8 @@ Status HdfsOrcScanner::NextStripe() {
       RETURN_IF_ERROR(StartColumnReading(*stripe.get()));
     }
     row_reader_options_.range(stripe->getOffset(), stripe_len);
+    // Update SearchArguments in case any new runtime filters arrive.
+    RETURN_IF_ERROR(PrepareSearchArguments());
     try {
       row_reader_ = reader_->createRowReader(row_reader_options_);
     } catch (ResourceError& e) {  // errors throw from the orc scanner
@@ -1252,6 +1263,13 @@ bool HdfsOrcScanner::PrepareInListPredicate(uint64_t orc_column_id,
         << "Non-literal constant expr cannot be used";
     in_list.emplace_back(GetSearchArgumentLiteral(eval, i, type, &predicate_type));
   }
+  return PrepareInListPredicate(orc_column_id, type, in_list, sarg);
+}
+
+bool HdfsOrcScanner::PrepareInListPredicate(uint64_t orc_column_id,
+    const ColumnType& type, const std::vector<orc::Literal>& in_list,
+    orc::SearchArgumentBuilder* sarg) {
+  orc::PredicateDataType predicate_type = GetOrcPredicateDataType(type);
   // The ORC library requires IN-list has at least 2 literals. Converting to EQUALS
   // when there is one.
   if (in_list.size() == 1) {
@@ -1277,25 +1295,33 @@ void HdfsOrcScanner::PrepareIsNullPredicate(bool is_not_null, uint64_t orc_colum
   }
 }
 
+bool HdfsOrcScanner::ShouldUpdateSearchArgument() {
+  int num_current_filters = 0;
+  for (const FilterContext* ctx : filter_ctxs_) {
+    if (IsPushableInListFilter(ctx->filter)) num_current_filters++;
+  }
+  VLOG_FILE << "num_current_filters: " << num_current_filters
+            << ", last num_usable_in_list_filters: " << num_pushable_in_list_filters_;
+  return num_current_filters > num_pushable_in_list_filters_;
+}
+
 Status HdfsOrcScanner::PrepareSearchArguments() {
   if (!state_->query_options().orc_read_statistics) return Status::OK();
+  if (!ShouldUpdateSearchArgument()) return Status::OK();
+  VLOG_FILE << "Building SearchArgument on ORC file " << filename();
 
   const TupleDescriptor* stats_tuple_desc = scan_node_->stats_tuple_desc();
   if (!stats_tuple_desc) return Status::OK();
 
-  // Clone the min/max statistics conjuncts.
-  RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
-      expr_perm_pool_.get(), context_->expr_results_pool(),
-      scan_node_->stats_conjunct_evals(), &stats_conjunct_evals_));
-
   std::unique_ptr<orc::SearchArgumentBuilder> sarg =
       orc::SearchArgumentFactory::newBuilder();
   bool sargs_supported = false;
   const orc::Type* node = nullptr;
   bool pos_field;
   bool missing_field;
+  int num_pushed_down_predicates = 0;
 
-  DCHECK_EQ(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
+  DCHECK_GE(stats_tuple_desc->slots().size(), stats_conjunct_evals_.size());
   for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
     SlotDescriptor* slot_desc = stats_tuple_desc->slots()[i];
     // Resolve column path to determine col idx in file schema.
@@ -1309,6 +1335,7 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
       PrepareIsNullPredicate(fn_name == "is_not_null_pred", node->getColumnId(),
           slot_desc->type(), sarg.get());
       sargs_supported = true;
+      num_pushed_down_predicates++;
       continue;
     }
     ScalarExpr* const_expr = eval->root().GetChild(1);
@@ -1336,15 +1363,26 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
         || node->getKind() == orc::TIMESTAMP) {
       continue;
     }
-
+    bool success;
     if (fn_name == "in_iterate" || fn_name == "in_set_lookup") {
-      sargs_supported |= PrepareInListPredicate(
+      success = PrepareInListPredicate(
           node->getColumnId(), slot_desc->type(), eval, sarg.get());
+      if (success) {
+        sargs_supported = true;
+        num_pushed_down_predicates++;
+      }
       continue;
     }
-    sargs_supported |= PrepareBinaryPredicate(fn_name, node->getColumnId(),
+    success = PrepareBinaryPredicate(fn_name, node->getColumnId(),
         slot_desc->type(), eval, sarg.get());
+    if (success) {
+      sargs_supported = true;
+      num_pushed_down_predicates++;
+    }
   }
+  VLOG_FILE << "Pushed " << num_pushed_down_predicates << " predicates down";
+  COUNTER_SET(num_pushed_down_predicates_counter_, num_pushed_down_predicates);
+  sargs_supported |= UpdateSearchArgumentWithFilters(sarg.get());
   if (sargs_supported) {
     try {
       std::unique_ptr<orc::SearchArgument> final_sarg = sarg->build();
@@ -1363,6 +1401,106 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
   return Status::OK();
 }
 
+bool HdfsOrcScanner::IsPushableInListFilter(const RuntimeFilter* filter) {
+  VLOG_FILE << "Checking readiness";
+  if (!filter || !filter->is_in_list_filter() || !filter->HasFilter()) return false;
+  VLOG_FILE << "Checking partition filters";
+  // Only apply runtime filters on non-partition columns.
+  if (filter->IsBoundByPartitionColumn(GetScanNodeId())) return false;
+  VLOG_FILE << "Checking always_true of filter " << filter->id();
+  InListFilter* in_list_filter = filter->get_in_list_filter();
+  if (in_list_filter->AlwaysTrue()) return false;
+  VLOG_FILE << "Checking target expr of filter " << filter->id();
+  const TRuntimeFilterTargetDesc& target_desc = filter->filter_desc().targets[0];
+  // Filters target on an expr (e.g. 100 * col) can't be simply pushed down.
+  if (target_desc.target_expr.nodes.size() != 1) return false;
+  if (!target_desc.target_expr.nodes[0].__isset.slot_ref) return false;
+  return true;
+}
+
+bool HdfsOrcScanner::UpdateSearchArgumentWithFilters(orc::SearchArgumentBuilder* sarg) {
+  VLOG_FILE << "Updating SearchArgument with runtime filters";
+  int num_usable_filters = 0;
+  int num_pushed_down_filters = 0;
+  for (const FilterContext* ctx : filter_ctxs_) {
+    const RuntimeFilter* filter = ctx->filter;
+    if (!IsPushableInListFilter(filter)) continue;
+    num_usable_filters++;
+    VLOG_FILE << "Filter " << filter->id() << " is usable. "
+              << "Resolving filter target in ORC file " << filename();
+    InListFilter* in_list_filter = filter->get_in_list_filter();
+    const TRuntimeFilterTargetDesc& target_desc = filter->filter_desc().targets[0];
+    DCHECK_EQ(target_desc.target_expr_slotids.size(), 1);
+    TSlotId sid = target_desc.target_expr_slotids[0];
+    const SlotDescriptor* target_slot = nullptr;
+    for (const SlotDescriptor* slot : scan_node_->tuple_desc()->slots()) {
+      if (slot->id() == sid) {
+        target_slot = slot;
+        break;
+      }
+    }
+    if (target_slot == nullptr) {
+      VLOG_FILE << "Can't find slot of id=" << sid << " in "
+                << scan_node_->tuple_desc()->DebugString();
+      continue;
+    }
+    const orc::Type* node = nullptr;
+    bool pos_field;
+    bool missing_field;
+    Status s = schema_resolver_->ResolveColumn(target_slot->col_path(),
+        &node, &pos_field, &missing_field);
+    if (!s.ok()) {
+      VLOG_FILE << "Can't resolve " << target_slot->DebugString() << " in ORC file "
+                << filename();
+      continue;
+    }
+    if (pos_field || missing_field) continue;
+
+    VLOG_FILE << "Generating ORC IN-list for filter " << filter->id();
+    std::vector<orc::Literal> in_list;
+    const ColumnType& col_type = filter->type();
+    switch(col_type.type) {
+      case TYPE_TINYINT:
+      case TYPE_SMALLINT:
+      case TYPE_INT:
+      case TYPE_BIGINT: {
+        for (int64_t v : in_list_filter->values_) {
+          in_list.emplace_back(v);
+        }
+        break;
+      }
+      case TYPE_DATE: {
+        for (int64_t v : in_list_filter->values_) {
+          in_list.emplace_back(orc::PredicateDataType::DATE, v);
+        }
+        break;
+      }
+      case TYPE_STRING: {
+        for (const string& str : in_list_filter->str_values_) {
+          in_list.emplace_back(str.c_str(), str.length());
+        }
+        break;
+      }
+      default: break;
+    }
+    if (in_list_filter->ContainsNull()) {
+      // Add a null literal with type.
+      in_list.emplace_back(GetOrcPredicateDataType(col_type));
+    }
+    if (!in_list.empty()) {
+      VLOG_FILE << "Updated sarg with " << in_list.size() << " items for filter "
+                << filter->id();
+      if (PrepareInListPredicate(node->getColumnId(), col_type, in_list, sarg))
+        num_pushed_down_filters++;
+    }
+  }
+  num_pushable_in_list_filters_ = num_usable_filters;
+  COUNTER_SET(num_pushed_down_runtime_filters_counter_, num_pushed_down_filters);
+  VLOG_FILE << num_usable_filters << " usable filters. Pushed " << num_pushed_down_filters
+            << " filters down.";
+  return num_pushed_down_filters > 0;
+}
+
 Status HdfsOrcScanner::ReadFooterStream(void* buf, uint64_t length, uint64_t offset) {
   Status status;
   if (offset > stream_->file_offset()) {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 337c6a9..f68002e 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -298,6 +298,16 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// Number of stripes that need to be read.
   RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
 
+  /// Number of predicates that are pushed down to the ORC reader.
+  RuntimeProfile::Counter* num_pushed_down_predicates_counter_ = nullptr;
+
+  /// Number of runtime filters that are pushed down to the ORC reader.
+  RuntimeProfile::Counter* num_pushed_down_runtime_filters_counter_ = nullptr;
+
+  /// Number of arrived runtime IN-list filters that can be pushed down.
+  /// Used in ShouldUpdateSearchArgument(). Init to -1 so the check can pass at first.
+  int num_pushable_in_list_filters_ = -1;
+
   /// Number of collection items read in current row batch. It is a scanner-local counter
   /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
   /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
@@ -376,12 +386,14 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
       orc::SearchArgumentBuilder* sarg);
   bool PrepareInListPredicate(uint64_t orc_column_id, const ColumnType& type,
       ScalarExprEvaluator* eval, orc::SearchArgumentBuilder* sarg);
+  bool PrepareInListPredicate(uint64_t orc_column_id, const ColumnType& type,
+      const std::vector<orc::Literal>& in_list, orc::SearchArgumentBuilder* sarg);
   void PrepareIsNullPredicate(bool is_not_null, uint64_t orc_column_id,
       const ColumnType& type, orc::SearchArgumentBuilder* sarg);
 
-  /// Clones the stats conjucts into stats_conjunct_evals_, then builds ORC search
-  /// arguments from the conjuncts. The search arguments will exist for the lifespan of
-  /// the scanner and need not to be updated.
+  /// Builds ORC search arguments from the conjuncts and arrived runtime filters.
+  /// The search arguments will be re-built each time we start reading a new stripe,
+  /// because we may have new runtime filters arrive.
   Status PrepareSearchArguments() WARN_UNUSED_RESULT;
 
   /// Helper function for GetLiteralSearchArguments. The template parameter T is the
@@ -407,6 +419,18 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   }
 
   Status ReadFooterStream(void* buf, uint64_t length, uint64_t offset);
+
+  /// Updates the SearchArgument based on arrived runtime filters.
+  /// Returns true if any filter is applied.
+  bool UpdateSearchArgumentWithFilters(orc::SearchArgumentBuilder* sarg);
+
+  /// Decides whether we should rebuild the SearchArgument. It returns true at the first
+  /// call and whenever a new and usable IN-list filter arrives.
+  bool ShouldUpdateSearchArgument();
+
+  /// Checks whether the runtime filter is a usable IN-list filter that can be pushed
+  /// down.
+  bool IsPushableInListFilter(const RuntimeFilter* filter);
 };
 
 } // namespace impala
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index c3b557b..41cd8fb 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -102,6 +102,9 @@ bool HdfsScanner::EvalRuntimeFilter(int i, TupleRow* row) {
   const FilterContext* ctx = filter_ctxs_[i];
   ++stats->total_possible;
   if (stats->enabled_for_row && ctx->filter->HasFilter()) {
+    // Evaluating IN-list filter is much slower than evaluating the corresponding bloom
+    // filter. Skip it until we improve its performance.
+    if (ctx->filter->is_in_list_filter()) return true;
     ++stats->considered;
     if (!ctx->Eval(row)) {
       ++stats->rejected;
diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc
index d5c1141..b2ab1d9 100644
--- a/be/src/exec/join-builder.cc
+++ b/be/src/exec/join-builder.cc
@@ -168,12 +168,16 @@ void JoinBuilder::PublishRuntimeFilters(const std::vector<FilterContext>& filter
       if (!ctx.local_min_max_filter->AlwaysTrue()) {
         ++num_enabled_filters;
       }
+    } else if (ctx.local_in_list_filter != nullptr) {
+      if (!ctx.local_in_list_filter->AlwaysTrue()) {
+        ++num_enabled_filters;
+      }
     }
 
-    runtime_state->filter_bank()->UpdateFilterFromLocal(
-        ctx.filter->id(), bloom_filter, ctx.local_min_max_filter);
+    runtime_state->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
+        bloom_filter, ctx.local_min_max_filter, ctx.local_in_list_filter);
 
-    if ( ctx.local_min_max_filter != nullptr ) {
+    if (ctx.local_min_max_filter != nullptr) {
       VLOG(3) << name() << " published min/max filter: "
               << " id=" << ctx.filter->id()
               << ", details=" << ctx.local_min_max_filter->DebugString();
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index 685e278..84e159a 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -131,7 +131,7 @@ class NljBuilder : public JoinBuilder {
   inline RowBatchList* input_build_batches() { return &input_build_batches_; }
   inline RowBatchList* copied_build_batches() { return &copied_build_batches_; }
 
-  /// For each filter in filters_, allocate a bloom_filter from the fragment-local
+  /// For each filter in filters_, allocate a minmax_filter from the fragment-local
   /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
   /// phase.
   void AllocateRuntimeFilters();
diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index 976c125..890db57 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -267,7 +267,7 @@ void OrcSchemaResolver::TranslateColPaths(const SchemaPath& col_path,
       table_col_path->push_back(num_part_cols + 1 + second_idx);
       file_col_path->push_back(second_idx);
     } else {
-      DCHECK_GE(first_idx, num_part_cols);
+      DCHECK_GE(first_idx, num_part_cols) << "col_path: " << PrintNumericPath(col_path);
       // 'col_path' refers to the ACID columns. In table schema they are nested
       // under the synthetic 'row__id' column. 'row__id' is at index 'num_part_cols'.
       table_col_path->push_back(num_part_cols);
@@ -290,7 +290,7 @@ void OrcSchemaResolver::TranslateColPaths(const SchemaPath& col_path,
       table_col_path->push_back(num_part_cols + 1 + second_idx);
       file_col_path->push_back(second_idx);
     } else {
-      DCHECK_GE(first_idx, num_part_cols);
+      DCHECK_GE(first_idx, num_part_cols) << "col_path: " << PrintNumericPath(col_path);
       // 'col_path' refers to the ACID columns. In table schema they are nested
       // under the synthetic 'row__id' column. 'row__id' is at index 'num_part_cols'.
       table_col_path->push_back(num_part_cols);
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 8187331..75c52ec 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -939,17 +939,21 @@ void PhjBuilder::AllocateRuntimeFilters() {
   DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filter_ctxs_.size() == 0)
       << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
   DCHECK(ht_ctx_ != nullptr);
-  for (int i = 0; i < filter_ctxs_.size(); ++i) {
-    if (filter_ctxs_[i].filter->is_bloom_filter()) {
-      filter_ctxs_[i].local_bloom_filter =
+  for (FilterContext& filter_ctx : filter_ctxs_) {
+    if (filter_ctx.filter->is_bloom_filter()) {
+      filter_ctx.local_bloom_filter =
           runtime_state_->filter_bank()->AllocateScratchBloomFilter(
-              filter_ctxs_[i].filter->id());
-    } else {
-      DCHECK(filter_ctxs_[i].filter->is_min_max_filter());
-      filter_ctxs_[i].local_min_max_filter =
+              filter_ctx.filter->id());
+    } else if (filter_ctx.filter->is_min_max_filter()) {
+      filter_ctx.local_min_max_filter =
           runtime_state_->filter_bank()->AllocateScratchMinMaxFilter(
-              filter_ctxs_[i].filter->id(), filter_ctxs_[i].expr_eval->root().type());
-      minmax_filter_ctxs_.push_back(&filter_ctxs_[i]);
+              filter_ctx.filter->id(), filter_ctx.expr_eval->root().type());
+      minmax_filter_ctxs_.push_back(&filter_ctx);
+    } else {
+      DCHECK(filter_ctx.filter->is_in_list_filter());
+      filter_ctx.local_in_list_filter =
+          runtime_state_->filter_bank()->AllocateScratchInListFilter(
+              filter_ctx.filter->id(), filter_ctx.expr_eval->root().type());
     }
   }
 
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index d1fe44d..70fa449 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -707,7 +707,7 @@ class PhjBuilder : public JoinBuilder {
   /// row-backing resources to it.
   void CloseAndDeletePartitions(RowBatch* row_batch);
 
-  /// For each filter in filters_, allocate a bloom_filter from the fragment-local
+  /// For each filter in filters_, allocate a runtime_filter from the fragment-local
   /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
   /// phase.
   void AllocateRuntimeFilters();
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 1c8c833..d036d5b 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -155,16 +155,12 @@ Status ScanNode::Prepare(RuntimeState* state) {
     filter_ctxs_.emplace_back();
     FilterContext& filter_ctx = filter_ctxs_.back();
     filter_ctx.filter = state->filter_bank()->RegisterConsumer(filter_desc);
-    // TODO: Enable stats for min-max filters when Kudu exposes info about filters
-    // (KUDU-2162).
-    if (filter_ctx.filter->is_bloom_filter() || filter_ctx.filter->is_min_max_filter()) {
-      string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
-          PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
-      RuntimeProfile* profile =
-          RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
-      runtime_profile_->AddChild(profile);
-      filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile));
-    }
+    string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
+        PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
+    RuntimeProfile* profile =
+        RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
+    runtime_profile_->AddChild(profile);
+    filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile));
   }
 
   rows_read_counter_ = PROFILE_RowsRead.Instantiate(runtime_profile());
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index ab21cd4..0a8f189 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -72,6 +72,7 @@ class Coordinator::FilterState {
   BloomFilterPB& bloom_filter() { return bloom_filter_; }
   std::string& bloom_filter_directory() { return bloom_filter_directory_; }
   MinMaxFilterPB& min_max_filter() { return min_max_filter_; }
+  InListFilterPB& in_list_filter() { return in_list_filter_; }
   std::vector<FilterTarget>* targets() { return &targets_; }
   const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
@@ -79,6 +80,7 @@ class Coordinator::FilterState {
   const TRuntimeFilterDesc& desc() const { return desc_; }
   bool is_bloom_filter() const { return desc_.type == TRuntimeFilterType::BLOOM; }
   bool is_min_max_filter() const { return desc_.type == TRuntimeFilterType::MIN_MAX; }
+  bool is_in_list_filter() const { return desc_.type == TRuntimeFilterType::IN_LIST; }
   int pending_count() const { return pending_count_; }
   void set_pending_count(int pending_count) { pending_count_ = pending_count; }
   int num_producers() const { return num_producers_; }
@@ -86,9 +88,11 @@ class Coordinator::FilterState {
   bool disabled() const {
     if (is_bloom_filter()) {
       return bloom_filter_.always_true();
-    } else {
-      DCHECK(is_min_max_filter());
+    } else if (is_min_max_filter()) {
       return min_max_filter_.always_true();
+    } else {
+      DCHECK(is_in_list_filter());
+      return in_list_filter_.always_true();
     }
   }
   bool enabled() const { return !disabled(); }
@@ -151,6 +155,7 @@ class Coordinator::FilterState {
   /// aggregated Bloom filter.
   std::string bloom_filter_directory_;
   MinMaxFilterPB min_max_filter_;
+  InListFilterPB in_list_filter_;
 
   /// Time at which first local filter arrived.
   int64_t first_arrival_time_ = 0L;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6b384d0..0d47448 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -58,6 +58,7 @@
 #include "util/hdfs-util.h"
 #include "util/histogram-metric.h"
 #include "util/kudu-status-util.h"
+#include "util/in-list-filter.h"
 #include "util/min-max-filter.h"
 #include "util/pretty-printer.h"
 #include "util/table-printer.h"
@@ -595,6 +596,7 @@ string Coordinator::FilterDebugString() {
   table_printer.AddColumn("Est fpp", false);
   table_printer.AddColumn("Min value", false);
   table_printer.AddColumn("Max value", false);
+  table_printer.AddColumn("In-list size", false);
   ObjectPool temp_object_pool;
   MemTracker temp_mem_tracker;
   for (auto& v: filter_routing_table_->id_to_filter) {
@@ -643,9 +645,9 @@ string Coordinator::FilterDebugString() {
       stringstream ss;
       ss << setprecision(3) << fpp;
       row.push_back(ss.str());
-      row.push_back("");
-      row.push_back("");
-    } else {
+      // The following 3 fields belong to MinMax/IN-list filters.
+      for (int i = 0; i < 3; ++i) row.push_back("");
+    } else if (state.is_min_max_filter()) {
       // Add the filter type for minmax filters.
       row.push_back(PrintThriftEnum(state.desc().type));
       row.push_back("");
@@ -681,6 +683,25 @@ string Coordinator::FilterDebugString() {
           row.push_back("PartialUpdates");
         }
       }
+      row.push_back("");
+    } else if (state.is_in_list_filter()) {
+      row.push_back(PrintThriftEnum(state.desc().type));
+      // Skip 3 fields belong to Bloom/MinMax filters.
+      for (int i = 0; i < 3; ++i) row.push_back("");
+      const InListFilterPB& in_list_filterPB =
+          const_cast<FilterState*>(&state)->in_list_filter();
+      if (state.AlwaysTrueFilterReceived()) {
+        row.push_back("AlwaysTrue");
+      } else if (state.received_all_updates()) {
+        if (state.AlwaysFalseFlippedToFalse()
+            || InListFilter::AlwaysFalse(in_list_filterPB)) {
+          row.push_back("AlwaysFalse");
+        } else {
+          row.push_back(std::to_string(in_list_filterPB.value().size()));
+        }
+      } else {
+        row.push_back("PartialUpdates");
+      }
     }
     table_printer.AddRow(row);
   }
@@ -1492,9 +1513,11 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
           || rpc_params.bloom_filter().always_true()
           || !state->bloom_filter_directory().empty());
 
-    } else {
-      DCHECK(state->is_min_max_filter());
+    } else if (state->is_min_max_filter()) {
       MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
+    } else {
+      DCHECK(state->is_in_list_filter());
+      *rpc_params.mutable_in_list_filter() = state->in_list_filter();
     }
 
     // Filter is complete. We disable it so future UpdateFilter rpcs will be ignored,
@@ -1575,8 +1598,7 @@ void Coordinator::FilterState::ApplyUpdate(
             sidecar_slice.size());
       }
     }
-  } else {
-    DCHECK(is_min_max_filter());
+  } else if (is_min_max_filter()) {
     DCHECK(params.has_min_max_filter());
     ColumnType col_type = ColumnType::FromThrift(desc_.src_expr.nodes[0].type);
     VLOG(3) << "Coordinator::FilterState::ApplyUpdate() on minmax."
@@ -1598,6 +1620,15 @@ void Coordinator::FilterState::ApplyUpdate(
       MinMaxFilter::Or(params.min_max_filter(), &min_max_filter_, col_type);
     }
     VLOG(3) << " Updated accumulated filter=" << DebugString();
+  } else {
+    DCHECK(is_in_list_filter());
+    DCHECK(params.has_in_list_filter());
+    VLOG(3) << "Update IN-list filter " << params.filter_id() << ", "
+            << InListFilter::DebugString(params.in_list_filter());
+    DCHECK(!in_list_filter_.always_true());
+    DCHECK_EQ(in_list_filter_.value_size(), 0);
+    DCHECK(!in_list_filter_.contains_null());
+    in_list_filter_ = params.in_list_filter();
   }
 
   if (pending_count_ == 0 || disabled()) {
@@ -1611,7 +1642,7 @@ void Coordinator::FilterState::DisableAndRelease(
   Release(tracker);
 }
 
-void Coordinator::FilterState::Disable(const bool all_updates_received) {
+void Coordinator::FilterState::Disable(bool all_updates_received) {
   all_updates_received_ = all_updates_received;
   if (is_bloom_filter()) {
     bloom_filter_.set_always_true(true);
@@ -1619,13 +1650,18 @@ void Coordinator::FilterState::Disable(const bool all_updates_received) {
       always_false_flipped_to_false_ = true;
     }
     bloom_filter_.set_always_false(false);
-  } else {
-    DCHECK(is_min_max_filter());
+  } else if (is_min_max_filter()) {
     min_max_filter_.set_always_true(true);
     if (MinMaxFilter::AlwaysFalse(min_max_filter_)) {
       always_false_flipped_to_false_ = true;
     }
     min_max_filter_.set_always_false(false);
+  } else {
+    DCHECK(is_in_list_filter());
+    if (InListFilter::AlwaysFalse(in_list_filter_)) {
+      always_false_flipped_to_false_ = true;
+    }
+    in_list_filter_.set_always_true(true);
   }
 }
 
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 994715f..86b78c4 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -68,6 +68,8 @@ RuntimeFilterBank::RuntimeFilterBank(QueryState* query_state,
         -1, "Runtime Filter Bank", query_state->query_mem_tracker(), false))),
     bloom_memory_allocated_(
         query_state->host_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES)),
+    total_in_list_filter_items_(
+        query_state->host_profile()->AddCounter("InListFilterItems", TUnit::UNIT)),
     total_bloom_filter_mem_required_(total_filter_mem_required) {}
 
 RuntimeFilterBank::~RuntimeFilterBank() {}
@@ -158,7 +160,8 @@ void RuntimeFilterBank::UpdateFilterCompleteCb(
 }
 
 void RuntimeFilterBank::UpdateFilterFromLocal(
-    int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
+    int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter,
+    InListFilter* in_list_filter) {
   DCHECK_NE(query_state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
   // This function is only called from ExecNode::Open() or more specifically
@@ -187,15 +190,17 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
         return;
       }
       VLOG(3) << "Setting broadcast filter " << filter_id;
-      result_filter->SetFilter(bloom_filter, min_max_filter);
+      result_filter->SetFilter(bloom_filter, min_max_filter, in_list_filter);
       complete_filter = result_filter;
     } else {
+      DCHECK(in_list_filter == nullptr)
+          << "InListFilter should only be generated for broadcast joins";
       // Merge partitioned join filters in parallel - each thread setting the filter will
       // try to merge its filter with a previously merged filter, looping until either
       // it has produced the final filter or it runs out of other filters to merge.
       unique_ptr<RuntimeFilter> tmp_filter = make_unique<RuntimeFilter>(
           result_filter->filter_desc(), result_filter->filter_size());
-      tmp_filter->SetFilter(bloom_filter, min_max_filter);
+      tmp_filter->SetFilter(bloom_filter, min_max_filter, nullptr);
       while (produced_filter.pending_merge_filter != nullptr) {
         unique_ptr<RuntimeFilter> pending_merge =
             std::move(produced_filter.pending_merge_filter);
@@ -245,8 +250,14 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
             << consumed_filter->filter_desc();
       } else {
         consumed_filter->SetFilter(complete_filter);
-        query_state_->host_profile()->AddInfoString(
-            Substitute("Filter $0 arrival", filter_id),
+        string into_key;
+        if (in_list_filter != nullptr) {
+          into_key = Substitute("Filter $0 arrival with $1 items",
+              filter_id, in_list_filter->NumItems());
+        } else {
+          into_key = Substitute("Filter $0 arrival", filter_id);
+        }
+        query_state_->host_profile()->AddInfoString(into_key,
             PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
       }
     }
@@ -266,9 +277,11 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
     TRuntimeFilterType::type type = complete_filter->filter_desc().type;
     if (type == TRuntimeFilterType::BLOOM) {
       BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
-    } else {
-      DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
+    } else if (type == TRuntimeFilterType::MIN_MAX) {
       min_max_filter->ToProtobuf(params.mutable_min_max_filter());
+    } else {
+      DCHECK_EQ(type, TRuntimeFilterType::IN_LIST);
+      InListFilter::ToProtobuf(in_list_filter, params.mutable_in_list_filter());
     }
     const TNetworkAddress& krpc_address = query_state_->query_ctx().coord_ip_address;
     const std::string& hostname = query_state_->query_ctx().coord_hostname;
@@ -315,6 +328,8 @@ void RuntimeFilterBank::PublishGlobalFilter(
   }
   BloomFilter* bloom_filter = nullptr;
   MinMaxFilter* min_max_filter = nullptr;
+  InListFilter* in_list_filter = nullptr;
+  string details;
   if (fs->consumed_filter->is_bloom_filter()) {
     DCHECK(params.has_bloom_filter());
     if (params.bloom_filter().always_true()) {
@@ -354,16 +369,25 @@ void RuntimeFilterBank::PublishGlobalFilter(
         }
       }
     }
-  } else {
-    DCHECK(fs->consumed_filter->is_min_max_filter());
+  } else if (fs->consumed_filter->is_min_max_filter()) {
     DCHECK(params.has_min_max_filter());
     min_max_filter = MinMaxFilter::Create(params.min_max_filter(),
         fs->consumed_filter->type(), &obj_pool_, filter_mem_tracker_);
     fs->min_max_filters.push_back(min_max_filter);
+  } else {
+    DCHECK(fs->consumed_filter->is_in_list_filter());
+    DCHECK(params.has_in_list_filter());
+    DCHECK(query_state_->query_options().__isset.runtime_in_list_filter_entry_limit);
+    int entry_limit = query_state_->query_options().runtime_in_list_filter_entry_limit;
+    in_list_filter = InListFilter::Create(params.in_list_filter(),
+        fs->consumed_filter->type(), entry_limit, &obj_pool_);
+    fs->in_list_filters.push_back(in_list_filter);
+    total_in_list_filter_items_->Add(params.in_list_filter().value_size());
+    details = Substitute(" with $0 items", params.in_list_filter().value_size());
   }
-  fs->consumed_filter->SetFilter(bloom_filter, min_max_filter);
+  fs->consumed_filter->SetFilter(bloom_filter, min_max_filter, in_list_filter);
   query_state_->host_profile()->AddInfoString(
-      Substitute("Filter $0 arrival", params.filter_id()),
+      Substitute("Filter $0 arrival$1", params.filter_id(), details),
       PrettyPrinter::Print(fs->consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
 }
 
@@ -407,6 +431,22 @@ MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
   return min_max_filter;
 }
 
+InListFilter* RuntimeFilterBank::AllocateScratchInListFilter(
+    int32_t filter_id, ColumnType type) {
+  auto it = filters_.find(filter_id);
+  DCHECK(it != filters_.end()) << "Filter ID " << filter_id << " not registered";
+  PerFilterState* fs = it->second.get();
+  lock_guard<SpinLock> l(fs->lock);
+  if (closed_) return nullptr;
+
+  DCHECK(query_state_->query_options().__isset.runtime_in_list_filter_entry_limit);
+  int32_t entry_limit = query_state_->query_options().runtime_in_list_filter_entry_limit;
+  InListFilter* in_list_filter =
+      InListFilter::Create(type, entry_limit, &obj_pool_);
+  fs->in_list_filters.push_back(in_list_filter);
+  return in_list_filter;
+}
+
 vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
   vector<unique_lock<SpinLock>> locks;
   for (auto& entry : filters_) locks.emplace_back(entry.second->lock);
@@ -449,6 +489,7 @@ void RuntimeFilterBank::Close() {
   for (auto& entry : filters_) {
     for (BloomFilter* filter : entry.second->bloom_filters) filter->Close();
     for (MinMaxFilter* filter : entry.second->min_max_filters) filter->Close();
+    for (InListFilter* filter : entry.second->in_list_filters) filter->Close();
   }
   obj_pool_.Clear();
   if (buffer_pool_client_.is_registered()) {
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index b44ea45..7a66ec6 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -46,6 +46,7 @@ namespace impala {
 class BloomFilter;
 class MemTracker;
 class MinMaxFilter;
+class InListFilter;
 class RuntimeFilter;
 class QueryState;
 class TBloomFilter;
@@ -118,12 +119,13 @@ class RuntimeFilterBank {
   /// to check for the filter's arrival.
   RuntimeFilter* RegisterConsumer(const TRuntimeFilterDesc& filter_desc);
 
-  /// Updates a filter's 'bloom_filter' or 'min_max_filter' which has been produced by
-  /// some operator in a local fragment instance. At most one of 'bloom_filter' and
-  /// 'min_max_filter' may be non-NULL, depending on the filter's type. They may both be
-  /// NULL, representing a filter that allows all rows to pass.
-  void UpdateFilterFromLocal(
-      int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
+  /// Updates a filter's 'bloom_filter', 'min_max_filter' or 'in_list_filter' which has
+  /// been produced by some operator in a local fragment instance. At most one of
+  /// 'bloom_filter', 'min_max_filter' and 'in_list_filter' may be non-NULL, depending on
+  /// the filter's type. They may both be NULL, representing a filter that allows all rows
+  /// to pass.
+  void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter,
+      MinMaxFilter* min_max_filter, InListFilter* in_list_filter);
 
   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
   /// consumption by operators that wish to use it for filtering.
@@ -142,6 +144,9 @@ class RuntimeFilterBank {
   /// Returns a new MinMaxFilter. Handles memory the same as AllocateScratchBloomFilter().
   MinMaxFilter* AllocateScratchMinMaxFilter(int32_t filter_id, ColumnType type);
 
+  /// Returns a new InListFilter. Handles memory the same as AllocateScratchBloomFilter().
+  InListFilter* AllocateScratchInListFilter(int32_t filter_id, ColumnType type);
+
   /// Default hash seed to use when computing hashed values to insert into filters.
   static int32_t IR_ALWAYS_INLINE DefaultHashSeed() { return 1234; }
 
@@ -221,6 +226,10 @@ class RuntimeFilterBank {
     /// Contains references to all the min-max filters generated. Used in Close() to
     /// safely release all memory allocated for MinMaxFilters.
     vector<MinMaxFilter*> min_max_filters;
+
+    /// Contains references to all the in-list filters generated. Used in Close() to
+    /// safely release all memory allocated for InListFilters.
+    vector<InListFilter*> in_list_filters;
   } CACHELINE_ALIGNED;
 
   /// Object pool for objects that will be freed in Close(), e.g. allocated filters.
@@ -260,6 +269,9 @@ class RuntimeFilterBank {
   /// Total amount of memory allocated to Bloom Filters
   RuntimeProfile::Counter* const bloom_memory_allocated_;
 
+  /// Total number of items of all in-list filters.
+  RuntimeProfile::Counter* const total_in_list_filter_items_;
+
   /// Total amount of memory required by the bloom filters as calculated by the planner.
   const int64_t total_bloom_filter_mem_required_;
 
diff --git a/be/src/runtime/runtime-filter-ir.cc b/be/src/runtime/runtime-filter-ir.cc
index 4ae1325..5c4f811 100644
--- a/be/src/runtime/runtime-filter-ir.cc
+++ b/be/src/runtime/runtime-filter-ir.cc
@@ -22,18 +22,26 @@ using namespace impala;
 
 bool IR_ALWAYS_INLINE RuntimeFilter::Eval(
     void* val, const ColumnType& col_type) const noexcept {
-  if (LIKELY((is_bloom_filter()))) {
-    if (bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER) return true;
-    uint32_t h = RawValue::GetHashValueFastHash32(
-        val, col_type, RuntimeFilterBank::DefaultHashSeed());
-    return bloom_filter_.Load()->Find(h);
-  } else {
-    DCHECK(is_min_max_filter());
-    // Min/max overlap does not deal with nulls (val==nullptr).
-    if (LIKELY(val)) {
-      MinMaxFilter* filter = get_min_max(); // get the loaded version.
+  switch (filter_desc().type) {
+    case TRuntimeFilterType::BLOOM: {
+      if (bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER) return true;
+      uint32_t h = RawValue::GetHashValueFastHash32(
+          val, col_type, RuntimeFilterBank::DefaultHashSeed());
+      return bloom_filter_.Load()->Find(h);
+    }
+    case TRuntimeFilterType::MIN_MAX: {
+      // Min/max overlap does not deal with nulls (val==nullptr).
+      if (LIKELY(val)) {
+        MinMaxFilter* filter = get_min_max(); // get the loaded version.
+        if (LIKELY(filter && !filter->AlwaysTrue())) {
+          return filter->EvalOverlap(col_type, val, val);
+        }
+      }
+    }
+    case TRuntimeFilterType::IN_LIST: {
+      InListFilter* filter = get_in_list_filter();
       if (LIKELY(filter && !filter->AlwaysTrue())) {
-        return filter->EvalOverlap(col_type, val, val);
+        return filter->Find(val, col_type);
       }
     }
   }
diff --git a/be/src/runtime/runtime-filter-test.cc b/be/src/runtime/runtime-filter-test.cc
index 76680a5..597e357 100644
--- a/be/src/runtime/runtime-filter-test.cc
+++ b/be/src/runtime/runtime-filter-test.cc
@@ -92,7 +92,9 @@ TEST_F(RuntimeFilterTest, Arrived) {
       new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
   SleepForMs(100); // give waiting thread a head start
   workers.add_thread(
-      new thread([&tc] { tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter); }));
+      new thread([&tc] {
+        tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter, nullptr);
+      }));
   workers.join_all();
   sw.Stop();
 
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 04be0ab..8dc0d6d 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -25,17 +25,20 @@ using namespace impala;
 
 const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
 
-void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
+void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter,
+    InListFilter* in_list_filter) {
   {
     unique_lock<mutex> l(arrival_mutex_);
     DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
-    DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+    DCHECK(bloom_filter_.Load() == nullptr);
+    DCHECK(min_max_filter_.Load() == nullptr);
+    DCHECK(in_list_filter_.Load() == nullptr);
     if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
-    if (is_bloom_filter()) {
-      bloom_filter_.Store(bloom_filter);
-    } else {
-      DCHECK(is_min_max_filter());
-      min_max_filter_.Store(min_max_filter);
+    switch (filter_desc_.type) {
+      case TRuntimeFilterType::BLOOM: bloom_filter_.Store(bloom_filter); break;
+      case TRuntimeFilterType::MIN_MAX: min_max_filter_.Store(min_max_filter); break;
+      case TRuntimeFilterType::IN_LIST: in_list_filter_.Store(in_list_filter); break;
+      default: DCHECK(false);
     }
     arrival_time_.Store(MonotonicMillis());
     has_filter_.Store(true);
@@ -46,7 +49,8 @@ void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_f
 void RuntimeFilter::SetFilter(RuntimeFilter* other) {
   DCHECK_EQ(id(), other->id());
   SetFilter(is_bloom_filter() ? other->bloom_filter_.Load() : nullptr,
-      is_min_max_filter() ? other->min_max_filter_.Load() : nullptr);
+      is_min_max_filter() ? other->min_max_filter_.Load() : nullptr,
+      is_in_list_filter() ? other->in_list_filter_.Load() : nullptr);
 }
 
 void RuntimeFilter::Or(RuntimeFilter* other) {
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index fa8eccf..ce5aa45 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -23,6 +23,7 @@
 #include "runtime/raw-value.h"
 #include "runtime/runtime-filter-bank.h"
 #include "util/bloom-filter.h"
+#include "util/in-list-filter.h"
 #include "util/condition-variable.h"
 #include "util/time.h"
 
@@ -45,8 +46,8 @@ class RuntimeFilterTest;
 class RuntimeFilter {
  public:
   RuntimeFilter(const TRuntimeFilterDesc& filter, int64_t filter_size)
-      : bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter),
-        registration_time_(MonotonicMillis()), arrival_time_(0L),
+      : bloom_filter_(nullptr), min_max_filter_(nullptr), in_list_filter_(nullptr),
+        filter_desc_(filter), registration_time_(MonotonicMillis()), arrival_time_(0L),
         filter_size_(filter_size) {
     DCHECK(filter_desc_.type == TRuntimeFilterType::MIN_MAX || filter_size_ > 0);
   }
@@ -64,6 +65,9 @@ class RuntimeFilter {
   bool is_min_max_filter() const {
     return filter_desc().type == TRuntimeFilterType::MIN_MAX;
   }
+  bool is_in_list_filter() const {
+    return filter_desc().type == TRuntimeFilterType::IN_LIST;
+  }
 
   extdatasource::TComparisonOp::type getCompareOp() const {
     return filter_desc().compareOp;
@@ -71,11 +75,13 @@ class RuntimeFilter {
 
   BloomFilter* get_bloom_filter() const { return bloom_filter_.Load(); }
   MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
+  InListFilter* get_in_list_filter() const { return in_list_filter_.Load(); }
 
-  /// Sets the internal filter bloom_filter to 'bloom_filter' or 'min_max_filter'
+  /// Sets the internal filter to 'bloom_filter', 'min_max_filter' or 'in_list_filter'
   /// depending on the type of this RuntimeFilter. Can only legally be called
   /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
-  void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
+  void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter,
+      InListFilter* in_list_filter);
 
   /// Set the internal bloom or min-max filter to the equivalent filter from 'other'.
   /// The parameters of 'other' must be compatible and the filters must have the same
@@ -145,6 +151,9 @@ class RuntimeFilter {
   /// May be NULL even after arrival_time_ is set if filter_desc_.min_max_filter is false.
   AtomicPtr<MinMaxFilter> min_max_filter_;
 
+  /// May be NULL even after arrival_time_ is set if filter_desc_.in_list_filter is false.
+  AtomicPtr<InListFilter> in_list_filter_;
+
   /// Reference to the filter's thrift descriptor in the thrift Plan tree.
   const TRuntimeFilterDesc& filter_desc_;
 
diff --git a/be/src/runtime/runtime-filter.inline.h b/be/src/runtime/runtime-filter.inline.h
index 635b62d..30fe12e 100644
--- a/be/src/runtime/runtime-filter.inline.h
+++ b/be/src/runtime/runtime-filter.inline.h
@@ -29,22 +29,28 @@
 namespace impala {
 
 inline bool RuntimeFilter::AlwaysTrue() const {
-  if (is_bloom_filter()) {
-    return HasFilter() && bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER;
-  } else {
-    DCHECK(is_min_max_filter());
-    return HasFilter() && min_max_filter_.Load()->AlwaysTrue();
+  switch (filter_desc().type) {
+    case TRuntimeFilterType::BLOOM:
+      return HasFilter() && bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER;
+    case TRuntimeFilterType::MIN_MAX:
+      return HasFilter() && min_max_filter_.Load()->AlwaysTrue();
+    case TRuntimeFilterType::IN_LIST:
+      return HasFilter() && in_list_filter_.Load()->AlwaysTrue();
   }
+  return false;
 }
 
 inline bool RuntimeFilter::AlwaysFalse() const {
-  if (is_bloom_filter()) {
-    return bloom_filter_.Load() != BloomFilter::ALWAYS_TRUE_FILTER
-        && bloom_filter_.Load()->AlwaysFalse();
-  } else {
-    DCHECK(is_min_max_filter());
-    return min_max_filter_.Load() != nullptr && min_max_filter_.Load()->AlwaysFalse();
+  switch (filter_desc().type) {
+    case TRuntimeFilterType::BLOOM:
+      return bloom_filter_.Load() != BloomFilter::ALWAYS_TRUE_FILTER
+             && bloom_filter_.Load()->AlwaysFalse();
+    case TRuntimeFilterType::MIN_MAX:
+      return min_max_filter_.Load() != nullptr && min_max_filter_.Load()->AlwaysFalse();
+    case TRuntimeFilterType::IN_LIST:
+      return in_list_filter_.Load() != nullptr && in_list_filter_.Load()->AlwaysFalse();
   }
+  return false;
 }
 
 }
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 2c6134d..b3aa501 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -140,6 +140,11 @@ void PrintQueryOptionValue(const impala::TCompressionCodec& compression_codec,
   }
 }
 
+void PrintQueryOptionValue(const set<impala::TRuntimeFilterType::type>& filter_types,
+    stringstream& val) {
+  val << filter_types;
+}
+
 void ChildQuery::SetQueryOptions(const TQueryOptions& parent_options,
     TExecuteStatementReq* exec_stmt_req) {
   map<string, string> conf;
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 480e42f..e135098 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -116,7 +116,8 @@ void DataStreamService::UpdateFilter(
   DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
   DCHECK(req->has_filter_id());
   DCHECK(req->has_query_id());
-  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+  DCHECK(req->has_bloom_filter() || req->has_min_max_filter()
+      || req->has_in_list_filter());
   ExecEnv::GetInstance()->impala_server()->UpdateFilter(resp, *req, context);
   RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
 }
@@ -127,7 +128,8 @@ void DataStreamService::PublishFilter(
   DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
   DCHECK(req->has_filter_id());
   DCHECK(req->has_dst_query_id());
-  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+  DCHECK(req->has_bloom_filter() || req->has_min_max_filter()
+      || req->has_in_list_filter());
   QueryState::ScopedRef qs(ProtoToQueryId(req->dst_query_id()));
 
   if (qs.get() != nullptr) {
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index e313afb..c07c58c 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -233,10 +233,6 @@ TEST(QueryOptions, SetEnumOptions) {
   TestEnumCase(options, CASE(kudu_read_mode, TKuduReadMode,
       (DEFAULT, READ_LATEST, READ_AT_SNAPSHOT)), true);
   TestEnumCase(options,
-      CASE(enabled_runtime_filter_types, TEnabledRuntimeFilterTypes,
-          (BLOOM, MIN_MAX, ALL)),
-      true);
-  TestEnumCase(options,
       CASE(kudu_replica_selection, TKuduReplicaSelection, (LEADER_ONLY, CLOSEST_REPLICA)),
       true);
 #undef CASE
@@ -558,6 +554,62 @@ TEST(QueryOptions, CompressionCodec) {
 #undef ENTRY
 }
 
+void VerifyFilterTypes(const set<TRuntimeFilterType::type>& types,
+    const std::initializer_list<TRuntimeFilterType::type>& expects) {
+  EXPECT_EQ(expects.size(), types.size());
+  for (const auto t : expects) {
+    EXPECT_NE(types.end(), types.find(t));
+  }
+}
+
+// Tests for setting of ENABLED_RUNTIME_FILTER_TYPES.
+TEST(QueryOptions, EnabledRuntimeFilterTypes) {
+  const string KEY = "enabled_runtime_filter_types";
+  {
+    TQueryOptions options;
+    EXPECT_TRUE(SetQueryOption(KEY, "all", &options, nullptr).ok());
+    VerifyFilterTypes(options.enabled_runtime_filter_types,
+        {
+            TRuntimeFilterType::BLOOM,
+            TRuntimeFilterType::MIN_MAX,
+            TRuntimeFilterType::IN_LIST
+        });
+  }
+  {
+    TQueryOptions options;
+    EXPECT_TRUE(SetQueryOption(KEY, "bloom,min_max,in_list", &options, nullptr).ok());
+    VerifyFilterTypes(options.enabled_runtime_filter_types,
+        {
+            TRuntimeFilterType::BLOOM,
+            TRuntimeFilterType::MIN_MAX,
+            TRuntimeFilterType::IN_LIST
+        });
+  }
+  {
+    TQueryOptions options;
+    EXPECT_TRUE(SetQueryOption(KEY, "bloom", &options, nullptr).ok());
+    VerifyFilterTypes(options.enabled_runtime_filter_types, {TRuntimeFilterType::BLOOM});
+  }
+  {
+    TQueryOptions options;
+    EXPECT_TRUE(SetQueryOption(KEY, "bloom,min_max", &options, nullptr).ok());
+    VerifyFilterTypes(options.enabled_runtime_filter_types,
+        {
+            TRuntimeFilterType::BLOOM,
+            TRuntimeFilterType::MIN_MAX
+        });
+  }
+  {
+    TQueryOptions options;
+    EXPECT_TRUE(SetQueryOption(KEY, "in_list,bloom", &options, nullptr).ok());
+    VerifyFilterTypes(options.enabled_runtime_filter_types,
+                      {
+                          TRuntimeFilterType::BLOOM,
+                          TRuntimeFilterType::IN_LIST
+                      });
+  }
+}
+
 // Tests for setting of MAX_RESULT_SPOOLING_MEM and
 // MAX_SPILLED_RESULT_SPOOLING_MEM. Setting of these options must maintain the
 // condition 'MAX_RESULT_SPOOLING_MEM <= MAX_SPILLED_RESULT_SPOOLING_MEM'.
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index afb9f1e..2030f3f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -98,6 +98,24 @@ const string PrintQueryOptionValue(const impala::TCompressionCodec& compression_
   }
 }
 
+std::ostream& impala::operator<<(std::ostream& out,
+    const std::set<impala::TRuntimeFilterType::type>& filter_types) {
+  bool first = true;
+  for (const auto& t : filter_types) {
+    if (!first) out << ",";
+    out << t;
+    first = false;
+  }
+  return out;
+}
+
+const string PrintQueryOptionValue(
+    const set<impala::TRuntimeFilterType::type>& filter_types) {
+  stringstream val;
+  val << filter_types;
+  return val.str();
+}
+
 void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
     map<string, string>* configuration) {
 #define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
@@ -947,11 +965,23 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::ENABLED_RUNTIME_FILTER_TYPES: {
-        // Parse the enabled runtime filter types and validate it.
-        TEnabledRuntimeFilterTypes::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(value, "enabled runtime filter types",
-            _TEnabledRuntimeFilterTypes_VALUES_TO_NAMES, &enum_type));
-        query_options->__set_enabled_runtime_filter_types(enum_type);
+        set<TRuntimeFilterType::type> filter_types;
+        if (iequals(value, "all")) {
+          for (const auto& kv : _TRuntimeFilterType_VALUES_TO_NAMES) {
+            filter_types.insert(static_cast<TRuntimeFilterType::type>(kv.first));
+          }
+        } else {
+          // Parse and verify the enabled runtime filter types.
+          vector<string> str_types;
+          split(str_types, value, is_any_of(","), token_compress_on);
+          for (const auto& t : str_types) {
+            TRuntimeFilterType::type filter_type;
+            RETURN_IF_ERROR(GetThriftEnum(t, "runtime filter type",
+                _TRuntimeFilterType_VALUES_TO_NAMES, &filter_type));
+            filter_types.insert(filter_type);
+          }
+        }
+        query_options->__set_enabled_runtime_filter_types(filter_types);
         break;
       }
       case TImpalaQueryOptions::ASYNC_CODEGEN: {
@@ -1160,6 +1190,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_orc_async_read(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT: {
+        StringParser::ParseResult result;
+        const int32_t limit =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (value == nullptr || result != StringParser::PARSE_SUCCESS || limit < 0) {
+          return Status(Substitute("Invalid runtime in-list filter entry limit '$0'. "
+              "Only integer value 0 and above is allowed.", value));
+        }
+        query_options->__set_runtime_in_list_filter_entry_limit(limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 9934555..10ebb77 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -31,6 +31,9 @@ namespace impala {
 
 class TQueryOptions;
 
+std::ostream& operator<<(std::ostream& out,
+    const std::set<impala::TRuntimeFilterType::type>& filter_types);
+
 // Maps query option names to option levels used for displaying the query
 // options via SET and SET ALL
 typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
@@ -47,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ORC_ASYNC_READ+ 1);\
+      TImpalaQueryOptions::RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -272,6 +275,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(abort_java_udf_on_exception,\
       ABORT_JAVA_UDF_ON_EXCEPTION, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(orc_async_read, ORC_ASYNC_READ, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(runtime_in_list_filter_entry_limit,\
+      RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index e6d5fc7..52e6f1e 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -61,6 +61,8 @@ set(UTIL_SRCS
   histogram-metric.cc
   impalad-metrics.cc
   impala-bloom-filter-buffer-allocator.cc
+  in-list-filter.cc
+  in-list-filter-ir.cc
   jni-util.cc
   json-util.cc
   jwt-util.cc
@@ -260,4 +262,4 @@ ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
 ADD_BE_LSAN_TEST(webserver-test)
 TARGET_LINK_LIBRARIES(webserver-test mini_kdc)
 ADD_UNIFIED_BE_LSAN_TEST(zip-util-test "ZipUtilTest.*")
-ADD_UNIFIED_BE_LSAN_TEST(tagged-ptr-test "TaggedPtrTest.*")
\ No newline at end of file
+ADD_UNIFIED_BE_LSAN_TEST(tagged-ptr-test "TaggedPtrTest.*")
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 1fa1850..7344764 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -101,7 +101,6 @@ PRINT_THRIFT_ENUM_IMPL(TStmtType)
 PRINT_THRIFT_ENUM_IMPL(TUnit)
 PRINT_THRIFT_ENUM_IMPL(TParquetTimestampType)
 PRINT_THRIFT_ENUM_IMPL(TTransactionalType)
-PRINT_THRIFT_ENUM_IMPL(TEnabledRuntimeFilterTypes)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilteringLevel)
 PRINT_THRIFT_ENUM_IMPL(TKuduReplicaSelection)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilterFastCodePathMode)
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index edec713..5ae7074 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -79,7 +79,6 @@ std::string PrintThriftEnum(const TStmtType::type& value);
 std::string PrintThriftEnum(const TUnit::type& value);
 std::string PrintThriftEnum(const TParquetTimestampType::type& value);
 std::string PrintThriftEnum(const TTransactionalType::type& value);
-std::string PrintThriftEnum(const TEnabledRuntimeFilterTypes::type& value);
 std::string PrintThriftEnum(const TMinmaxFilteringLevel::type& value);
 std::string PrintThriftEnum(const TKuduReplicaSelection::type& value);
 std::string PrintThriftEnum(const TMinmaxFilterFastCodePathMode::type& value);
diff --git a/be/src/util/in-list-filter-ir.cc b/be/src/util/in-list-filter-ir.cc
new file mode 100644
index 0000000..d573954
--- /dev/null
+++ b/be/src/util/in-list-filter-ir.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/object-pool.h"
+#include "util/in-list-filter.h"
+
+namespace impala {
+
+void InListFilter::Insert(const void* val) {
+  if (always_true_) return;
+  if (UNLIKELY(val == nullptr)) {
+    contains_null_ = true;
+    return;
+  }
+  if (UNLIKELY(values_.size() >= entry_limit_ || str_values_.size() >= entry_limit_)) {
+    always_true_ = true;
+    values_.clear();
+    str_values_.clear();
+    return;
+  }
+  switch (type_) {
+    case TYPE_TINYINT:
+      values_.insert(*reinterpret_cast<const int8_t*>(val));
+      break;
+    case TYPE_SMALLINT:
+      values_.insert(*reinterpret_cast<const int16_t*>(val));
+      break;
+    case TYPE_INT:
+      values_.insert(*reinterpret_cast<const int32_t*>(val));
+      break;
+    case TYPE_BIGINT:
+      values_.insert(*reinterpret_cast<const int64_t*>(val));
+      break;
+    case TYPE_DATE:
+      values_.insert(reinterpret_cast<const DateValue*>(val)->Value());
+      break;
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+      const StringValue* s = reinterpret_cast<const StringValue*>(val);
+      if (UNLIKELY(s->ptr == nullptr)) {
+        contains_null_ = true;
+      } else {
+        str_total_size_ += s->len;
+        if (str_total_size_ >= STRING_SET_MAX_TOTAL_LENGTH) {
+          always_true_ = true;
+          str_values_.clear();
+          return;
+        }
+        str_values_.insert(string(s->ptr, s->len));
+      }
+      break;
+    }
+    case TYPE_CHAR:
+      str_values_.insert(string(reinterpret_cast<const char*>(val), type_len_));
+      break;
+    default:
+      DCHECK(false) << "Not supported IN-list filter type: " << TypeToString(type_);
+      break;
+  }
+}
+} // namespace impala
diff --git a/be/src/util/in-list-filter.cc b/be/src/util/in-list-filter.cc
new file mode 100644
index 0000000..7dfc0ed
--- /dev/null
+++ b/be/src/util/in-list-filter.cc
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/in-list-filter.h"
+
+#include "common/object-pool.h"
+
+namespace impala {
+
+bool InListFilter::AlwaysFalse() {
+  return !always_true_ && !contains_null_ && values_.empty() && str_values_.empty();
+}
+
+bool InListFilter::AlwaysFalse(const InListFilterPB& filter) {
+  return !filter.always_true() && !filter.contains_null() && filter.value_size() == 0;
+}
+
+bool InListFilter::Find(void* val, const ColumnType& col_type) const noexcept {
+  if (always_true_) return true;
+  if (val == nullptr) return contains_null_;
+  DCHECK_EQ(type_, col_type.type);
+  int64_t v;
+  const StringValue* s;
+  switch (col_type.type) {
+    case TYPE_TINYINT:
+      v = *reinterpret_cast<const int8_t*>(val);
+      break;
+    case TYPE_SMALLINT:
+      v = *reinterpret_cast<const int16_t*>(val);
+      break;
+    case TYPE_INT:
+      v = *reinterpret_cast<const int32_t*>(val);
+      break;
+    case TYPE_BIGINT:
+      v = *reinterpret_cast<const int64_t*>(val);
+      break;
+    case TYPE_DATE:
+      v = reinterpret_cast<const DateValue*>(val)->Value();
+      break;
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+      s = reinterpret_cast<const StringValue*>(val);
+      return str_values_.find(string(s->ptr, s->len)) != str_values_.end();
+    case TYPE_CHAR:
+      return str_values_.find(string(reinterpret_cast<const char*>(val), col_type.len))
+          != str_values_.end();
+    default:
+      DCHECK(false) << "Not support IN-list filter type: " << TypeToString(type_);
+      return false;
+  }
+  return values_.find(v) != values_.end();
+}
+
+InListFilter::InListFilter(ColumnType type, uint32_t entry_limit, bool contains_null):
+  always_true_(false), contains_null_(contains_null), type_(type.type),
+  entry_limit_(entry_limit) {
+  if (type.type == TYPE_CHAR) type_len_ = type.len;
+}
+
+InListFilter* InListFilter::Create(ColumnType type, uint32_t entry_limit,
+    ObjectPool* pool) {
+  return pool->Add(new InListFilter(type, entry_limit));
+}
+
+InListFilter* InListFilter::Create(const InListFilterPB& protobuf, ColumnType type,
+    uint32_t entry_limit, ObjectPool* pool) {
+  InListFilter* filter = pool->Add(
+      new InListFilter(type, entry_limit, protobuf.contains_null()));
+  filter->always_true_ = protobuf.always_true();
+  for (const ColumnValuePB& v : protobuf.value()) {
+    switch (type.type) {
+      case TYPE_TINYINT:
+      case TYPE_SMALLINT:
+      case TYPE_INT:
+      case TYPE_BIGINT:
+      case TYPE_DATE:
+        DCHECK(v.has_long_val());
+        filter->values_.insert(v.long_val());
+        break;
+      case TYPE_STRING:
+      case TYPE_CHAR:
+      case TYPE_VARCHAR:
+        DCHECK(v.has_string_val());
+        // TODO(IMPALA-11143): use mem_tracker
+        filter->str_values_.insert(v.string_val());
+        break;
+      default:
+        DCHECK(false) << "Not support IN-list filter type: " << TypeToString(type.type);
+        return nullptr;
+    }
+  }
+  if (type.IsStringType()) {
+    DCHECK(filter->values_.empty());
+  } else {
+    DCHECK(filter->str_values_.empty());
+  }
+  return filter;
+}
+
+void InListFilter::ToProtobuf(const InListFilter* filter, InListFilterPB* protobuf) {
+  DCHECK(protobuf != nullptr);
+  if (filter == nullptr) {
+    protobuf->set_always_true(true);
+    return;
+  }
+  filter->ToProtobuf(protobuf);
+}
+
+void InListFilter::ToProtobuf(InListFilterPB* protobuf) const {
+  protobuf->set_always_true(always_true_);
+  if (always_true_) return;
+  protobuf->set_contains_null(contains_null_);
+  if (type_ == TYPE_STRING || type_ == TYPE_VARCHAR || type_ == TYPE_CHAR) {
+    for (const string& s : str_values_) {
+      ColumnValuePB* proto = protobuf->add_value();
+      proto->set_string_val(s);
+    }
+  } else {
+    for (int64_t v : values_) {
+      ColumnValuePB* proto = protobuf->add_value();
+      proto->set_long_val(v);
+    }
+  }
+}
+
+int InListFilter::NumItems() const noexcept {
+  int res = contains_null_ ? 1 : 0;
+  if (type_ == TYPE_STRING || type_ == TYPE_VARCHAR || type_ == TYPE_CHAR) {
+    return res + str_values_.size();
+  }
+  return res + values_.size();
+}
+
+string InListFilter::DebugString() const noexcept {
+  std::stringstream ss;
+  bool first_value = true;
+  ss << "IN-list filter: [";
+  if (type_ == TYPE_STRING) {
+    for (const string &s : str_values_) {
+      if (first_value) {
+        first_value = false;
+      } else {
+        ss << ',';
+      }
+      ss << "\"" << s << "\"";
+    }
+  } else {
+    for (int64_t v : values_) {
+      if (first_value) {
+        first_value = false;
+      } else {
+        ss << ',';
+      }
+      ss << v;
+    }
+  }
+  if (contains_null_) {
+    if (!first_value) ss << ',';
+    ss << "NULL";
+  }
+  ss << ']';
+  return ss.str();
+}
+
+string InListFilter::DebugString(const InListFilterPB& filter) {
+  std::stringstream ss;
+  ss << "IN-list filter: " << DebugStringOfList(filter);
+  return ss.str();
+}
+
+string InListFilter::DebugStringOfList(const InListFilterPB& filter) {
+  std::stringstream ss;
+  ss << "[";
+  bool first_value = true;
+  for (const ColumnValuePB& v : filter.value()) {
+    if (first_value) {
+      first_value = false;
+    } else {
+      ss << ',';
+    }
+    if (v.has_byte_val()) {
+      ss << v.byte_val();
+    } else if (v.has_short_val()) {
+      ss << v.short_val();
+    } else if (v.has_int_val()) {
+      ss << v.int_val();
+    } else if (v.has_long_val()) {
+      ss << v.long_val();
+    } else if (v.has_date_val()) {
+      ss << v.date_val();
+    } else if (v.has_string_val()) {
+      ss << "\"" << v.string_val() << "\"";
+    }
+  }
+  ss << ']';
+  return ss.str();
+}
+
+} // namespace impala
diff --git a/be/src/util/in-list-filter.h b/be/src/util/in-list-filter.h
new file mode 100644
index 0000000..5a209fe
--- /dev/null
+++ b/be/src/util/in-list-filter.h
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "impala-ir/impala-ir-functions.h"
+#include "runtime/date-value.h"
+#include "runtime/decimal-value.h"
+#include "runtime/string-buffer.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
+
+namespace impala {
+
+class InListFilter {
+ public:
+  /// Upper bound of total length for the string set to avoid it explodes.
+  /// TODO: find a better value based on the implementation of ORC lib, or make this
+  /// configurable.
+  const static uint32_t STRING_SET_MAX_TOTAL_LENGTH = 4 * 1024 * 1024;
+
+  InListFilter(ColumnType type, uint32_t entry_limit, bool contains_null = false);
+  ~InListFilter() {}
+  void Close() {}
+
+  /// Add a new value to the list.
+  void Insert(const void* val);
+
+  std::string DebugString() const noexcept;
+
+  bool ContainsNull() { return contains_null_; }
+  bool AlwaysTrue() { return always_true_; }
+  bool AlwaysFalse();
+  static bool AlwaysFalse(const InListFilterPB& filter);
+
+  /// Makes this filter always return true.
+  void SetAlwaysTrue() { always_true_ = true; }
+
+  bool Find(void* val, const ColumnType& col_type) const noexcept;
+  int NumItems() const noexcept;
+
+  /// Returns a new InListFilter with the given type, allocated from 'mem_tracker'.
+  static InListFilter* Create(ColumnType type, uint32_t entry_limit, ObjectPool* pool);
+
+  /// Returns a new InListFilter created from the protobuf representation, allocated from
+  /// 'mem_tracker'.
+  static InListFilter* Create(const InListFilterPB& protobuf, ColumnType type,
+      uint32_t entry_limit, ObjectPool* pool);
+
+  /// Converts 'filter' to its corresponding Protobuf representation.
+  /// If the first argument is NULL, it is interpreted as a complete filter which
+  /// contains all elements, i.e. always true.
+  static void ToProtobuf(const InListFilter* filter, InListFilterPB* protobuf);
+
+  /// Returns the LLVM_CLASS_NAME for this base class 'InListFilter'.
+  static const char* LLVM_CLASS_NAME;
+
+  /// Return a debug string for 'filter'
+  static std::string DebugString(const InListFilterPB& filter);
+  /// Return a debug string for the list of the 'filter'
+  static std::string DebugStringOfList(const InListFilterPB& filter);
+
+ private:
+  friend class HdfsOrcScanner;
+  void ToProtobuf(InListFilterPB* protobuf) const;
+
+  bool always_true_;
+  bool contains_null_;
+  PrimitiveType type_;
+  // Type len for CHAR type.
+  int type_len_;
+  /// Value set for all numeric types. Use int64_t for simplicity.
+  /// TODO(IMPALA-11141): use the exact type to save memory space.
+  std::unordered_set<int64_t> values_;
+  /// Value set for all string types.
+  std::unordered_set<std::string> str_values_;
+  uint32_t str_total_size_ = 0;
+  uint32_t entry_limit_;
+};
+}
+
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index ee82832..6b9993f 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -106,6 +106,12 @@ message MinMaxFilterPB {
   optional ColumnValuePB max = 4;
 }
 
+message InListFilterPB {
+  optional bool always_true = 1;
+  optional bool contains_null = 2;
+  repeated ColumnValuePB value = 3;
+}
+
 message UpdateFilterParamsPB {
   // Filter ID, unique within a query.
   optional int32 filter_id = 1;
@@ -116,6 +122,8 @@ message UpdateFilterParamsPB {
   optional BloomFilterPB bloom_filter = 3;
 
   optional MinMaxFilterPB min_max_filter = 4;
+
+  optional InListFilterPB in_list_filter = 5;
 }
 
 message UpdateFilterResultPB {
@@ -137,6 +145,9 @@ message PublishFilterParamsPB {
 
   // Actual min_max_filter payload
   optional MinMaxFilterPB min_max_filter = 4;
+
+  // Actual in_list_filter payload
+  optional InListFilterPB in_list_filter = 5;
 }
 
 message PublishFilterResultPB {
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 9e5b1be..fdb0ec1 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -719,8 +719,11 @@ enum TImpalaQueryOptions {
   // warning will be logged if the Java UDF throws an exception.
   ABORT_JAVA_UDF_ON_EXCEPTION = 140;
 
-  // Indicates whether to use ORC's search argument to push down predicates.
+  // Indicates whether to use ORC's async read.
   ORC_ASYNC_READ = 141
+
+  // Maximum number of distinct entries in a runtime in-list filter.
+  RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT = 142;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c9afb34..5883d15 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -129,13 +129,7 @@ struct TRuntimeFilterTargetDesc {
 enum TRuntimeFilterType {
   BLOOM = 0
   MIN_MAX = 1
-}
-
-// Enabled runtime filter types to be applied to scan nodes.
-enum TEnabledRuntimeFilterTypes {
-  BLOOM = 1
-  MIN_MAX = 2
-  ALL = 3
+  IN_LIST = 2
 }
 
 // The level of filtering of enabled min/max filters to be applied to Parquet scan nodes.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 9711c7e..3e8b0b9 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -446,8 +446,8 @@ struct TQueryOptions {
   103: optional bool retry_failed_queries = false;
 
   // See comment in ImpalaService.thrift
-  104: optional PlanNodes.TEnabledRuntimeFilterTypes enabled_runtime_filter_types =
-      PlanNodes.TEnabledRuntimeFilterTypes.ALL;
+  104: optional set<PlanNodes.TRuntimeFilterType> enabled_runtime_filter_types =
+      [PlanNodes.TRuntimeFilterType.BLOOM, PlanNodes.TRuntimeFilterType.MIN_MAX];
 
   // See comment in ImpalaService.thrift
   105: optional bool async_codegen = false;
@@ -574,6 +574,9 @@ struct TQueryOptions {
 
   // Indicates whether to use ORC's async read.
   142: optional bool orc_async_read = true;
+
+  // See comment in ImpalaService.thrift
+  143: optional i32 runtime_in_list_filter_entry_limit = 1024;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 2597869..bd38c8a 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.impala.planner;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -47,7 +48,9 @@ import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.KuduColumn;
+import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -57,7 +60,6 @@ import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumnValue;
-import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
@@ -107,6 +109,14 @@ public final class RuntimeFilterGenerator {
   // Should be in sync with corresponding values in runtime-filter-bank.cc.
   private static final long MIN_BLOOM_FILTER_SIZE = 4 * 1024;
   private static final long MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;
+  // Should be in sync with the corresponding value in in-list-filter.h.
+  private static final long IN_LIST_FILTER_STRING_SET_MAX_TOTAL_LENGTH = 4 * 1024 * 1024;
+
+  private static final Set<PrimitiveType> IN_LIST_FILTER_SUPPORTED_TYPES =
+      new HashSet<>(Arrays.asList(
+          PrimitiveType.TINYINT, PrimitiveType.SMALLINT, PrimitiveType.INT,
+          PrimitiveType.BIGINT, PrimitiveType.DATE, PrimitiveType.STRING,
+          PrimitiveType.CHAR, PrimitiveType.VARCHAR));
 
   // Map of base table tuple ids to a list of runtime filters that
   // can be applied at the corresponding scan nodes.
@@ -119,21 +129,24 @@ public final class RuntimeFilterGenerator {
 
   /**
    * Internal class that encapsulates the max, min and default sizes used for creating
-   * bloom filter objects.
+   * bloom filter objects, and entry limit for in-list filters.
    */
   private class FilterSizeLimits {
-    // Maximum filter size, in bytes, rounded up to a power of two.
+    // Maximum bloom filter size, in bytes, rounded up to a power of two.
     public final long maxVal;
 
-    // Minimum filter size, in bytes, rounded up to a power of two.
+    // Minimum bloom filter size, in bytes, rounded up to a power of two.
     public final long minVal;
 
-    // Pre-computed default filter size, in bytes, rounded up to a power of two.
+    // Pre-computed default bloom filter size, in bytes, rounded up to a power of two.
     public final long defaultVal;
 
-    // Target false positive probability, between 0 and 1 exclusive.
+    // Target false positive probability for bloom filters, between 0 and 1 exclusive.
     public final double targetFpp;
 
+    // Maximum entry size for in-list filters.
+    public final long inListFilterEntryLimit;
+
     public FilterSizeLimits(TQueryOptions tQueryOptions) {
       // Round up all limits to a power of two and make sure filter size is more
       // than the min buffer size that can be allocated by the buffer pool.
@@ -156,15 +169,17 @@ public final class RuntimeFilterGenerator {
       targetFpp = tQueryOptions.isSetRuntime_filter_error_rate() ?
           tQueryOptions.getRuntime_filter_error_rate() :
           BackendConfig.INSTANCE.getMaxFilterErrorRate();
+
+      inListFilterEntryLimit = tQueryOptions.getRuntime_in_list_filter_entry_limit();
     }
-  };
+  }
 
-  // Contains size limits for bloom filters.
-  private FilterSizeLimits bloomFilterSizeLimits_;
+  // Contains size limits for runtime filters.
+  final private FilterSizeLimits filterSizeLimits_;
 
   private RuntimeFilterGenerator(TQueryOptions tQueryOptions) {
-    bloomFilterSizeLimits_ = new FilterSizeLimits(tQueryOptions);
-  };
+    filterSizeLimits_ = new FilterSizeLimits(tQueryOptions);
+  }
 
   /**
    * Internal representation of a runtime filter. A runtime filter is generated from
@@ -372,13 +387,22 @@ public final class RuntimeFilterGenerator {
       Preconditions.checkNotNull(idGen);
       Preconditions.checkNotNull(joinPredicate);
       Preconditions.checkNotNull(filterSrcNode);
-      // Only consider binary equality predicates under hash joins
-      if (type == TRuntimeFilterType.BLOOM) {
+      // Only consider binary equality predicates under hash joins for runtime bloom
+      // filters and in-list filters.
+      if (type == TRuntimeFilterType.BLOOM || type == TRuntimeFilterType.IN_LIST) {
         if (!Predicate.isEquivalencePredicate(joinPredicate)
             || filterSrcNode instanceof NestedLoopJoinNode) {
           return null;
         }
       }
+      if (type == TRuntimeFilterType.IN_LIST) {
+        PrimitiveType lhsType = joinPredicate.getChild(0).getType().getPrimitiveType();
+        PrimitiveType rhsType = joinPredicate.getChild(1).getType().getPrimitiveType();
+        Preconditions.checkState(lhsType == rhsType, "Unanalyzed equivalence pred!");
+        if (!IN_LIST_FILTER_SUPPORTED_TYPES.contains(lhsType)) {
+          return null;
+        }
+      }
 
       BinaryPredicate normalizedJoinConjunct = null;
       if (type == TRuntimeFilterType.MIN_MAX) {
@@ -647,13 +671,24 @@ public final class RuntimeFilterGenerator {
     }
 
     /**
-     * Sets the filter size (in bytes) required for a bloom filter to achieve the
-     * configured maximum false-positive rate based on the expected NDV. Also bounds the
-     * filter size between the max and minimum filter sizes supplied to it by
-     * 'filterSizeLimits'.
+     * Sets the filter size (in bytes) based on the filter type.
+     * For bloom filters, the size should be able to achieve the configured maximum
+     * false-positive rate based on the expected NDV. Also bounds the filter size between
+     * the max and minimum filter sizes supplied to it by 'filterSizeLimits'.
+     * For min-max filters, we ignore the size since each filter only keeps two values.
+     * For in-list filters, the size is calculated based on the data types.
      */
     private void calculateFilterSize(FilterSizeLimits filterSizeLimits) {
       if (type_ == TRuntimeFilterType.MIN_MAX) return;
+      if (type_ == TRuntimeFilterType.IN_LIST) {
+        if (srcExpr_.getType().isStringType()) {
+          filterSizeBytes_ = IN_LIST_FILTER_STRING_SET_MAX_TOTAL_LENGTH;
+        } else {
+          // We currently use int64_t(8 bytes) as entry items for all numeric types.
+          filterSizeBytes_ = filterSizeLimits.inListFilterEntryLimit * 8;
+        }
+        return;
+      }
       if (ndvEstimate_ == -1) {
         filterSizeBytes_ = filterSizeLimits.defaultVal;
         return;
@@ -721,8 +756,13 @@ public final class RuntimeFilterGenerator {
         if (numBloomFilters >= maxNumBloomFilters) continue;
         ++numBloomFilters;
       }
-      filter.setIsBroadcast(
-          filter.src_.getDistributionMode() == DistributionMode.BROADCAST);
+      DistributionMode distMode = filter.src_.getDistributionMode();
+      filter.setIsBroadcast(distMode == DistributionMode.BROADCAST);
+      if (filter.getType() == TRuntimeFilterType.IN_LIST
+          && distMode == DistributionMode.PARTITIONED) {
+        LOG.trace("Skip IN-list filter on partitioned join: {}", filter.debugString());
+        continue;
+      }
       filter.computeHasLocalTargets();
       if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + filter.debugString());
       filter.assignToPlanNodes();
@@ -762,19 +802,6 @@ public final class RuntimeFilterGenerator {
   }
 
   /**
-   * Returns true if filter type 'filterType' is enabled in the context of the enabled
-   * runtime types 'enabledRuntimeFilterTypes'. Return false otherwise.
-   */
-  public boolean isRuntimeFilterTypeEnabled(TRuntimeFilterType filterType,
-      TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes) {
-    if (enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.ALL) return true;
-    return (filterType == TRuntimeFilterType.BLOOM
-            && enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.BLOOM
-        || filterType == TRuntimeFilterType.MIN_MAX
-            && enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.MIN_MAX);
-  }
-
-  /**
    * Generates the runtime filters for a query by recursively traversing the distributed
    * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate
    * runtime filters are generated from equi-join predicates assigned to hash-join nodes.
@@ -796,14 +823,14 @@ public final class RuntimeFilterGenerator {
       joinConjuncts.addAll(joinNode.getConjuncts());
 
       List<RuntimeFilter> filters = new ArrayList<>();
-      TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes =
+      Set<TRuntimeFilterType> enabledRuntimeFilterTypes =
           ctx.getQueryOptions().getEnabled_runtime_filter_types();
       for (TRuntimeFilterType filterType : TRuntimeFilterType.values()) {
-        if (!isRuntimeFilterTypeEnabled(filterType, enabledRuntimeFilterTypes)) continue;
+        if (!enabledRuntimeFilterTypes.contains(filterType)) continue;
         for (Expr conjunct : joinConjuncts) {
           RuntimeFilter filter =
               RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
-                  joinNode, filterType, bloomFilterSizeLimits_,
+                  joinNode, filterType, filterSizeLimits_,
                   /* isTimestampTruncation */ false);
           if (filter != null) {
             registerRuntimeFilter(filter);
@@ -817,7 +844,7 @@ public final class RuntimeFilterGenerator {
               && conjunct.getChild(1).getType().isTimestamp()) {
             RuntimeFilter filter2 =
                 RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
-                    joinNode, filterType, bloomFilterSizeLimits_,
+                    joinNode, filterType, filterSizeLimits_,
                     /* isTimestampTruncation */ true);
             if (filter2 == null) continue;
             registerRuntimeFilter(filter2);
@@ -902,8 +929,8 @@ public final class RuntimeFilterGenerator {
    *    to 'scanNode' if the filter is produced within the same fragment that contains the
    *    scan node.
    * 3. Only Hdfs and Kudu scan nodes are supported:
-   *     a. If the target is an HdfsScanNode, the filter must be type BLOOM for non
-   *        Parquet tables, or type BLOOM and/or MIN_MAX for Parquet tables.
+   *     a. If the target is an HdfsScanNode, the filter must be type BLOOM/IN_LIST for
+   *        non Parquet tables, or type BLOOM/MIN_MAX/IN_LIST for Parquet tables.
    *     b. If the target is a KuduScanNode, the filter could be type MIN_MAX, and/or
    *        BLOOM, the target must be a slot ref on a column, and the comp op cannot
    *        be 'not distinct'.
@@ -921,7 +948,7 @@ public final class RuntimeFilterGenerator {
         ctx.getQueryOptions().isDisable_row_runtime_filtering();
     boolean enable_overlap_filter = enableOverlapFilter(ctx.getQueryOptions());
     TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode();
-    TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes =
+    Set<TRuntimeFilterType> enabledRuntimeFilterTypes =
         ctx.getQueryOptions().getEnabled_runtime_filter_types();
 
     // Init the overlap predicate for the hdfs scan node.
@@ -947,32 +974,30 @@ public final class RuntimeFilterGenerator {
           continue;
         }
         if (filter.getType() == TRuntimeFilterType.MIN_MAX) {
-          boolean allow_min_max =
-              enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.MIN_MAX
-              || enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.ALL;
-          if (!allow_min_max) {
+          Preconditions.checkState(
+              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.MIN_MAX),
+              "MIN_MAX filters should not be generated");
+          if (!enable_overlap_filter) continue;
+          // Try to compute an overlap predicate for the filter. This predicate will be
+          // used to filter out partitions, or row groups, pages or rows in Parquet data
+          // files.
+          if (!((HdfsScanNode) scanNode)
+                   .tryToComputeOverlapPredicate(
+                       analyzer, filter, targetExpr, isBoundByPartitionColumns)) {
             continue;
           }
-          if (enable_overlap_filter) {
-            // Try to compute an overlap predicate for the filter. This predicate will be
-            // used to filter out partitions, or row groups, pages or rows in Parquet data
-            // files.
-            if (!((HdfsScanNode) scanNode)
-                     .tryToComputeOverlapPredicate(
-                         analyzer, filter, targetExpr, isBoundByPartitionColumns)) {
-              continue;
-            }
-          } else {
+        } else if (filter.getType() == TRuntimeFilterType.IN_LIST) {
+          // Only assign IN-list filters on ORC tables.
+          if (!((HdfsScanNode) scanNode).getFileFormats().contains(HdfsFileFormat.ORC)) {
             continue;
           }
         }
       } else {
-        Preconditions.checkState(scanNode instanceof KuduScanNode);
+        // assign filters to KuduScanNode
         if (filter.getType() == TRuntimeFilterType.BLOOM) {
-          if (enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.BLOOM
-              && enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
-            continue;
-          }
+          Preconditions.checkState(
+              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.BLOOM),
+              "BLOOM filters should not be generated!");
           // TODO: Support Kudu VARCHAR Bloom Filter
           if (targetExpr.getType().isVarchar()) continue;
           // Kudu only supports targeting a single column, not general exprs, so the
@@ -987,12 +1012,10 @@ public final class RuntimeFilterGenerator {
           }
           SlotRef slotRef = (SlotRef) targetExpr;
           if (slotRef.getDesc().getColumn() == null) continue;
-        } else {
-          Preconditions.checkState(filter.getType() == TRuntimeFilterType.MIN_MAX);
-          if (enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.MIN_MAX
-              && enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
-            continue;
-          }
+        } else if (filter.getType() == TRuntimeFilterType.MIN_MAX) {
+          Preconditions.checkState(
+              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.MIN_MAX),
+              "MIN_MAX filters should not be generated!");
           // TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
           if (targetExpr.getType().isVarchar()) continue;
           SlotRef slotRef = targetExpr.unwrapSlotRef(true);
@@ -1006,6 +1029,13 @@ public final class RuntimeFilterGenerator {
               || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
             continue;
           }
+        } else {
+          Preconditions.checkState(filter.getType() == TRuntimeFilterType.IN_LIST);
+          Preconditions.checkState(
+              enabledRuntimeFilterTypes.contains(TRuntimeFilterType.IN_LIST),
+              "IN_LIST filters should not be generated!");
+          // TODO(IMPALA-11066): Runtime IN-list filters for Kudu tables
+          continue;
         }
       }
       TColumnValue lowValue = null;
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 22c7dd0..2b8bccd 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -32,7 +32,7 @@ import org.apache.impala.datagenerator.HBaseTestDataRegionAssignment;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.testutil.TestUtils.IgnoreValueFilter;
-import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
+import org.apache.impala.thrift.TRuntimeFilterType;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TJoinDistributionMode;
@@ -612,13 +612,16 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("disable-runtime-overlap-filter", options);
 
     options.setMinmax_filter_threshold(1.0);
-    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.BLOOM);
+    options.unsetEnabled_runtime_filter_types();
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.BLOOM);
     runPlannerTestFile("disable-runtime-overlap-filter", options);
   }
 
   @Test
   public void testRuntimeFilterQueryOptions() {
-    runPlannerTestFile("runtime-filter-query-options");
+    runPlannerTestFile("runtime-filter-query-options",
+        ImmutableSet.of(
+            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
   @Test
@@ -661,7 +664,9 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKudu() {
     TQueryOptions options = defaultQueryOptions();
-    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
+    options.unsetEnabled_runtime_filter_types();
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.BLOOM);
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.MIN_MAX);
     addTestDb("kudu_planner_test", "Test DB for Kudu Planner.");
     addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " +
         "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');");
@@ -676,7 +681,9 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKuduUpdate() {
     TQueryOptions options = defaultQueryOptions();
-    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
+    options.unsetEnabled_runtime_filter_types();
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.BLOOM);
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.MIN_MAX);
     runPlannerTestFile("kudu-update", options);
   }
 
@@ -706,7 +713,9 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKuduTpch() {
     TQueryOptions options = defaultQueryOptions();
-    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
+    options.unsetEnabled_runtime_filter_types();
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.BLOOM);
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.MIN_MAX);
     runPlannerTestFile("tpch-kudu", options,
         ImmutableSet.of(PlannerTestOption.INCLUDE_RESOURCE_HEADER,
             PlannerTestOption.VALIDATE_RESOURCES));
@@ -895,7 +904,8 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(false);
-    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
+    options.unsetEnabled_runtime_filter_types();
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.MIN_MAX);
     runPlannerTestFile("min-max-runtime-filters-hdfs-num-rows-est-enabled", options,
         ImmutableSet.of(
             PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
@@ -907,7 +917,8 @@ public class PlannerTest extends PlannerTestBase {
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(true);
     options.setMinmax_filter_partition_columns(false);
-    options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
+    options.unsetEnabled_runtime_filter_types();
+    options.addToEnabled_runtime_filter_types(TRuntimeFilterType.MIN_MAX);
     runPlannerTestFile("min-max-runtime-filters", options);
   }
 
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 36d5dff..a6e55e1 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -271,6 +271,21 @@ FROM {db_name}.{table_name};
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+alltypestiny_negative
+---- CREATE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+LIKE {db_name}{db_suffix}.alltypestiny STORED AS {file_format};
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} partition (year, month)
+SELECT id, bool_col,
+       -tinyint_col, -smallint_col, -int_col, -bigint_col, -float_col, -double_col,
+       date_string_col, 'x', timestamp_col, year, month
+FROM functional.alltypestiny
+WHERE int_col = 1;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 alltypesinsert
 ---- CREATE
 CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 5d40a4c..d34334e 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -319,3 +319,6 @@ table_name:alltypessmall_bool_sorted, constraint:restrict_to, table_format:orc/d
 
 table_name:complextypes_arrays_only_view, constraint:restrict_to, table_format:parquet/none/none
 table_name:complextypes_arrays_only_view, constraint:restrict_to, table_format:orc/def/block
+
+# 'alltypestiny_negative' only used in ORC tests.
+table_name:alltypestiny_negative, constraint:restrict_to, table_format:orc/def/block
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
index 30ff98c..87ae8c4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
@@ -781,3 +781,152 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=4B cardinality=7.30K
    in pipelines: 00(GETNEXT)
 ====
+# ENABLED_RUNTIME_FILTER_TYPES is set as IN_LIST, IN-list filter is assigned
+# to ORC.
+select /* +straight_join */ count(*) from functional_orc_def.alltypes a
+  join /* +broadcast */ functional_orc_def.alltypestiny b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=IN_LIST
+DISABLE_ROW_RUNTIME_FILTERING=false
+EXPLAIN_LEVEL=2
+---- DISTRIBUTEDPLAN
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 06(GETNEXT), 03(OPEN)
+|
+05:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=33.96MB mem-reservation=1.96MB thread-reservation=2 runtime-filters-memory=8.00KB
+03:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[in_list] <- b.id
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=3.43K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--04:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=4B cardinality=353
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=2
+|  01:SCAN HDFS [functional_orc_def.alltypestiny b, RANDOM]
+|     HDFS partitions=4/4 files=4 size=5.55KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/4 rows=353
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=353
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_orc_def.alltypes a, RANDOM]
+   HDFS partitions=24/24 files=24 size=53.97KB
+   runtime filters: RF000[in_list] -> a.id
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=3.43K
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=3.43K
+   in pipelines: 00(GETNEXT)
+====
+# ENABLED_RUNTIME_FILTER_TYPES is set as IN_LIST. Make sure no IN-list filters
+# is generated for partitioned join.
+select /* +straight_join */ count(*) from functional_orc_def.alltypes a
+  join /* +shuffle*/ functional_orc_def.alltypestiny b on a.id = b.id
+---- QUERYOPTIONS
+ENABLED_RUNTIME_FILTER_TYPES=IN_LIST
+DISABLE_ROW_RUNTIME_FILTERING=false
+EXPLAIN_LEVEL=2
+---- DISTRIBUTEDPLAN
+F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 03(OPEN)
+|
+06:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(a.id)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=1.99MB mem-reservation=1.94MB thread-reservation=1
+03:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.id = b.id
+|  fk/pk conjuncts: assumed fk/pk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=8B cardinality=3.43K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--05:EXCHANGE [HASH(b.id)]
+|  |  mem-estimate=28.47KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=4B cardinality=3.43K
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=2
+|  01:SCAN HDFS [functional_orc_def.alltypestiny b, RANDOM]
+|     HDFS partitions=4/4 files=4 size=5.55KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/4 rows=353
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=353
+|     in pipelines: 01(GETNEXT)
+|
+04:EXCHANGE [HASH(a.id)]
+|  mem-estimate=28.47KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=3.43K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_orc_def.alltypes a, RANDOM]
+   HDFS partitions=24/24 files=24 size=53.97KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=3.43K
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=3.43K
+   in pipelines: 00(GETNEXT)
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/in_list_filters.test b/testdata/workloads/functional-query/queries/QueryTest/in_list_filters.test
new file mode 100644
index 0000000..cfe71f1
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/in_list_filters.test
@@ -0,0 +1,173 @@
+====
+---- QUERY
+# Test IN-list filter on partition columns.
+# There are 24 partitions and 24 files in alltypes. 22 of them will be filtered out.
+# Expect 22 / 3 = 7 files be rejected per scan fragment.
+select STRAIGHT_JOIN count(*) from alltypes p join [BROADCAST] alltypestiny b
+on p.month = b.int_col and b.month = 1 and b.string_col = "1"
+---- RESULTS
+620
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*Files rejected: 7 \(7\).*
+====
+---- QUERY
+# Test two hop IN-list filters on partition columns.
+# "c.month = 13" won't match any rows in alltypestiny. Expect all files be rejected.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypes b
+    join [BROADCAST] alltypestiny c
+    where c.month = 13 and b.year = c.year and a.month = b.month
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 0 items.*
+row_regex: .*Filter 1 arrival with 0 items.*
+row_regex: .*Files rejected: 8 \(8\).*
+====
+---- QUERY
+# Test IN-list filter on string column.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypestiny_negative b
+    where a.string_col = b.string_col
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on empty strings.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] nulltable n
+    where a.string_col = n.b;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on tinyint column.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypestiny_negative b
+    where a.tinyint_col = b.tinyint_col
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on smallint column.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypestiny_negative b
+    where a.smallint_col = b.smallint_col
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on int column.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypestiny_negative b
+    where a.int_col = b.int_col
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on bigint column.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypestiny_negative b
+    where a.bigint_col = b.bigint_col
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on bigint column.
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] alltypestiny b
+    where a.bigint_col = b.bigint_col + 100
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 2 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on DATE partition column.
+# 2 of the 4 partitions are filtered out.
+select STRAIGHT_JOIN count(*) from date_tbl a
+    join [BROADCAST] date_tbl b
+    on a.date_part = b.date_col
+---- RESULTS
+11
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 17 items.*
+aggregation(SUM, Files rejected): 2
+====
+---- QUERY
+# Test IN-list filter on DATE non-partition column.
+select STRAIGHT_JOIN count(*) from date_tbl a
+    join [BROADCAST] date_tbl b
+    on a.date_col = b.date_part
+---- RESULTS
+11
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 4 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+====
+---- QUERY
+# Test IN-list filter with NULL.
+# 'id' is a string column without NULLs. 'null_str' is a string column with all NULLs.
+# The pushed down IN-list filter should be able to filter out all rows.
+select STRAIGHT_JOIN count(*) from nullrows a
+    join [BROADCAST] nullrows b
+    where a.id <=> b.null_str;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 1 items.*
+row_regex: .*NumPushedDownRuntimeFilters: 1 \(1\).*
+row_regex: .*RowsRead: 0 \(0\).*
+====
+---- QUERY
+# Test IN-list filter on complex target expr, i.e. not a simple slot ref.
+# The filter can't be pushed down to the ORC lib since the ORC lib can't evaluate the
+# expr. Expect 7300 / 3 = 2433 rows read per scan fragment on 'alltypes'.
+select STRAIGHT_JOIN count(*) from functional_orc_def.alltypes a
+  join [BROADCAST] functional_orc_def.alltypestiny b
+  on a.id + 1 = b.id
+---- RESULTS
+7
+---- RUNTIME_PROFILE
+row_regex: .*RowsRead: 2.43K \(2433\).*
+====
+---- QUERY
+# Test IN-list filter on wide string that exceeds the total string size.
+# The filter is turned off (always_true=true). Expect it arrives with 0 items.
+# Expect 7300 / 3 = 2433 rows read per scan fragment on 'alltypes'.
+set max_row_size=16m;
+select STRAIGHT_JOIN count(*) from alltypes a
+    join [BROADCAST] widerow b
+    on a.string_col = b.string_col
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 arrival with 0 items.*
+row_regex: .*RowsRead: 2.43K \(2433\).*
+====
+
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index d22e522..bb7783a 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -76,9 +76,7 @@ class TestRuntimeFilters(ImpalaTestSuite):
   def test_basic_filters(self, vector):
     new_vector = deepcopy(vector)
     new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
-    if 'kudu' in str(vector.get_value('table_format')):
-      self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=ALL")
-    self.run_test_case('QueryTest/runtime_filters', vector,
+    self.run_test_case('QueryTest/runtime_filters', new_vector,
         test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
 
   def test_wait_time(self, vector):
@@ -190,8 +188,7 @@ class TestBloomFilters(ImpalaTestSuite):
       add_exec_option_dimension(cls, "async_codegen", 1)
 
   def test_bloom_filters(self, vector):
-    if 'kudu' in str(vector.get_value('table_format')):
-      self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
+    self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
     self.run_test_case('QueryTest/bloom_filters', vector)
 
   def test_iceberg_dictionary_runtime_filter(self, vector, unique_database):
@@ -341,7 +338,29 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
     self.execute_query("select * from {0}.{1} t1, {0}.{1} t2 where t1.d=t2.d and t2.i=2".
         format(unique_database, tbl_name))
 
-# Apply both Bloom filter and Minmax filters
+
+class TestInListFilters(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestInListFilters, cls).add_test_dimensions()
+    # Currently, IN-list filters are only implemented for orc.
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format in ['orc'])
+    # Enable query option ASYNC_CODEGEN for slow build
+    if build_runs_slowly:
+      add_exec_option_dimension(cls, "async_codegen", 1)
+
+  def test_in_list_filters(self, vector):
+    vector.get_value('exec_option')['enabled_runtime_filter_types'] = 'in_list'
+    vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = WAIT_TIME_MS
+    self.run_test_case('QueryTest/in_list_filters', vector)
+
+
+# Apply Bloom filter, Minmax filter and IN-list filters
 class TestAllRuntimeFilters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):