You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:50 UTC

[07/16] incubator-impala git commit: IMPALA-4252: Min-max runtime filters for Kudu

IMPALA-4252: Min-max runtime filters for Kudu

This patch implements min-max filters for runtime filters. Each
runtime filter generates a bloom filter or a min-max filter,
depending on if it has HDFS or Kudu targets, respectively.

In RuntimeFilterGenerator in the planner, each hash join node
generates a bloom and min-max filter for each equi-join predicate, but
only those filters that end up being assigned to a target make it into
the final plan.

Min-max filters are only assigned to Kudu scans if the target expr is
a column, as Kudu doesn't support bounds on general exprs, and only if
the join op is '=' and not 'is distinct from', as Kudu doesn't support
returning NULLs if a bound is set.

Min-max filters are inserted into by the PartitionedHashJoinBuilder.
Codegen is used to eliminate branching on the type of filter. String
min-max filters truncate their bounds at 1024 chars, so that the max
amount of memory used by min-max filters is negligible.

For now, min-max filters are only applied at the KuduScanner, which
passes them into the Kudu client.

Future work will address applying min-max filters at HDFS scan nodes
and applying bloom filters at Kudu scan nodes.

Functional Testing:
- Added new planner tests and updated the old ones. (in old tests, a
  lot of runtime filters are renumbered as we always generate min-max
  filters even if they don't end up getting assigned and they take up
  some of the RF ids).
- Updated existing runtime filter tests to work with Kudu.
- Added e2e tests for min-max filter specific functionality.

Perf Testing:
- All tests run on Kudu stress cluster (10 nodes) and tpch_100_kudu,
  timings are averages of 3 runs.
- Ran a contrived query with a filter that does not eliminate any rows
  (full self join of lineitem). The difference in running time was
  negligible - 24.46s with filters on, 24.15s with filters off for
  a ~1% slowdown.
- Ran a contrived query with a filter that elimiates all rows (self
  join on lineitem with a join condition that never matches). The
  filters resulted in a significant speedup - 0.26s with filters on,
  1.46s with filters off for a ~5.6x speedup. This query is added to
  targeted-perf.

Change-Id: I02bad890f5b5f78388a3041bf38f89369b5e2f1c
Reviewed-on: http://gerrit.cloudera.org:8080/7793
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2510fe0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2510fe0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2510fe0a

Branch: refs/heads/master
Commit: 2510fe0aa0c86f460af9040eb413aad76c13cc84
Parents: 3ddafcd
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Oct 23 07:58:34 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Nov 17 21:33:51 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |  11 +-
 be/src/codegen/impala-ir.cc                     |   1 +
 be/src/exec/filter-context.cc                   | 158 ++--
 be/src/exec/filter-context.h                    |  25 +-
 be/src/exec/hdfs-parquet-scanner-ir.cc          |   2 +-
 be/src/exec/hdfs-scan-node-base.cc              |   2 +-
 be/src/exec/kudu-scan-node-base.cc              |   2 +-
 be/src/exec/kudu-scan-node-mt.cc                |   5 +-
 be/src/exec/kudu-scan-node.cc                   |   7 +-
 be/src/exec/kudu-scanner.cc                     |  65 +-
 be/src/exec/kudu-scanner.h                      |   6 +-
 be/src/exec/kudu-util.cc                        |  60 +-
 be/src/exec/kudu-util.h                         |   6 +
 be/src/exec/partitioned-hash-join-builder-ir.cc |   1 +
 be/src/exec/partitioned-hash-join-builder.cc    |  37 +-
 be/src/exec/scan-node.cc                        |  16 +-
 be/src/runtime/coordinator-filter-state.h       |  25 +-
 be/src/runtime/coordinator.cc                   |  91 ++-
 be/src/runtime/fragment-instance-state.cc       |   7 +-
 be/src/runtime/fragment-instance-state.h        |   2 +-
 be/src/runtime/query-state.cc                   |   9 +-
 be/src/runtime/query-state.h                    |   3 +-
 be/src/runtime/runtime-filter-bank.cc           | 102 ++-
 be/src/runtime/runtime-filter-bank.h            |  36 +-
 be/src/runtime/runtime-filter-ir.cc             |   7 +-
 be/src/runtime/runtime-filter.cc                |   4 +-
 be/src/runtime/runtime-filter.h                 |  61 +-
 be/src/runtime/runtime-filter.inline.h          |  35 +-
 be/src/runtime/timestamp-value.h                |  15 +
 be/src/service/impala-internal-service.cc       |   6 +-
 be/src/util/CMakeLists.txt                      |   3 +
 be/src/util/min-max-filter-ir.cc                |  76 ++
 be/src/util/min-max-filter-test.cc              | 364 +++++++++
 be/src/util/min-max-filter.cc                   | 529 ++++++++++++
 be/src/util/min-max-filter.h                    | 231 ++++++
 common/thrift/Data.thrift                       |   1 +
 common/thrift/ImpalaInternalService.thrift      |  23 +-
 common/thrift/ImpalaService.thrift              |   8 +-
 common/thrift/PlanNodes.thrift                  |  13 +
 .../org/apache/impala/planner/HashJoinNode.java |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   2 +-
 .../org/apache/impala/planner/KuduScanNode.java |   4 +
 .../org/apache/impala/planner/PlanNode.java     |  27 +-
 .../impala/planner/RuntimeFilterGenerator.java  |  94 ++-
 .../org/apache/impala/planner/PlannerTest.java  |   7 +
 .../queries/PlannerTest/aggregation.test        |   4 +-
 .../PlannerTest/fk-pk-join-detection.test       |  48 +-
 .../queries/PlannerTest/implicit-joins.test     |   4 +-
 .../queries/PlannerTest/inline-view-limit.test  |  16 +-
 .../queries/PlannerTest/inline-view.test        |  44 +-
 .../queries/PlannerTest/join-order.test         | 188 ++---
 .../queries/PlannerTest/joins.test              |  88 +-
 .../queries/PlannerTest/kudu-delete.test        |   8 +-
 .../queries/PlannerTest/kudu-update.test        |  10 +
 .../queries/PlannerTest/kudu.test               |   2 +
 .../queries/PlannerTest/max-row-size.test       |   8 +-
 .../PlannerTest/min-max-runtime-filters.test    | 142 ++++
 .../queries/PlannerTest/nested-collections.test |  20 +-
 .../queries/PlannerTest/order.test              |   8 +-
 .../queries/PlannerTest/outer-joins.test        |  24 +-
 .../PlannerTest/predicate-propagation.test      |  28 +-
 .../PlannerTest/resource-requirements.test      | 126 +--
 .../PlannerTest/runtime-filter-propagation.test |  96 +--
 .../runtime-filter-query-options.test           |  76 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  32 +-
 .../queries/PlannerTest/subquery-rewrite.test   |  82 +-
 .../queries/PlannerTest/tablesample.test        |   4 +-
 .../queries/PlannerTest/tpcds-all.test          | 800 +++++++++----------
 .../queries/PlannerTest/tpch-all.test           | 444 +++++-----
 .../queries/PlannerTest/tpch-kudu.test          | 107 +++
 .../queries/PlannerTest/tpch-nested.test        |  64 +-
 .../queries/PlannerTest/tpch-views.test         | 148 ++--
 .../queries/PlannerTest/union.test              |   8 +-
 .../queries/PlannerTest/views.test              |  40 +-
 .../queries/PlannerTest/with-clause.test        |  32 +-
 .../queries/QueryTest/bloom_filters.test        | 126 +++
 .../queries/QueryTest/bloom_filters_wait.test   |  22 +
 .../queries/QueryTest/explain-level2.test       |   4 +-
 .../queries/QueryTest/explain-level3.test       |   4 +-
 .../queries/QueryTest/min_max_filters.test      | 121 +++
 .../queries/QueryTest/runtime_filters.test      | 177 ++--
 .../queries/QueryTest/runtime_filters_wait.test |  23 -
 .../primitive_min_max_runtime_filter.test       |   9 +
 tests/common/impala_test_suite.py               |   8 +-
 tests/query_test/test_kudu.py                   |   6 +
 tests/query_test/test_runtime_filters.py        |  95 ++-
 tests/util/test_file_parser.py                  |  17 +
 87 files changed, 3855 insertions(+), 1649 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index b3ad25d..1d0f38e 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -210,7 +210,16 @@ ir_functions = [
   "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"],
   ["BLOOM_FILTER_INSERT_NO_AVX2", "_ZN6impala11BloomFilter12InsertNoAvx2Ej"],
   ["BLOOM_FILTER_INSERT_AVX2", "_ZN6impala11BloomFilter10InsertAvx2Ej"],
-  ["SELECT_NODE_COPY_ROWS", "_ZN6impala10SelectNode8CopyRowsEPNS_8RowBatchE"]
+  ["SELECT_NODE_COPY_ROWS", "_ZN6impala10SelectNode8CopyRowsEPNS_8RowBatchE"],
+  ["BOOL_MIN_MAX_FILTER_INSERT", "_ZN6impala16BoolMinMaxFilter6InsertEPv"],
+  ["TINYINT_MIN_MAX_FILTER_INSERT", "_ZN6impala19TinyIntMinMaxFilter6InsertEPv"],
+  ["SMALLINT_MIN_MAX_FILTER_INSERT", "_ZN6impala20SmallIntMinMaxFilter6InsertEPv"],
+  ["INT_MIN_MAX_FILTER_INSERT", "_ZN6impala15IntMinMaxFilter6InsertEPv"],
+  ["BIGINT_MIN_MAX_FILTER_INSERT", "_ZN6impala18BigIntMinMaxFilter6InsertEPv"],
+  ["FLOAT_MIN_MAX_FILTER_INSERT", "_ZN6impala17FloatMinMaxFilter6InsertEPv"],
+  ["DOUBLE_MIN_MAX_FILTER_INSERT", "_ZN6impala18DoubleMinMaxFilter6InsertEPv"],
+  ["STRING_MIN_MAX_FILTER_INSERT", "_ZN6impala18StringMinMaxFilter6InsertEPv"],
+  ["TIMESTAMP_MIN_MAX_FILTER_INSERT", "_ZN6impala21TimestampMinMaxFilter6InsertEPv"]
 ]
 
 enums_preamble = '\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 2ae10a4..4b79e8b 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -61,6 +61,7 @@
 #include "udf/udf-ir.cc"
 #include "util/bloom-filter-ir.cc"
 #include "util/hash-util-ir.cc"
+#include "util/min-max-filter-ir.cc"
 
 #pragma clang diagnostic pop
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 7f7318e..70618df 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -20,6 +20,7 @@
 #include "codegen/codegen-anyval.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/tuple-row.h"
+#include "util/min-max-filter.h"
 #include "util/runtime-profile-counters.h"
 
 using namespace impala;
@@ -77,11 +78,24 @@ bool FilterContext::Eval(TupleRow* row) const noexcept {
 }
 
 void FilterContext::Insert(TupleRow* row) const noexcept {
-  if (local_bloom_filter == NULL) return;
-  void* val = expr_eval->GetValue(row);
-  uint32_t filter_hash = RawValue::GetHashValue(
-      val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
-  local_bloom_filter->Insert(filter_hash);
+  if (filter->is_bloom_filter()) {
+    if (local_bloom_filter == nullptr) return;
+    void* val = expr_eval->GetValue(row);
+    uint32_t filter_hash = RawValue::GetHashValue(
+        val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
+    local_bloom_filter->Insert(filter_hash);
+  } else {
+    DCHECK(filter->is_min_max_filter());
+    if (local_min_max_filter == nullptr) return;
+    void* val = expr_eval->GetValue(row);
+    local_min_max_filter->Insert(val);
+  }
+}
+
+void FilterContext::MaterializeValues() const {
+  if (filter->is_min_max_filter() && local_min_max_filter != nullptr) {
+    local_min_max_filter->MaterializeValues();
+  }
 }
 
 // An example of the generated code for TPCH-Q2: RF002 -> n_regionkey
@@ -219,17 +233,17 @@ Status FilterContext::CodegenEval(
 //     %"class.std::vector.101" zeroinitializer }
 //
 // define void @FilterContextInsert(%"struct.impala::FilterContext"* %this,
-//     %"class.impala::TupleRow"* %row) #43 {
+//     %"class.impala::TupleRow"* %row) #37 {
 // entry:
 //   %0 = alloca i16
 //   %local_bloom_filter_ptr = getelementptr inbounds %"struct.impala::FilterContext",
 //       %"struct.impala::FilterContext"* %this, i32 0, i32 3
 //   %local_bloom_filter_arg = load %"class.impala::BloomFilter"*,
 //       %"class.impala::BloomFilter"** %local_bloom_filter_ptr
-//   %bloom_is_null = icmp eq %"class.impala::BloomFilter"* %local_bloom_filter_arg, null
-//   br i1 %bloom_is_null, label %bloom_is_null1, label %bloom_not_null
+//   %filter_is_null = icmp eq %"class.impala::BloomFilter"* %local_bloom_filter_arg, null
+//   br i1 %filter_is_null, label %filters_null, label %filters_not_null
 //
-// bloom_not_null:                                   ; preds = %entry
+// filters_not_null:                                 ; preds = %entry
 //   %expr_eval_ptr = getelementptr inbounds %"struct.impala::FilterContext",
 //       %"struct.impala::FilterContext"* %this, i32 0, i32 0
 //   %expr_eval_arg = load %"class.impala::ScalarExprEvaluator"*,
@@ -240,29 +254,29 @@ Status FilterContext::CodegenEval(
 //   %is_null = trunc i32 %result to i1
 //   br i1 %is_null, label %val_is_null, label %val_not_null
 //
-// bloom_is_null1:                                   ; preds = %entry
+// filters_null:                                     ; preds = %entry
 //   ret void
 //
-// val_not_null:                                     ; preds = %bloom_not_null
+// val_not_null:                                     ; preds = %filters_not_null
 //   %1 = ashr i32 %result, 16
 //   %2 = trunc i32 %1 to i16
 //   store i16 %2, i16* %0
 //   %native_ptr = bitcast i16* %0 to i8*
 //   br label %insert_filter
 //
-// val_is_null:                                      ; preds = %bloom_not_null
+// val_is_null:                                      ; preds = %filters_not_null
 //   br label %insert_filter
 //
 // insert_filter:                                    ; preds = %val_not_null, %val_is_null
 //   %val_ptr_phi = phi i8* [ %native_ptr, %val_not_null ], [ null, %val_is_null ]
 //   %hash_value = call i32 @_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj(
 //       i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg, i32 1234)
-//   call void @_ZN6impala11BloomFilter9InsertAvxEj(
+//   call void @_ZN6impala11BloomFilter10InsertAvx2Ej(
 //       %"class.impala::BloomFilter"* %local_bloom_filter_arg, i32 %hash_value)
 //   ret void
 // }
-Status FilterContext::CodegenInsert(
-    LlvmCodeGen* codegen, ScalarExpr* filter_expr, llvm::Function** fn) {
+Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
+    FilterContext* ctx, llvm::Function** fn) {
   llvm::LLVMContext& context = codegen->context();
   LlvmBuilder builder(context);
 
@@ -279,23 +293,38 @@ Status FilterContext::CodegenInsert(
   llvm::Value* this_arg = args[0];
   llvm::Value* row_arg = args[1];
 
-  // Load 'local_bloom_filter' from 'this_arg' FilterContext object.
-  llvm::Value* local_bloom_filter_ptr =
-      builder.CreateStructGEP(nullptr, this_arg, 3, "local_bloom_filter_ptr");
-  llvm::Value* local_bloom_filter_arg =
-      builder.CreateLoad(local_bloom_filter_ptr, "local_bloom_filter_arg");
-
-  // Check if 'local_bloom_filter' is NULL and return if so.
-  llvm::Value* bloom_is_null =
-      builder.CreateIsNull(local_bloom_filter_arg, "bloom_is_null");
-  llvm::BasicBlock* bloom_not_null_block =
-      llvm::BasicBlock::Create(context, "bloom_not_null", insert_filter_fn);
-  llvm::BasicBlock* bloom_is_null_block =
-      llvm::BasicBlock::Create(context, "bloom_is_null", insert_filter_fn);
-  builder.CreateCondBr(bloom_is_null, bloom_is_null_block, bloom_not_null_block);
-  builder.SetInsertPoint(bloom_is_null_block);
+  llvm::Value* local_filter_arg;
+  if (ctx->filter->is_bloom_filter()) {
+    // Load 'local_bloom_filter' from 'this_arg' FilterContext object.
+    llvm::Value* local_bloom_filter_ptr =
+        builder.CreateStructGEP(nullptr, this_arg, 3, "local_bloom_filter_ptr");
+    local_filter_arg =
+        builder.CreateLoad(local_bloom_filter_ptr, "local_bloom_filter_arg");
+  } else {
+    DCHECK(ctx->filter->is_min_max_filter());
+    // Load 'local_min_max_filter' from 'this_arg' FilterContext object.
+    llvm::Value* local_min_max_filter_ptr =
+        builder.CreateStructGEP(nullptr, this_arg, 4, "local_min_max_filter_ptr");
+    llvm::PointerType* min_max_filter_type =
+        codegen->GetPtrType(MinMaxFilter::GetLlvmClassName(filter_expr->type().type))
+            ->getPointerTo();
+    local_min_max_filter_ptr = builder.CreatePointerCast(
+        local_min_max_filter_ptr, min_max_filter_type, "cast_min_max_filter_ptr");
+    local_filter_arg =
+        builder.CreateLoad(local_min_max_filter_ptr, "local_min_max_filter_arg");
+  }
+
+  // Check if 'local_bloom_filter' or 'local_min_max_filter' are NULL (depending on
+  // filter desc) and return if so.
+  llvm::Value* filter_null = builder.CreateIsNull(local_filter_arg, "filter_is_null");
+  llvm::BasicBlock* filter_not_null_block =
+      llvm::BasicBlock::Create(context, "filters_not_null", insert_filter_fn);
+  llvm::BasicBlock* filter_null_block =
+      llvm::BasicBlock::Create(context, "filters_null", insert_filter_fn);
+  builder.CreateCondBr(filter_null, filter_null_block, filter_not_null_block);
+  builder.SetInsertPoint(filter_null_block);
   builder.CreateRetVoid();
-  builder.SetInsertPoint(bloom_not_null_block);
+  builder.SetInsertPoint(filter_not_null_block);
 
   llvm::BasicBlock* val_not_null_block =
       llvm::BasicBlock::Create(context, "val_not_null", insert_filter_fn);
@@ -327,47 +356,60 @@ Status FilterContext::CodegenInsert(
   llvm::Value* null_ptr = codegen->null_ptr_value();
   builder.CreateBr(insert_filter_block);
 
-  // Saves 'result' on the stack and passes a pointer to it to 'insert_bloom_filter_fn'.
+  // Saves 'result' on the stack and passes a pointer to it to Insert().
   builder.SetInsertPoint(val_not_null_block);
   llvm::Value* native_ptr = result.ToNativePtr();
   native_ptr = builder.CreatePointerCast(native_ptr, codegen->ptr_type(), "native_ptr");
   builder.CreateBr(insert_filter_block);
 
-  // Get the arguments in place to call 'get_hash_value_fn'.
+  // Get the arguments in place to call Insert().
   builder.SetInsertPoint(insert_filter_block);
   llvm::PHINode* val_ptr_phi = builder.CreatePHI(codegen->ptr_type(), 2, "val_ptr_phi");
   val_ptr_phi->addIncoming(native_ptr, val_not_null_block);
   val_ptr_phi->addIncoming(null_ptr, val_is_null_block);
 
-  // Create a global constant of the filter expression's ColumnType. It needs to be a
-  // constant for constant propagation and dead code elimination in 'get_hash_value_fn'.
-  llvm::Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
-  llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
-      col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
+  // Insert into the bloom filter.
+  if (ctx->filter->is_bloom_filter()) {
+    // Create a global constant of the filter expression's ColumnType. It needs to be a
+    // constant for constant propagation and dead code elimination in 'get_hash_value_fn'.
+    llvm::Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
+    llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
+        col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
+
+    // Call RawValue::GetHashValue() on the result of the filter's expression.
+    llvm::Value* seed_arg =
+        codegen->GetIntConstant(TYPE_INT, RuntimeFilterBank::DefaultHashSeed());
+    llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
+    llvm::Function* get_hash_value_fn =
+        codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);
+    DCHECK(get_hash_value_fn != nullptr);
+    llvm::Value* hash_value =
+        builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_value");
+
+    // Call Insert() on the bloom filter.
+    llvm::Function* insert_bloom_filter_fn;
+    if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
+      insert_bloom_filter_fn =
+          codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
+    } else {
+      insert_bloom_filter_fn =
+          codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_NO_AVX2, false);
+    }
+    DCHECK(insert_bloom_filter_fn != nullptr);
 
-  // Call RawValue::GetHashValue() on the result of the filter's expression.
-  llvm::Value* seed_arg =
-      codegen->GetIntConstant(TYPE_INT, RuntimeFilterBank::DefaultHashSeed());
-  llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
-  llvm::Function* get_hash_value_fn =
-      codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);
-  DCHECK(get_hash_value_fn != nullptr);
-  llvm::Value* hash_value =
-      builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_value");
-
-  // Call Insert() on the bloom filter.
-  llvm::Value* insert_args[] = {local_bloom_filter_arg, hash_value};
-  llvm::Function* insert_bloom_filter_fn;
-  if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
-    insert_bloom_filter_fn =
-        codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
+    llvm::Value* insert_args[] = {local_filter_arg, hash_value};
+    builder.CreateCall(insert_bloom_filter_fn, insert_args);
   } else {
-    insert_bloom_filter_fn =
-        codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_NO_AVX2, false);
+    DCHECK(ctx->filter->is_min_max_filter());
+    // The function for inserting into the min-max filter.
+    llvm::Function* min_max_insert_fn = codegen->GetFunction(
+        MinMaxFilter::GetInsertIRFunctionType(filter_expr->type().type), false);
+    DCHECK(min_max_insert_fn != nullptr);
+
+    llvm::Value* insert_filter_args[] = {local_filter_arg, val_ptr_phi};
+    builder.CreateCall(min_max_insert_fn, insert_filter_args);
   }
 
-  DCHECK(insert_bloom_filter_fn != nullptr);
-  builder.CreateCall(insert_bloom_filter_fn, insert_args);
   builder.CreateRetVoid();
 
   *fn = codegen->FinalizeFunction(insert_filter_fn);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index 0806fd3..e740b80 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -21,13 +21,14 @@
 
 #include <boost/unordered_map.hpp>
 #include "exprs/scalar-expr-evaluator.h"
+#include "runtime/runtime-filter.h"
 #include "util/runtime-profile.h"
 
 namespace impala {
 
 class BloomFilter;
 class LlvmCodeGen;
-class RuntimeFilter;
+class MinMaxFilter;
 class ScalarExpr;
 class TupleRow;
 
@@ -94,6 +95,9 @@ struct FilterContext {
   /// Working copy of local bloom filter
   BloomFilter* local_bloom_filter = nullptr;
 
+  /// Working copy of local min-max filter
+  MinMaxFilter* local_min_max_filter = nullptr;
+
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
 
@@ -107,10 +111,15 @@ struct FilterContext {
   /// a match in 'filter'. Returns false otherwise.
   bool Eval(TupleRow* row) const noexcept;
 
-  /// Evaluates 'row' with 'expr_eval' and hashes the resulting value.
-  /// The hash value is then used for setting some bits in 'local_bloom_filter'.
+  /// Evaluates 'row' with 'expr_eval' and inserts the value into 'local_bloom_filter'
+  /// or 'local_min_max_filter' as appropriate.
   void Insert(TupleRow* row) const noexcept;
 
+  /// Materialize filter values by copying any values stored by filters into memory owned
+  /// by the filter. Filters may assume that the memory for Insert()-ed values stays valid
+  /// until this is called.
+  void MaterializeValues() const;
+
   /// Codegen Eval() by codegen'ing the expression 'filter_expr' and replacing the type
   /// argument to RuntimeFilter::Eval() with a constant. On success, 'fn' is set to
   /// the generated function. On failure, an error status is returned.
@@ -119,10 +128,14 @@ struct FilterContext {
 
   /// Codegen Insert() by codegen'ing the expression 'filter_expr', replacing the type
   /// argument to RawValue::GetHashValue() with a constant, and calling into the correct
-  /// version of BloomFilter::Insert(), depending on the presence of AVX.  On success,
-  /// 'fn' is set to the generated function. On failure, an error status is returned.
+  /// version of BloomFilter::Insert() or MinMaxFilter::Insert(), depending on the filter
+  /// desc and if 'local_bloom_filter' or 'local_min_max_filter' are null.
+  /// For bloom filters, it also selects the correct Insert() based on the presence of
+  /// AVX, and for min-max filters it selects the correct Insert() based on type.
+  /// On success, 'fn' is set to the generated function. On failure, an error status is
+  /// returned.
   static Status CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
-      llvm::Function** fn) WARN_UNUSED_RESULT;
+      FilterContext* ctx, llvm::Function** fn) WARN_UNUSED_RESULT;
 
   // Returns if there is any always_false filter in ctxs. If there is, the counter stats
   // is updated.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/hdfs-parquet-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc b/be/src/exec/hdfs-parquet-scanner-ir.cc
index c1574d1..f2355d8 100644
--- a/be/src/exec/hdfs-parquet-scanner-ir.cc
+++ b/be/src/exec/hdfs-parquet-scanner-ir.cc
@@ -70,7 +70,7 @@ bool HdfsParquetScanner::EvalRuntimeFilter(int i, TupleRow* row) {
   LocalFilterStats* stats = &filter_stats_[i];
   const FilterContext* ctx = filter_ctxs_[i];
   ++stats->total_possible;
-  if (stats->enabled && ctx->filter->HasBloomFilter()) {
+  if (stats->enabled && ctx->filter->HasFilter()) {
     ++stats->considered;
     if (!ctx->Eval(row)) {
       ++stats->rejected;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 8ec76e0..9149097 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -661,7 +661,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
       continue;
     }
 
-    bool has_filter = ctx.filter->HasBloomFilter();
+    bool has_filter = ctx.filter->HasFilter();
     bool passed_filter = !has_filter || ctx.Eval(tuple_row_mem);
     ctx.stats->IncrCounters(stats_name, 1, has_filter, !passed_filter);
     if (!passed_filter) return false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index feb0af7..0e7cdfa 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -84,7 +84,7 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
 }
 
 Status KuduScanNodeBase::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(ScanNode::Open(state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   SCOPED_TIMER(runtime_profile_->total_time_counter());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 2cb7619..22f00e7 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -59,7 +59,8 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   RETURN_IF_ERROR(QueryMaintenance(state));
   *eos = false;
 
-  if (scan_token_ == nullptr) {
+  bool scan_token_eos = scan_token_ == nullptr;
+  while (scan_token_eos) {
     scan_token_ = GetNextScanToken();
     if (scan_token_ == nullptr) {
       runtime_profile_->StopPeriodicCounters();
@@ -68,7 +69,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       *eos = true;
       return Status::OK();
     }
-    RETURN_IF_ERROR(scanner_->OpenNextScanToken(*scan_token_));
+    RETURN_IF_ERROR(scanner_->OpenNextScanToken(*scan_token_, &scan_token_eos));
   }
 
   bool scanner_eos = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 7f18710..77fac89 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -75,6 +75,8 @@ Status KuduScanNode::Open(RuntimeState* state) {
         state->query_options().num_scanner_threads);
   }
 
+  if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
+
   thread_avail_cb_id_ = state->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&KuduScanNode::ThreadAvailableCb), this, _1));
   ThreadAvailableCb(state->resource_pool());
@@ -179,8 +181,9 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
 }
 
 Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_token) {
-  RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token));
-  bool eos = false;
+  bool eos;
+  RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token, &eos));
+  if (eos) return Status::OK();
   while (!eos && !done_) {
     unique_ptr<RowBatch> row_batch = std::make_unique<RowBatch>(row_desc(),
         runtime_state_->batch_size(), mem_tracker());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index deb9c84..7db8878 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -18,6 +18,7 @@
 #include "exec/kudu-scanner.h"
 
 #include <kudu/client/row_result.h>
+#include <kudu/client/value.h>
 #include <thrift/protocol/TDebugProtocol.h>
 #include <vector>
 #include <string>
@@ -25,9 +26,11 @@
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/slot-ref.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
+#include "runtime/runtime-filter.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
@@ -36,15 +39,18 @@
 #include "gutil/gscoped_ptr.h"
 #include "gutil/strings/substitute.h"
 #include "util/jni-util.h"
+#include "util/min-max-filter.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 
 using kudu::client::KuduClient;
+using kudu::client::KuduPredicate;
 using kudu::client::KuduScanBatch;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTable;
+using kudu::client::KuduValue;
 
 DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
     "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT.");
@@ -136,7 +142,7 @@ void KuduScanner::Close() {
   expr_results_pool_->FreeAll();
 }
 
-Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
+Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
   DCHECK(scanner_ == NULL);
   kudu::client::KuduScanner* scanner;
   KUDU_RETURN_IF_ERROR(kudu::client::KuduScanToken::DeserializeIntoScanner(
@@ -164,10 +170,67 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
     scanner_->SetRowFormatFlags(row_format_flags);
   }
 
+  if (scan_node_->filter_ctxs_.size() > 0) {
+    for (const FilterContext& ctx : scan_node_->filter_ctxs_) {
+      MinMaxFilter* filter = ctx.filter->get_min_max();
+      if (filter != nullptr && !filter->AlwaysTrue()) {
+        if (filter->AlwaysFalse()) {
+          // We can skip this entire scan.
+          CloseCurrentClientScanner();
+          *eos = true;
+          return Status::OK();
+        } else {
+          auto it = ctx.filter->filter_desc().planid_to_target_ndx.find(scan_node_->id());
+          const TRuntimeFilterTargetDesc& target_desc =
+              ctx.filter->filter_desc().targets[it->second];
+          const string& col_name = target_desc.kudu_col_name;
+          DCHECK(col_name != "");
+          ColumnType col_type = ColumnType::FromThrift(target_desc.kudu_col_type);
+
+          void* min = filter->GetMin();
+          void* max = filter->GetMax();
+          // If the type of the filter is not the same as the type of the target column,
+          // there must be an implicit integer cast and we need to ensure the min/max we
+          // pass to Kudu are within the range of the target column.
+          int64_t int_min;
+          int64_t int_max;
+          if (col_type.type != filter->type()) {
+            DCHECK(col_type.IsIntegerType());
+
+            if (!filter->GetCastIntMinMax(col_type, &int_min, &int_max)) {
+              // The min/max for this filter is outside the range for the target column,
+              // so all rows are filtered out and we can skip the scan.
+              CloseCurrentClientScanner();
+              *eos = true;
+              return Status::OK();
+            }
+            min = &int_min;
+            max = &int_max;
+          }
+
+          KuduValue* min_value;
+          RETURN_IF_ERROR(CreateKuduValue(filter->type(), min, &min_value));
+          KUDU_RETURN_IF_ERROR(
+              scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+                  col_name, KuduPredicate::ComparisonOp::GREATER_EQUAL, min_value)),
+              "Failed to add min predicate");
+
+          KuduValue* max_value;
+          RETURN_IF_ERROR(CreateKuduValue(filter->type(), max, &max_value));
+          KUDU_RETURN_IF_ERROR(
+              scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+                  col_name, KuduPredicate::ComparisonOp::LESS_EQUAL, max_value)),
+              "Failed to add max predicate");
+        }
+      }
+    }
+  }
+
   {
     SCOPED_TIMER(state_->total_storage_wait_timer());
     KUDU_RETURN_IF_ERROR(scanner_->Open(), "Unable to open scanner");
   }
+  *eos = false;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 125881c..e6d4ca9 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -43,8 +43,10 @@ class KuduScanner {
   /// Does not actually open a kudu::client::KuduScanner.
   Status Open();
 
-  /// Opens a new kudu::client::KuduScanner using 'scan_token'.
-  Status OpenNextScanToken(const std::string& scan_token);
+  /// Opens a new kudu::client::KuduScanner using 'scan_token'. If there are no rows to
+  /// scan (eg. because there is a runtime filter that rejects all rows) 'eos' will
+  /// be set to true, otherwise if the return status is OK it will be false.
+  Status OpenNextScanToken(const std::string& scan_token, bool* eos);
 
   /// Fetches the next batch from the current kudu::client::KuduScanner.
   Status GetNext(RowBatch* row_batch, bool* eos);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index 320a77d..03cb51f 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -35,6 +35,7 @@ using kudu::client::KuduSchema;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
+using kudu::client::KuduValue;
 using DataType = kudu::client::KuduColumnSchema::DataType;
 
 DECLARE_bool(disable_kudu);
@@ -111,17 +112,14 @@ void InitKuduLogging() {
   kudu::client::SetVerboseLogLevel(std::max(0, FLAGS_v - 1));
 }
 
-Status WriteKuduTimestampValue(int col, const TimestampValue* tv,
-    kudu::KuduPartialRow* row) {
-  int64_t ts_micros;
-  bool success = tv->UtcToUnixTimeMicros(&ts_micros);
+// Converts a TimestampValue to Kudu's representation which is returned in 'ts_micros'.
+static Status ConvertTimestampValue(const TimestampValue* tv, int64_t* ts_micros) {
+  bool success = tv->UtcToUnixTimeMicros(ts_micros);
   DCHECK(success); // If the value was invalid the slot should've been null.
   if (UNLIKELY(!success)) {
     return Status(TErrorCode::RUNTIME_ERROR,
         "Invalid TimestampValue: " + tv->ToString());
   }
-  KUDU_RETURN_IF_ERROR(row->SetUnixTimeMicros(col, ts_micros),
-      "Could not add Kudu WriteOp.");
   return Status::OK();
 }
 
@@ -170,8 +168,11 @@ Status WriteKuduValue(int col, PrimitiveType type, const void* value,
           "Could not set Kudu row value.");
       break;
     case TYPE_TIMESTAMP:
-      RETURN_IF_ERROR(WriteKuduTimestampValue(col,
-          reinterpret_cast<const TimestampValue*>(value), row));
+      int64_t ts_micros;
+      RETURN_IF_ERROR(ConvertTimestampValue(
+          reinterpret_cast<const TimestampValue*>(value), &ts_micros));
+      KUDU_RETURN_IF_ERROR(
+          row->SetUnixTimeMicros(col, ts_micros), "Could not add Kudu WriteOp.");
       break;
     default:
       return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
@@ -196,4 +197,47 @@ ColumnType KuduDataTypeToColumnType(DataType type) {
   return ColumnType(PrimitiveType::INVALID_TYPE);
 }
 
+Status CreateKuduValue(PrimitiveType type, void* value, KuduValue** out) {
+  switch (type) {
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+      const StringValue* sv = reinterpret_cast<const StringValue*>(value);
+      kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
+      *out = KuduValue::CopyString(slice);
+      break;
+    }
+    case TYPE_FLOAT:
+      *out = KuduValue::FromFloat(*reinterpret_cast<const float*>(value));
+      break;
+    case TYPE_DOUBLE:
+      *out = KuduValue::FromDouble(*reinterpret_cast<const double*>(value));
+      break;
+    case TYPE_BOOLEAN:
+      *out = KuduValue::FromBool(*reinterpret_cast<const bool*>(value));
+      break;
+    case TYPE_TINYINT:
+      *out = KuduValue::FromInt(*reinterpret_cast<const int8_t*>(value));
+      break;
+    case TYPE_SMALLINT:
+      *out = KuduValue::FromInt(*reinterpret_cast<const int16_t*>(value));
+      break;
+    case TYPE_INT:
+      *out = KuduValue::FromInt(*reinterpret_cast<const int32_t*>(value));
+      break;
+    case TYPE_BIGINT:
+      *out = KuduValue::FromInt(*reinterpret_cast<const int64_t*>(value));
+      break;
+    case TYPE_TIMESTAMP: {
+      int64_t ts_micros;
+      RETURN_IF_ERROR(ConvertTimestampValue(
+          reinterpret_cast<const TimestampValue*>(value), &ts_micros));
+      *out = KuduValue::FromInt(ts_micros);
+      break;
+    }
+    default:
+      return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
+  }
+  return Status::OK();
+}
+
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 11cf16a..5fd1140 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -23,6 +23,7 @@ struct tm;
 
 #include <kudu/client/callbacks.h>
 #include <kudu/client/client.h>
+#include <kudu/client/value.h>
 
 #include "common/status.h"
 #include "runtime/string-value.h"
@@ -84,6 +85,11 @@ void LogKuduMessage(kudu::client::KuduLogSeverity severity, const char* filename
 Status WriteKuduValue(int col, PrimitiveType type, const void* value,
     bool copy_strings, kudu::KuduPartialRow* row) WARN_UNUSED_RESULT;
 
+/// Casts 'value' according to 'type' and create a new KuduValue containing 'value' which
+/// is returned in 'out'.
+Status CreateKuduValue(
+    PrimitiveType type, void* value, kudu::client::KuduValue** out) WARN_UNUSED_RESULT;
+
 /// Takes a Kudu client DataType and returns the corresponding Impala ColumnType.
 ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType type);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index 8481212..8d8e42d 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -66,6 +66,7 @@ Status PhjBuilder::ProcessBuildBatch(
       return status;
     }
   }
+  for (const FilterContext& ctx : filter_ctxs_) ctx.MaterializeValues();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 584b42d..a86b87c 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -34,6 +34,7 @@
 #include "runtime/runtime-filter.h"
 #include "runtime/runtime-state.h"
 #include "util/bloom-filter.h"
+#include "util/min-max-filter.h"
 #include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
@@ -469,9 +470,16 @@ void PhjBuilder::AllocateRuntimeFilters() {
       << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
   DCHECK(ht_ctx_ != NULL);
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
-    filter_ctxs_[i].local_bloom_filter =
-        runtime_state_->filter_bank()->AllocateScratchBloomFilter(
-            filter_ctxs_[i].filter->id());
+    if (filter_ctxs_[i].filter->is_bloom_filter()) {
+      filter_ctxs_[i].local_bloom_filter =
+          runtime_state_->filter_bank()->AllocateScratchBloomFilter(
+              filter_ctxs_[i].filter->id());
+    } else {
+      DCHECK(filter_ctxs_[i].filter->is_min_max_filter());
+      filter_ctxs_[i].local_min_max_filter =
+          runtime_state_->filter_bank()->AllocateScratchMinMaxFilter(
+              filter_ctxs_[i].filter->id(), filter_ctxs_[i].expr_eval->root().type());
+    }
   }
 }
 
@@ -491,12 +499,22 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
   for (const FilterContext& ctx : filter_ctxs_) {
     // TODO: Consider checking actual number of bits set in filter to compute FP rate.
     // TODO: Consider checking this every few batches or so.
-    bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh(
-        ctx.filter->filter_size(), num_build_rows);
-    runtime_state_->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
-        fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
+    BloomFilter* bloom_filter = nullptr;
+    if (ctx.local_bloom_filter != nullptr) {
+      if (runtime_state_->filter_bank()->FpRateTooHigh(
+              ctx.filter->filter_size(), num_build_rows)) {
+        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+      } else {
+        bloom_filter = ctx.local_bloom_filter;
+        ++num_enabled_filters;
+      }
+    } else if (ctx.local_min_max_filter != nullptr
+        && !ctx.local_min_max_filter->AlwaysTrue()) {
+      ++num_enabled_filters;
+    }
 
-    num_enabled_filters += !fp_rate_too_high;
+    runtime_state_->filter_bank()->UpdateFilterFromLocal(
+        ctx.filter->id(), bloom_filter, ctx.local_min_max_filter);
   }
 
   if (filter_ctxs_.size() > 0) {
@@ -959,7 +977,8 @@ Status PhjBuilder::CodegenInsertRuntimeFilters(
   int num_filters = filter_exprs.size();
   for (int i = 0; i < num_filters; ++i) {
     llvm::Function* insert_fn;
-    RETURN_IF_ERROR(FilterContext::CodegenInsert(codegen, filter_exprs_[i], &insert_fn));
+    RETURN_IF_ERROR(FilterContext::CodegenInsert(
+        codegen, filter_exprs_[i], &filter_ctxs_[i], &insert_fn));
     llvm::PointerType* filter_context_type =
         codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
     llvm::Value* filter_context_ptr =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 18fc473..27726f8 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -75,12 +75,16 @@ Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     filter_ctxs_.emplace_back();
     FilterContext& filter_ctx = filter_ctxs_.back();
     filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
-    string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
-        PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
-    RuntimeProfile* profile =
-        RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
-    runtime_profile_->AddChild(profile);
-    filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile));
+    // TODO: Enable stats for min-max filters when Kudu exposes info about filters
+    // (KUDU-2162).
+    if (filter_ctx.filter->is_bloom_filter()) {
+      string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
+          PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
+      RuntimeProfile* profile =
+          RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
+      runtime_profile_->AddChild(profile);
+      filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile));
+    }
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/coordinator-filter-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 08944b8..2cb0602 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -43,13 +43,14 @@ struct Coordinator::FilterTarget {
       fragment_idx(f_idx) {}
 };
 
-/// State of filters that are received for aggregation.
+/// State of runtime filters that are received for aggregation. A runtime filter will
+/// contain a bloom or min-max filter.
 ///
 /// A broadcast join filter is published as soon as the first update is received for it
 /// and subsequent updates are ignored (as they will be the same).
-/// Updates for a partitioned join filter are aggregated in 'bloom_filter' and this is
-/// published once 'pending_count' reaches 0 and if the filter was not disabled before
-/// that.
+/// Updates for a partitioned join filter are aggregated and then published once
+/// 'pending_count' reaches 0 and if the filter was not disabled before that.
+///
 ///
 /// A filter is disabled if an always_true filter update is received, an OOM is hit,
 /// filter aggregation is complete or if the query is complete.
@@ -61,9 +62,11 @@ class Coordinator::FilterState {
       completion_time_(0L) {
     // bloom_filter_ is a disjunction so the unit value is always_false.
     bloom_filter_.always_false = true;
+    min_max_filter_.always_false = true;
   }
 
   TBloomFilter& bloom_filter() { return bloom_filter_; }
+  TMinMaxFilter& min_max_filter() { return min_max_filter_; }
   boost::unordered_set<int>* src_fragment_instance_idxs() {
     return &src_fragment_instance_idxs_;
   }
@@ -76,9 +79,18 @@ class Coordinator::FilterState {
   int64_t completion_time() const { return completion_time_; }
   const TPlanNodeId& src() const { return src_; }
   const TRuntimeFilterDesc& desc() const { return desc_; }
+  bool is_bloom_filter() const { return desc_.type == TRuntimeFilterType::BLOOM; }
+  bool is_min_max_filter() const { return desc_.type == TRuntimeFilterType::MIN_MAX; }
   int pending_count() const { return pending_count_; }
   void set_pending_count(int pending_count) { pending_count_ = pending_count; }
-  bool disabled() const { return bloom_filter_.always_true; }
+  bool disabled() const {
+    if (is_bloom_filter()) {
+      return bloom_filter_.always_true;
+    } else {
+      DCHECK(is_min_max_filter());
+      return min_max_filter_.always_true;
+    }
+  }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
@@ -100,13 +112,14 @@ class Coordinator::FilterState {
   /// Number of remaining backends to hear from before filter is complete.
   int pending_count_;
 
-  /// BloomFilter aggregated from all source plan nodes, to be broadcast to all
+  /// Filters aggregated from all source plan nodes, to be broadcast to all
   /// destination plan fragment instances. Only set for partitioned joins (broadcast joins
   /// need no aggregation).
   /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
   /// output structure in the case of a broadcast join. Similarly, for partitioned joins,
   /// the filter is moved from the following member to the output structure.
   TBloomFilter bloom_filter_;
+  TMinMaxFilter min_max_filter_;
 
   /// Time at which first local filter arrived.
   int64_t first_arrival_time_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 772da33..d0e7b90 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -43,6 +43,7 @@
 #include "util/hdfs-bulk-ops.h"
 #include "util/hdfs-util.h"
 #include "util/histogram-metric.h"
+#include "util/min-max-filter.h"
 #include "util/table-printer.h"
 
 #include "common/names.h"
@@ -321,7 +322,7 @@ void Coordinator::InitFilterRoutingTable() {
           f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end());
 
         // target plan node of filter
-        } else if (plan_node.__isset.hdfs_scan_node) {
+        } else if (plan_node.__isset.hdfs_scan_node || plan_node.__isset.kudu_scan_node) {
           auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
           DCHECK(it != filter.planid_to_target_ndx.end());
           const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second];
@@ -1125,16 +1126,23 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
       target_fragment_idxs.insert(target.fragment_idx);
     }
 
-    // Assign outgoing bloom filter.
-    TBloomFilter& aggregated_filter = state->bloom_filter();
-    filter_mem_tracker_->Release(aggregated_filter.directory.size());
+    if (state->is_bloom_filter()) {
+      // Assign outgoing bloom filter.
+      TBloomFilter& aggregated_filter = state->bloom_filter();
+      filter_mem_tracker_->Release(aggregated_filter.directory.size());
+
+      // TODO: Track memory used by 'rpc_params'.
+      swap(rpc_params.bloom_filter, aggregated_filter);
+      DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
+          || !rpc_params.bloom_filter.directory.empty());
+      DCHECK(aggregated_filter.directory.empty());
+      rpc_params.__isset.bloom_filter = true;
+    } else {
+      DCHECK(state->is_min_max_filter());
+      MinMaxFilter::Copy(state->min_max_filter(), &rpc_params.min_max_filter);
+      rpc_params.__isset.min_max_filter = true;
+    }
 
-    // TODO: Track memory used by 'rpc_params'.
-    swap(rpc_params.bloom_filter, aggregated_filter);
-    DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true ||
-        !rpc_params.bloom_filter.directory.empty());
-    DCHECK(aggregated_filter.directory.empty());
-    rpc_params.__isset.bloom_filter = true;
     // Filter is complete, and can be released.
     state->Disable(filter_mem_tracker_);
   }
@@ -1160,27 +1168,40 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
   }
 
   --pending_count_;
-  if (params.bloom_filter.always_true) {
-    Disable(coord->filter_mem_tracker_);
-  } else if (bloom_filter_.always_false) {
-    int64_t heap_space = params.bloom_filter.directory.size();
-    if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
-      VLOG_QUERY << "Not enough memory to allocate filter: "
-                 << PrettyPrinter::Print(heap_space, TUnit::BYTES)
-                 << " (query: " << coord->query_id() << ")";
-      // Disable, as one missing update means a correct filter cannot be produced.
+  if (is_bloom_filter()) {
+    DCHECK(params.__isset.bloom_filter);
+    if (params.bloom_filter.always_true) {
       Disable(coord->filter_mem_tracker_);
+    } else if (bloom_filter_.always_false) {
+      int64_t heap_space = params.bloom_filter.directory.size();
+      if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
+        VLOG_QUERY << "Not enough memory to allocate filter: "
+                   << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+                   << " (query: " << coord->query_id() << ")";
+        // Disable, as one missing update means a correct filter cannot be produced.
+        Disable(coord->filter_mem_tracker_);
+      } else {
+        // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
+        // move the payload from the request rather than copy it and take double the
+        // memory cost. After this point, params.bloom_filter is an empty filter and
+        // should not be read.
+        TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
+        swap(bloom_filter_, *non_const_filter);
+        DCHECK_EQ(non_const_filter->directory.size(), 0);
+      }
     } else {
-      // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
-      // move the payload from the request rather than copy it and take double the memory
-      // cost. After this point, params.bloom_filter is an empty filter and should not be
-      // read.
-      TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
-      swap(bloom_filter_, *non_const_filter);
-      DCHECK_EQ(non_const_filter->directory.size(), 0);
+      BloomFilter::Or(params.bloom_filter, &bloom_filter_);
     }
   } else {
-    BloomFilter::Or(params.bloom_filter, &bloom_filter_);
+    DCHECK(is_min_max_filter());
+    DCHECK(params.__isset.min_max_filter);
+    if (params.min_max_filter.always_true) {
+      Disable(coord->filter_mem_tracker_);
+    } else if (min_max_filter_.always_false) {
+      MinMaxFilter::Copy(params.min_max_filter, &min_max_filter_);
+    } else {
+      MinMaxFilter::Or(params.min_max_filter, &min_max_filter_);
+    }
   }
 
   if (pending_count_ == 0 || disabled()) {
@@ -1189,11 +1210,17 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
 }
 
 void Coordinator::FilterState::Disable(MemTracker* tracker) {
-  bloom_filter_.always_true = true;
-  bloom_filter_.always_false = false;
-  tracker->Release(bloom_filter_.directory.size());
-  bloom_filter_.directory.clear();
-  bloom_filter_.directory.shrink_to_fit();
+  if (is_bloom_filter()) {
+    bloom_filter_.always_true = true;
+    bloom_filter_.always_false = false;
+    tracker->Release(bloom_filter_.directory.size());
+    bloom_filter_.directory.clear();
+    bloom_filter_.directory.shrink_to_fit();
+  } else {
+    DCHECK(is_min_max_filter());
+    min_max_filter_.always_true = true;
+    min_max_filter_.always_false = false;
+  }
 }
 
 const TUniqueId& Coordinator::query_id() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index f957dd1..bf437ba 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -414,13 +414,12 @@ Status FragmentInstanceState::WaitForOpen() {
   return opened_promise_.Get();
 }
 
-void FragmentInstanceState::PublishFilter(
-    int32_t filter_id, const TBloomFilter& thrift_bloom_filter) {
+void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
-            << " filter_id=" << filter_id;
+            << " filter_id=" << params.filter_id;
   // Wait until Prepare() is done, so we know that the filter bank is set up.
   if (!WaitForPrepare().ok()) return;
-  runtime_state_->filter_bank()->PublishGlobalFilter(filter_id, thrift_bloom_filter);
+  runtime_state_->filter_bank()->PublishGlobalFilter(params);
 }
 
 const TQueryCtx& FragmentInstanceState::query_ctx() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index f540b56..4e832f6 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -99,7 +99,7 @@ class FragmentInstanceState {
   Status WaitForOpen();
 
   /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
-  void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
+  void PublishFilter(const TPublishFilterParams& params);
 
   /// Returns fragment instance's sink if this is the root fragment instance. Valid after
   /// the Prepare phase. May be nullptr.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 6796c82..6bc2591 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -397,12 +397,11 @@ void QueryState::Cancel() {
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
-void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
-    const TBloomFilter& thrift_bloom_filter) {
+void QueryState::PublishFilter(const TPublishFilterParams& params) {
   if (!instances_prepared_promise_.Get().ok()) return;
-  DCHECK_EQ(fragment_map_.count(fragment_idx), 1);
-  for (FragmentInstanceState* fis: fragment_map_[fragment_idx]) {
-    fis->PublishFilter(filter_id, thrift_bloom_filter);
+  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
+  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
+    fis->PublishFilter(params);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 82f2c52..f7b83a7 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -142,8 +142,7 @@ class QueryState {
   FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
 
   /// Blocks until all fragment instances have finished their Prepare phase.
-  void PublishFilter(int32_t filter_id, int fragment_idx,
-      const TBloomFilter& thrift_bloom_filter);
+  void PublishFilter(const TPublishFilterParams& params);
 
   /// Cancels all actively executing fragment instances. Blocks until all fragment
   /// instances have finished their Prepare phase. Idempotent.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 2ae65c8..178aef1 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"
+#include "util/min-max-filter.h"
 
 #include "common/names.h"
 
@@ -41,8 +42,12 @@ const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
 const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
 
 RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
-    : state_(state), closed_(false) {
-  memory_allocated_ =
+  : state_(state),
+    filter_mem_tracker_(
+        new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)),
+    mem_pool_(filter_mem_tracker_.get()),
+    closed_(false) {
+  bloom_memory_allocated_ =
       state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
 
   // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
@@ -66,9 +71,6 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
   default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
   default_filter_size_ =
       BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
-
-  filter_mem_tracker_.reset(
-      new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false));
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -115,27 +117,28 @@ void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params
 
 }
 
-void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
-    BloomFilter* bloom_filter) {
+void RuntimeFilterBank::UpdateFilterFromLocal(
+    int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
   TUpdateFilterParams params;
   // A runtime filter may have both local and remote targets.
   bool has_local_target = false;
   bool has_remote_target = false;
+  TRuntimeFilterType::type type;
   {
     lock_guard<mutex> l(runtime_filter_lock_);
     RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
     DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
                                           << filter_id;
-    it->second->SetBloomFilter(bloom_filter);
+    it->second->SetFilter(bloom_filter, min_max_filter);
     has_local_target = it->second->filter_desc().has_local_targets;
     has_remote_target = it->second->filter_desc().has_remote_targets;
+    type = it->second->filter_desc().type;
   }
 
   if (has_local_target) {
-    // Do a short circuit publication by pushing the same BloomFilter to the consumer
-    // side.
+    // Do a short circuit publication by pushing the same filter to the consumer side.
     RuntimeFilter* filter;
     {
       lock_guard<mutex> l(runtime_filter_lock_);
@@ -143,7 +146,7 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
       if (it == consumed_filters_.end()) return;
       filter = it->second;
     }
-    filter->SetBloomFilter(bloom_filter);
+    filter->SetFilter(bloom_filter, min_max_filter);
     state_->runtime_profile()->AddInfoString(
         Substitute("Filter $0 arrival", filter_id),
         PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
@@ -153,8 +156,14 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
       && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
     params.__set_filter_id(filter_id);
     params.__set_query_id(state_->query_id());
-    BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
-    params.__isset.bloom_filter = true;
+    if (type == TRuntimeFilterType::BLOOM) {
+      BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
+      params.__isset.bloom_filter = true;
+    } else {
+      DCHECK(type == TRuntimeFilterType::MIN_MAX);
+      min_max_filter->ToThrift(&params.min_max_filter);
+      params.__isset.min_max_filter = true;
+    }
 
     ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
         SendFilterToCoordinator, state_->query_ctx().coord_address, params,
@@ -162,32 +171,43 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
   }
 }
 
-void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
-    const TBloomFilter& thrift_filter) {
+void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
   lock_guard<mutex> l(runtime_filter_lock_);
   if (closed_) return;
-  RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
   DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
-                                        << filter_id;
-  if (thrift_filter.always_true) {
-    it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
-  } else {
-    int64_t required_space =
-        BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
-    // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
-    // there's not enough memory for it.
-    if (!filter_mem_tracker_->TryConsume(required_space)) {
-      VLOG_QUERY << "No memory for global filter: " << filter_id
-                 << " (fragment instance: " << state_->fragment_instance_id() << ")";
-      it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+                                        << params.filter_id;
+
+  BloomFilter* bloom_filter = nullptr;
+  MinMaxFilter* min_max_filter = nullptr;
+  if (it->second->is_bloom_filter()) {
+    DCHECK(params.__isset.bloom_filter);
+    if (params.bloom_filter.always_true) {
+      bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
     } else {
-      BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
-      DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-      memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
-      it->second->SetBloomFilter(bloom_filter);
+      int64_t required_space =
+          BloomFilter::GetExpectedHeapSpaceUsed(params.bloom_filter.log_heap_space);
+      // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
+      // there's not enough memory for it.
+      if (!filter_mem_tracker_->TryConsume(required_space)) {
+        VLOG_QUERY << "No memory for global filter: " << params.filter_id
+                   << " (fragment instance: " << state_->fragment_instance_id() << ")";
+        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+      } else {
+        bloom_filter = obj_pool_.Add(new BloomFilter(params.bloom_filter));
+        DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+        bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+      }
     }
+  } else {
+    DCHECK(it->second->is_min_max_filter());
+    DCHECK(params.__isset.min_max_filter);
+    min_max_filter = MinMaxFilter::Create(
+        params.min_max_filter, it->second->type(), &obj_pool_, &mem_pool_);
   }
-  state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
+  it->second->SetFilter(bloom_filter, min_max_filter);
+  state_->runtime_profile()->AddInfoString(
+      Substitute("Filter $0 arrival", params.filter_id),
       PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
 }
 
@@ -204,10 +224,21 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
   if (!filter_mem_tracker_->TryConsume(required_space)) return NULL;
   BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
   DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-  memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+  bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
   return bloom_filter;
 }
 
+MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
+    int32_t filter_id, ColumnType type) {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (closed_) return nullptr;
+
+  RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+  DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
+
+  return MinMaxFilter::Create(type, &obj_pool_, &mem_pool_);
+}
+
 int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
   if (ndv == -1) return default_filter_size_;
   int64_t required_space =
@@ -227,6 +258,7 @@ void RuntimeFilterBank::Close() {
   lock_guard<mutex> l(runtime_filter_lock_);
   closed_ = true;
   obj_pool_.Clear();
-  filter_mem_tracker_->Release(memory_allocated_->value());
+  mem_pool_.FreeAll();
+  filter_mem_tracker_->Release(bloom_memory_allocated_->value());
   filter_mem_tracker_->Close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index d8be8ab..8f6bb42 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
+#include "runtime/mem-pool.h"
 #include "runtime/types.h"
 #include "util/runtime-profile.h"
 
@@ -31,6 +32,7 @@ namespace impala {
 
 class BloomFilter;
 class MemTracker;
+class MinMaxFilter;
 class RuntimeFilter;
 class RuntimeState;
 class TBloomFilter;
@@ -47,9 +49,10 @@ class TQueryCtx;
 /// RuntimeFilterBank treats each filter independently.
 ///
 /// All filters must be registered with the filter bank via RegisterFilter(). Local plan
-/// fragments update the bloom filters by calling UpdateFilterFromLocal()
-/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
-/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
+/// fragments update the filters by calling UpdateFilterFromLocal() (which may only be
+/// called once per filter ID per filter bank), with either a bloom filter or a min-max
+/// filter, depending on the filter's type. The 'bloom_filter' or 'min_max_filter' that is
+/// passed into UpdateFilterFromLocal() must have been allocated by
 /// AllocateScratchBloomFilter(); this allows RuntimeFilterBank to manage all memory
 /// associated with filters.
 ///
@@ -58,9 +61,10 @@ class TQueryCtx;
 ///
 /// After PublishGlobalFilter() has been called (and again, it may only be called once per
 /// filter_id), the RuntimeFilter object associated with filter_id will have a valid
-/// bloom_filter, and may be used for filter evaluation. This operation occurs without
-/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
-/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
+/// bloom_filter or min_max_filter, and may be used for filter evaluation. This
+/// operation occurs without synchronisation, and neither the thread that calls
+/// PublishGlobalFilter() nor the thread that may call RuntimeFilter::Eval() need to
+/// coordinate in any way.
 class RuntimeFilterBank {
  public:
   RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
@@ -70,14 +74,16 @@ class RuntimeFilterBank {
   /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
   RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
 
-  /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
-  /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
-  /// full filter that contains all elements.
-  void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
+  /// Updates a filter's 'bloom_filter' or 'min_max_filter' which has been produced by
+  /// some operator in the local fragment instance. At most one of 'bloom_filter' and
+  /// 'min_max_filter' may be non-NULL, depending on the filter's type. They may both be
+  /// NULL, representing a filter that allows all rows to pass.
+  void UpdateFilterFromLocal(
+      int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
 
   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
   /// consumption by operators that wish to use it for filtering.
-  void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
+  void PublishGlobalFilter(const TPublishFilterParams& params);
 
   /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
   /// 'filter_size' would have an expected false-positive rate which would exceed
@@ -100,6 +106,9 @@ class RuntimeFilterBank {
   /// If there is not enough memory, or if Close() has been called first, returns NULL.
   BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
 
+  /// Returns a new MinMaxFilter. Handles memory the same as AllocateScratchBloomFilter().
+  MinMaxFilter* AllocateScratchMinMaxFilter(int32_t filter_id, ColumnType type);
+
   /// Default hash seed to use when computing hashed values to insert into filters.
   static int32_t IR_ALWAYS_INLINE DefaultHashSeed() { return 1234; }
 
@@ -136,12 +145,15 @@ class RuntimeFilterBank {
   /// MemTracker to track Bloom filter memory.
   boost::scoped_ptr<MemTracker> filter_mem_tracker_;
 
+  // Mem pool to track allocations made by filters.
+  MemPool mem_pool_;
+
   /// True iff Close() has been called. Used to prevent races between
   /// AllocateScratchBloomFilter() and Close().
   bool closed_;
 
   /// Total amount of memory allocated to Bloom Filters
-  RuntimeProfile::Counter* memory_allocated_;
+  RuntimeProfile::Counter* bloom_memory_allocated_;
 
   /// Precomputed default BloomFilter size.
   int64_t default_filter_size_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-ir.cc b/be/src/runtime/runtime-filter-ir.cc
index 4e386cb..6436213 100644
--- a/be/src/runtime/runtime-filter-ir.cc
+++ b/be/src/runtime/runtime-filter-ir.cc
@@ -21,10 +21,9 @@ using namespace impala;
 
 bool IR_ALWAYS_INLINE RuntimeFilter::Eval(
     void* val, const ColumnType& col_type) const noexcept {
-  // Safe to read bloom_filter_ concurrently with any ongoing SetBloomFilter() thanks
-  // to a) the atomicity of / pointer assignments and b) the x86 TSO memory model.
-  if (bloom_filter_ == BloomFilter::ALWAYS_TRUE_FILTER) return true;
+  DCHECK(is_bloom_filter());
+  if (bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER) return true;
   uint32_t h = RawValue::GetHashValue(val, col_type,
       RuntimeFilterBank::DefaultHashSeed());
-  return bloom_filter_->Find(h);
+  return bloom_filter_.Load()->Find(h);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 228094e..a2fd30e 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -29,9 +29,9 @@ const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
 
 bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
   do {
-    if (HasBloomFilter()) return true;
+    if (HasFilter()) return true;
     SleepForMs(SLEEP_PERIOD_MS);
   } while ((MonotonicMillis() - registration_time_) < timeout_ms);
 
-  return HasBloomFilter();
+  return HasFilter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5d9531b..40c5f23 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -29,33 +29,45 @@ namespace impala {
 
 class BloomFilter;
 
-/// RuntimeFilters represent set-membership predicates (implemented with bloom filters)
-/// that are computed during query execution (rather than during planning). They can then
-/// be sent to other operators to reduce their output. For example, a RuntimeFilter might
-/// compute a predicate corresponding to set membership, where the members of that set can
-/// only be computed at runtime (for example, the distinct values of the build side of a
-/// hash table). Other plan nodes can use that predicate by testing for membership of that
-/// set to filter rows early on in the plan tree (e.g. the scan that feeds the probe side
-/// of that join node could eliminate rows from consideration for join matching).
+/// RuntimeFilters represent set-membership predicates that are computed during query
+/// execution (rather than during planning). They can then be sent to other operators to
+/// reduce their output. For example, a RuntimeFilter might compute a predicate
+/// corresponding to set membership, where the members of that set can only be computed at
+/// runtime (for example, the distinct values of the build side of a hash table). Other
+/// plan nodes can use that predicate by testing for membership of that set to filter rows
+/// early on in the plan tree (e.g. the scan that feeds the probe side of that join node
+/// could eliminate rows from consideration for join matching).
+///
+/// A RuntimeFilter may compute its set-membership predicate as a bloom filters or a
+/// min-max filter, depending on its filter description.
 class RuntimeFilter {
  public:
   RuntimeFilter(const TRuntimeFilterDesc& filter, int64_t filter_size)
-      : bloom_filter_(NULL), filter_desc_(filter), arrival_time_(0L),
+      : bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter),
+        registration_time_(MonotonicMillis()), arrival_time_(0L),
         filter_size_(filter_size) {
     DCHECK_GT(filter_size_, 0);
-    registration_time_ = MonotonicMillis();
   }
 
-  /// Returns true if SetBloomFilter() has been called.
-  bool HasBloomFilter() const { return arrival_time_ != 0; }
+  /// Returns true if SetFilter() has been called.
+  bool HasFilter() const { return arrival_time_.Load() != 0; }
 
   const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; }
   int32_t id() const { return filter_desc().filter_id; }
   int64_t filter_size() const { return filter_size_; }
+  ColumnType type() const {
+    return ColumnType::FromThrift(filter_desc().src_expr.nodes[0].type);
+  }
+  bool is_bloom_filter() const { return filter_desc().type == TRuntimeFilterType::BLOOM; }
+  bool is_min_max_filter() const {
+    return filter_desc().type == TRuntimeFilterType::MIN_MAX;
+  }
+
+  MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
 
   /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called
   /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
-  inline void SetBloomFilter(BloomFilter* bloom_filter);
+  inline void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
 
   /// Returns false iff 'bloom_filter_' has been set via SetBloomFilter() and hash[val] is
   /// not in that 'bloom_filter_'. Otherwise returns true. Is safe to call concurrently
@@ -67,8 +79,8 @@ class RuntimeFilter {
   /// Returns the amount of time waited since registration for the filter to
   /// arrive. Returns 0 if filter has not yet arrived.
   int32_t arrival_delay() const {
-    if (arrival_time_ == 0L) return 0L;
-    return arrival_time_ - registration_time_;
+    if (arrival_time_.Load() == 0L) return 0L;
+    return arrival_time_.Load() - registration_time_;
   }
 
   /// Periodically (every 20ms) checks to see if the global filter has arrived. Waits for
@@ -88,21 +100,26 @@ class RuntimeFilter {
   static const char* LLVM_CLASS_NAME;
 
  private:
-  /// Membership bloom_filter. May be NULL even after arrival_time_ is set. This is a
-  /// compact way of representing a full Bloom filter that contains every element.
-  BloomFilter* bloom_filter_;
+  /// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
+  /// it does not filter any rows, either because it was not created
+  /// (filter_desc_.bloom_filter is false), there was not enough memory, or the false
+  /// positive rate was determined to be too high.
+  AtomicPtr<BloomFilter> bloom_filter_;
+
+  /// May be NULL even after arrival_time_ is set if filter_desc_.min_max_filter is false.
+  AtomicPtr<MinMaxFilter> min_max_filter_;
 
   /// Reference to the filter's thrift descriptor in the thrift Plan tree.
   const TRuntimeFilterDesc& filter_desc_;
 
   /// Time, in ms, that the filter was registered.
-  int64_t registration_time_;
+  const int64_t registration_time_;
 
-  /// Time, in ms, that the global fiter arrived. Set in SetBloomFilter().
-  int64_t arrival_time_;
+  /// Time, in ms, that the global filter arrived. Set in SetFilter().
+  AtomicInt64 arrival_time_;
 
   /// The size of the Bloom filter, in bytes.
-  int64_t filter_size_;
+  const int64_t filter_size_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.inline.h b/be/src/runtime/runtime-filter.inline.h
index 128cafd..b2de81d 100644
--- a/be/src/runtime/runtime-filter.inline.h
+++ b/be/src/runtime/runtime-filter.inline.h
@@ -25,6 +25,7 @@
 
 #include "runtime/raw-value.inline.h"
 #include "util/bloom-filter.h"
+#include "util/min-max-filter.h"
 #include "util/time.h"
 
 namespace impala {
@@ -36,21 +37,35 @@ inline const RuntimeFilter* RuntimeFilterBank::GetRuntimeFilter(int32_t filter_i
   return it->second;
 }
 
-inline void RuntimeFilter::SetBloomFilter(BloomFilter* bloom_filter) {
-  DCHECK(bloom_filter_ == NULL);
-  // TODO: Barrier required here to ensure compiler does not both inline and re-order
-  // this assignment. Not an issue for correctness (as assignment is atomic), but
-  // potentially confusing.
-  bloom_filter_ = bloom_filter;
-  arrival_time_ = MonotonicMillis();
+inline void RuntimeFilter::SetFilter(
+    BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
+  DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+  if (is_bloom_filter()) {
+    bloom_filter_.Store(bloom_filter);
+  } else {
+    DCHECK(is_min_max_filter());
+    min_max_filter_.Store(min_max_filter);
+  }
+  arrival_time_.Store(MonotonicMillis());
 }
 
-inline bool RuntimeFilter::AlwaysTrue() const  {
-  return HasBloomFilter() && bloom_filter_ == BloomFilter::ALWAYS_TRUE_FILTER;
+inline bool RuntimeFilter::AlwaysTrue() const {
+  if (is_bloom_filter()) {
+    return HasFilter() && bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER;
+  } else {
+    DCHECK(is_min_max_filter());
+    return HasFilter() && min_max_filter_.Load()->AlwaysTrue();
+  }
 }
 
 inline bool RuntimeFilter::AlwaysFalse() const {
-  return bloom_filter_ != BloomFilter::ALWAYS_TRUE_FILTER && bloom_filter_->AlwaysFalse();
+  if (is_bloom_filter()) {
+    return bloom_filter_.Load() != BloomFilter::ALWAYS_TRUE_FILTER
+        && bloom_filter_.Load()->AlwaysFalse();
+  } else {
+    DCHECK(is_min_max_filter());
+    return min_max_filter_.Load() != nullptr && min_max_filter_.Load()->AlwaysFalse();
+  }
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/timestamp-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 556225b..445189a 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -26,6 +26,7 @@
 #include <gflags/gflags.h>
 #include <string>
 
+#include "gen-cpp/Data_types.h"
 #include "udf/udf.h"
 #include "util/hash-util.h"
 
@@ -150,6 +151,20 @@ class TimestampValue {
     *ptp = boost::posix_time::ptime(date_, time_);
   }
 
+  // Store the binary representation of this TimestampValue in 'tvalue'.
+  void ToTColumnValue(TColumnValue* tvalue) const {
+    const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
+    tvalue->timestamp_val.assign(data, data + Size());
+    tvalue->__isset.timestamp_val = true;
+  }
+
+  // Returns a new TimestampValue created from the value in 'tvalue'.
+  static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
+    TimestampValue value;
+    memcpy(&value, tvalue.timestamp_val.c_str(), Size());
+    return value;
+  }
+
   bool HasDate() const { return !date_.is_special(); }
   bool HasTime() const { return !time_.is_special(); }
   bool HasDateOrTime() const { return HasDate() || HasTime(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index d8a2a4a..5be8765 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -93,7 +93,7 @@ void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
   FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
   DCHECK(params.__isset.filter_id);
   DCHECK(params.__isset.query_id);
-  DCHECK(params.__isset.bloom_filter);
+  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
   impala_server_->UpdateFilter(return_val, params);
 }
 
@@ -103,8 +103,8 @@ void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
   DCHECK(params.__isset.filter_id);
   DCHECK(params.__isset.dst_query_id);
   DCHECK(params.__isset.dst_fragment_idx);
-  DCHECK(params.__isset.bloom_filter);
+  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
   QueryState::ScopedRef qs(params.dst_query_id);
   if (qs.get() == nullptr) return;
-  qs->PublishFilter(params.filter_id, params.dst_fragment_idx, params.bloom_filter);
+  qs->PublishFilter(params);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 4ff03d6..08002ed 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -55,6 +55,8 @@ add_library(Util
   mem-info.cc
   memory-metrics.cc
   metrics.cc
+  min-max-filter.cc
+  min-max-filter-ir.cc
   minidump.cc
   network-util.cc
   openssl-util.cc
@@ -120,6 +122,7 @@ ADD_BE_TEST(internal-queue-test)
 ADD_BE_TEST(logging-support-test)
 ADD_BE_TEST(lru-cache-test)
 ADD_BE_TEST(metrics-test)
+ADD_BE_TEST(min-max-filter-test)
 ADD_BE_TEST(openssl-util-test)
 ADD_BE_TEST(parse-util-test)
 #ADD_BE_TEST(perf-counters-test)