You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:50 UTC
[07/16] incubator-impala git commit: IMPALA-4252: Min-max runtime filters for Kudu
IMPALA-4252: Min-max runtime filters for Kudu
This patch implements min-max filters for runtime filters. Each
runtime filter generates a bloom filter or a min-max filter,
depending on whether it has HDFS or Kudu targets, respectively.
In RuntimeFilterGenerator in the planner, each hash join node
generates a bloom and min-max filter for each equi-join predicate, but
only those filters that end up being assigned to a target make it into
the final plan.
Min-max filters are only assigned to Kudu scans if the target expr is
a column, as Kudu doesn't support bounds on general exprs, and only if
the join op is '=' and not 'is distinct from', as Kudu doesn't support
returning NULLs if a bound is set.
Values are inserted into min-max filters by the PartitionedHashJoinBuilder.
Codegen is used to eliminate branching on the type of filter. String
min-max filters truncate their bounds at 1024 chars, so that the max
amount of memory used by min-max filters is negligible.
For now, min-max filters are only applied at the KuduScanner, which
passes them into the Kudu client.
Future work will address applying min-max filters at HDFS scan nodes
and applying bloom filters at Kudu scan nodes.
Functional Testing:
- Added new planner tests and updated the old ones. (In the old tests,
many runtime filters are renumbered because min-max filters are always
generated, even if they don't end up being assigned, and they take up
some of the RF ids.)
- Updated existing runtime filter tests to work with Kudu.
- Added e2e tests for min-max filter specific functionality.
Perf Testing:
- All tests were run on the Kudu stress cluster (10 nodes) against
tpch_100_kudu; timings are averages of 3 runs.
- Ran a contrived query with a filter that does not eliminate any rows
(full self join of lineitem). The difference in running time was
negligible - 24.46s with filters on, 24.15s with filters off for
a ~1% slowdown.
- Ran a contrived query with a filter that eliminates all rows (self
join on lineitem with a join condition that never matches). The
filters resulted in a significant speedup - 0.26s with filters on,
1.46s with filters off for a ~5.6x speedup. This query is added to
targeted-perf.
Change-Id: I02bad890f5b5f78388a3041bf38f89369b5e2f1c
Reviewed-on: http://gerrit.cloudera.org:8080/7793
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2510fe0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2510fe0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2510fe0a
Branch: refs/heads/master
Commit: 2510fe0aa0c86f460af9040eb413aad76c13cc84
Parents: 3ddafcd
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Oct 23 07:58:34 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Nov 17 21:33:51 2017 +0000
----------------------------------------------------------------------
be/src/codegen/gen_ir_descriptions.py | 11 +-
be/src/codegen/impala-ir.cc | 1 +
be/src/exec/filter-context.cc | 158 ++--
be/src/exec/filter-context.h | 25 +-
be/src/exec/hdfs-parquet-scanner-ir.cc | 2 +-
be/src/exec/hdfs-scan-node-base.cc | 2 +-
be/src/exec/kudu-scan-node-base.cc | 2 +-
be/src/exec/kudu-scan-node-mt.cc | 5 +-
be/src/exec/kudu-scan-node.cc | 7 +-
be/src/exec/kudu-scanner.cc | 65 +-
be/src/exec/kudu-scanner.h | 6 +-
be/src/exec/kudu-util.cc | 60 +-
be/src/exec/kudu-util.h | 6 +
be/src/exec/partitioned-hash-join-builder-ir.cc | 1 +
be/src/exec/partitioned-hash-join-builder.cc | 37 +-
be/src/exec/scan-node.cc | 16 +-
be/src/runtime/coordinator-filter-state.h | 25 +-
be/src/runtime/coordinator.cc | 91 ++-
be/src/runtime/fragment-instance-state.cc | 7 +-
be/src/runtime/fragment-instance-state.h | 2 +-
be/src/runtime/query-state.cc | 9 +-
be/src/runtime/query-state.h | 3 +-
be/src/runtime/runtime-filter-bank.cc | 102 ++-
be/src/runtime/runtime-filter-bank.h | 36 +-
be/src/runtime/runtime-filter-ir.cc | 7 +-
be/src/runtime/runtime-filter.cc | 4 +-
be/src/runtime/runtime-filter.h | 61 +-
be/src/runtime/runtime-filter.inline.h | 35 +-
be/src/runtime/timestamp-value.h | 15 +
be/src/service/impala-internal-service.cc | 6 +-
be/src/util/CMakeLists.txt | 3 +
be/src/util/min-max-filter-ir.cc | 76 ++
be/src/util/min-max-filter-test.cc | 364 +++++++++
be/src/util/min-max-filter.cc | 529 ++++++++++++
be/src/util/min-max-filter.h | 231 ++++++
common/thrift/Data.thrift | 1 +
common/thrift/ImpalaInternalService.thrift | 23 +-
common/thrift/ImpalaService.thrift | 8 +-
common/thrift/PlanNodes.thrift | 13 +
.../org/apache/impala/planner/HashJoinNode.java | 2 +-
.../org/apache/impala/planner/HdfsScanNode.java | 2 +-
.../org/apache/impala/planner/KuduScanNode.java | 4 +
.../org/apache/impala/planner/PlanNode.java | 27 +-
.../impala/planner/RuntimeFilterGenerator.java | 94 ++-
.../org/apache/impala/planner/PlannerTest.java | 7 +
.../queries/PlannerTest/aggregation.test | 4 +-
.../PlannerTest/fk-pk-join-detection.test | 48 +-
.../queries/PlannerTest/implicit-joins.test | 4 +-
.../queries/PlannerTest/inline-view-limit.test | 16 +-
.../queries/PlannerTest/inline-view.test | 44 +-
.../queries/PlannerTest/join-order.test | 188 ++---
.../queries/PlannerTest/joins.test | 88 +-
.../queries/PlannerTest/kudu-delete.test | 8 +-
.../queries/PlannerTest/kudu-update.test | 10 +
.../queries/PlannerTest/kudu.test | 2 +
.../queries/PlannerTest/max-row-size.test | 8 +-
.../PlannerTest/min-max-runtime-filters.test | 142 ++++
.../queries/PlannerTest/nested-collections.test | 20 +-
.../queries/PlannerTest/order.test | 8 +-
.../queries/PlannerTest/outer-joins.test | 24 +-
.../PlannerTest/predicate-propagation.test | 28 +-
.../PlannerTest/resource-requirements.test | 126 +--
.../PlannerTest/runtime-filter-propagation.test | 96 +--
.../runtime-filter-query-options.test | 76 +-
.../PlannerTest/spillable-buffer-sizing.test | 32 +-
.../queries/PlannerTest/subquery-rewrite.test | 82 +-
.../queries/PlannerTest/tablesample.test | 4 +-
.../queries/PlannerTest/tpcds-all.test | 800 +++++++++----------
.../queries/PlannerTest/tpch-all.test | 444 +++++-----
.../queries/PlannerTest/tpch-kudu.test | 107 +++
.../queries/PlannerTest/tpch-nested.test | 64 +-
.../queries/PlannerTest/tpch-views.test | 148 ++--
.../queries/PlannerTest/union.test | 8 +-
.../queries/PlannerTest/views.test | 40 +-
.../queries/PlannerTest/with-clause.test | 32 +-
.../queries/QueryTest/bloom_filters.test | 126 +++
.../queries/QueryTest/bloom_filters_wait.test | 22 +
.../queries/QueryTest/explain-level2.test | 4 +-
.../queries/QueryTest/explain-level3.test | 4 +-
.../queries/QueryTest/min_max_filters.test | 121 +++
.../queries/QueryTest/runtime_filters.test | 177 ++--
.../queries/QueryTest/runtime_filters_wait.test | 23 -
.../primitive_min_max_runtime_filter.test | 9 +
tests/common/impala_test_suite.py | 8 +-
tests/query_test/test_kudu.py | 6 +
tests/query_test/test_runtime_filters.py | 95 ++-
tests/util/test_file_parser.py | 17 +
87 files changed, 3855 insertions(+), 1649 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index b3ad25d..1d0f38e 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -210,7 +210,16 @@ ir_functions = [
"_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"],
["BLOOM_FILTER_INSERT_NO_AVX2", "_ZN6impala11BloomFilter12InsertNoAvx2Ej"],
["BLOOM_FILTER_INSERT_AVX2", "_ZN6impala11BloomFilter10InsertAvx2Ej"],
- ["SELECT_NODE_COPY_ROWS", "_ZN6impala10SelectNode8CopyRowsEPNS_8RowBatchE"]
+ ["SELECT_NODE_COPY_ROWS", "_ZN6impala10SelectNode8CopyRowsEPNS_8RowBatchE"],
+ ["BOOL_MIN_MAX_FILTER_INSERT", "_ZN6impala16BoolMinMaxFilter6InsertEPv"],
+ ["TINYINT_MIN_MAX_FILTER_INSERT", "_ZN6impala19TinyIntMinMaxFilter6InsertEPv"],
+ ["SMALLINT_MIN_MAX_FILTER_INSERT", "_ZN6impala20SmallIntMinMaxFilter6InsertEPv"],
+ ["INT_MIN_MAX_FILTER_INSERT", "_ZN6impala15IntMinMaxFilter6InsertEPv"],
+ ["BIGINT_MIN_MAX_FILTER_INSERT", "_ZN6impala18BigIntMinMaxFilter6InsertEPv"],
+ ["FLOAT_MIN_MAX_FILTER_INSERT", "_ZN6impala17FloatMinMaxFilter6InsertEPv"],
+ ["DOUBLE_MIN_MAX_FILTER_INSERT", "_ZN6impala18DoubleMinMaxFilter6InsertEPv"],
+ ["STRING_MIN_MAX_FILTER_INSERT", "_ZN6impala18StringMinMaxFilter6InsertEPv"],
+ ["TIMESTAMP_MIN_MAX_FILTER_INSERT", "_ZN6impala21TimestampMinMaxFilter6InsertEPv"]
]
enums_preamble = '\
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 2ae10a4..4b79e8b 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -61,6 +61,7 @@
#include "udf/udf-ir.cc"
#include "util/bloom-filter-ir.cc"
#include "util/hash-util-ir.cc"
+#include "util/min-max-filter-ir.cc"
#pragma clang diagnostic pop
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 7f7318e..70618df 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -20,6 +20,7 @@
#include "codegen/codegen-anyval.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/tuple-row.h"
+#include "util/min-max-filter.h"
#include "util/runtime-profile-counters.h"
using namespace impala;
@@ -77,11 +78,24 @@ bool FilterContext::Eval(TupleRow* row) const noexcept {
}
void FilterContext::Insert(TupleRow* row) const noexcept {
- if (local_bloom_filter == NULL) return;
- void* val = expr_eval->GetValue(row);
- uint32_t filter_hash = RawValue::GetHashValue(
- val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
- local_bloom_filter->Insert(filter_hash);
+ if (filter->is_bloom_filter()) {
+ if (local_bloom_filter == nullptr) return;
+ void* val = expr_eval->GetValue(row);
+ uint32_t filter_hash = RawValue::GetHashValue(
+ val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
+ local_bloom_filter->Insert(filter_hash);
+ } else {
+ DCHECK(filter->is_min_max_filter());
+ if (local_min_max_filter == nullptr) return;
+ void* val = expr_eval->GetValue(row);
+ local_min_max_filter->Insert(val);
+ }
+}
+
+void FilterContext::MaterializeValues() const {
+ if (filter->is_min_max_filter() && local_min_max_filter != nullptr) {
+ local_min_max_filter->MaterializeValues();
+ }
}
// An example of the generated code for TPCH-Q2: RF002 -> n_regionkey
@@ -219,17 +233,17 @@ Status FilterContext::CodegenEval(
// %"class.std::vector.101" zeroinitializer }
//
// define void @FilterContextInsert(%"struct.impala::FilterContext"* %this,
-// %"class.impala::TupleRow"* %row) #43 {
+// %"class.impala::TupleRow"* %row) #37 {
// entry:
// %0 = alloca i16
// %local_bloom_filter_ptr = getelementptr inbounds %"struct.impala::FilterContext",
// %"struct.impala::FilterContext"* %this, i32 0, i32 3
// %local_bloom_filter_arg = load %"class.impala::BloomFilter"*,
// %"class.impala::BloomFilter"** %local_bloom_filter_ptr
-// %bloom_is_null = icmp eq %"class.impala::BloomFilter"* %local_bloom_filter_arg, null
-// br i1 %bloom_is_null, label %bloom_is_null1, label %bloom_not_null
+// %filter_is_null = icmp eq %"class.impala::BloomFilter"* %local_bloom_filter_arg, null
+// br i1 %filter_is_null, label %filters_null, label %filters_not_null
//
-// bloom_not_null: ; preds = %entry
+// filters_not_null: ; preds = %entry
// %expr_eval_ptr = getelementptr inbounds %"struct.impala::FilterContext",
// %"struct.impala::FilterContext"* %this, i32 0, i32 0
// %expr_eval_arg = load %"class.impala::ScalarExprEvaluator"*,
@@ -240,29 +254,29 @@ Status FilterContext::CodegenEval(
// %is_null = trunc i32 %result to i1
// br i1 %is_null, label %val_is_null, label %val_not_null
//
-// bloom_is_null1: ; preds = %entry
+// filters_null: ; preds = %entry
// ret void
//
-// val_not_null: ; preds = %bloom_not_null
+// val_not_null: ; preds = %filters_not_null
// %1 = ashr i32 %result, 16
// %2 = trunc i32 %1 to i16
// store i16 %2, i16* %0
// %native_ptr = bitcast i16* %0 to i8*
// br label %insert_filter
//
-// val_is_null: ; preds = %bloom_not_null
+// val_is_null: ; preds = %filters_not_null
// br label %insert_filter
//
// insert_filter: ; preds = %val_not_null, %val_is_null
// %val_ptr_phi = phi i8* [ %native_ptr, %val_not_null ], [ null, %val_is_null ]
// %hash_value = call i32 @_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj(
// i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg, i32 1234)
-// call void @_ZN6impala11BloomFilter9InsertAvxEj(
+// call void @_ZN6impala11BloomFilter10InsertAvx2Ej(
// %"class.impala::BloomFilter"* %local_bloom_filter_arg, i32 %hash_value)
// ret void
// }
-Status FilterContext::CodegenInsert(
- LlvmCodeGen* codegen, ScalarExpr* filter_expr, llvm::Function** fn) {
+Status FilterContext::CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
+ FilterContext* ctx, llvm::Function** fn) {
llvm::LLVMContext& context = codegen->context();
LlvmBuilder builder(context);
@@ -279,23 +293,38 @@ Status FilterContext::CodegenInsert(
llvm::Value* this_arg = args[0];
llvm::Value* row_arg = args[1];
- // Load 'local_bloom_filter' from 'this_arg' FilterContext object.
- llvm::Value* local_bloom_filter_ptr =
- builder.CreateStructGEP(nullptr, this_arg, 3, "local_bloom_filter_ptr");
- llvm::Value* local_bloom_filter_arg =
- builder.CreateLoad(local_bloom_filter_ptr, "local_bloom_filter_arg");
-
- // Check if 'local_bloom_filter' is NULL and return if so.
- llvm::Value* bloom_is_null =
- builder.CreateIsNull(local_bloom_filter_arg, "bloom_is_null");
- llvm::BasicBlock* bloom_not_null_block =
- llvm::BasicBlock::Create(context, "bloom_not_null", insert_filter_fn);
- llvm::BasicBlock* bloom_is_null_block =
- llvm::BasicBlock::Create(context, "bloom_is_null", insert_filter_fn);
- builder.CreateCondBr(bloom_is_null, bloom_is_null_block, bloom_not_null_block);
- builder.SetInsertPoint(bloom_is_null_block);
+ llvm::Value* local_filter_arg;
+ if (ctx->filter->is_bloom_filter()) {
+ // Load 'local_bloom_filter' from 'this_arg' FilterContext object.
+ llvm::Value* local_bloom_filter_ptr =
+ builder.CreateStructGEP(nullptr, this_arg, 3, "local_bloom_filter_ptr");
+ local_filter_arg =
+ builder.CreateLoad(local_bloom_filter_ptr, "local_bloom_filter_arg");
+ } else {
+ DCHECK(ctx->filter->is_min_max_filter());
+ // Load 'local_min_max_filter' from 'this_arg' FilterContext object.
+ llvm::Value* local_min_max_filter_ptr =
+ builder.CreateStructGEP(nullptr, this_arg, 4, "local_min_max_filter_ptr");
+ llvm::PointerType* min_max_filter_type =
+ codegen->GetPtrType(MinMaxFilter::GetLlvmClassName(filter_expr->type().type))
+ ->getPointerTo();
+ local_min_max_filter_ptr = builder.CreatePointerCast(
+ local_min_max_filter_ptr, min_max_filter_type, "cast_min_max_filter_ptr");
+ local_filter_arg =
+ builder.CreateLoad(local_min_max_filter_ptr, "local_min_max_filter_arg");
+ }
+
+ // Check if 'local_bloom_filter' or 'local_min_max_filter' are NULL (depending on
+ // filter desc) and return if so.
+ llvm::Value* filter_null = builder.CreateIsNull(local_filter_arg, "filter_is_null");
+ llvm::BasicBlock* filter_not_null_block =
+ llvm::BasicBlock::Create(context, "filters_not_null", insert_filter_fn);
+ llvm::BasicBlock* filter_null_block =
+ llvm::BasicBlock::Create(context, "filters_null", insert_filter_fn);
+ builder.CreateCondBr(filter_null, filter_null_block, filter_not_null_block);
+ builder.SetInsertPoint(filter_null_block);
builder.CreateRetVoid();
- builder.SetInsertPoint(bloom_not_null_block);
+ builder.SetInsertPoint(filter_not_null_block);
llvm::BasicBlock* val_not_null_block =
llvm::BasicBlock::Create(context, "val_not_null", insert_filter_fn);
@@ -327,47 +356,60 @@ Status FilterContext::CodegenInsert(
llvm::Value* null_ptr = codegen->null_ptr_value();
builder.CreateBr(insert_filter_block);
- // Saves 'result' on the stack and passes a pointer to it to 'insert_bloom_filter_fn'.
+ // Saves 'result' on the stack and passes a pointer to it to Insert().
builder.SetInsertPoint(val_not_null_block);
llvm::Value* native_ptr = result.ToNativePtr();
native_ptr = builder.CreatePointerCast(native_ptr, codegen->ptr_type(), "native_ptr");
builder.CreateBr(insert_filter_block);
- // Get the arguments in place to call 'get_hash_value_fn'.
+ // Get the arguments in place to call Insert().
builder.SetInsertPoint(insert_filter_block);
llvm::PHINode* val_ptr_phi = builder.CreatePHI(codegen->ptr_type(), 2, "val_ptr_phi");
val_ptr_phi->addIncoming(native_ptr, val_not_null_block);
val_ptr_phi->addIncoming(null_ptr, val_is_null_block);
- // Create a global constant of the filter expression's ColumnType. It needs to be a
- // constant for constant propagation and dead code elimination in 'get_hash_value_fn'.
- llvm::Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
- llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
- col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
+ // Insert into the bloom filter.
+ if (ctx->filter->is_bloom_filter()) {
+ // Create a global constant of the filter expression's ColumnType. It needs to be a
+ // constant for constant propagation and dead code elimination in 'get_hash_value_fn'.
+ llvm::Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
+ llvm::Constant* expr_type_arg = codegen->ConstantToGVPtr(
+ col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
+
+ // Call RawValue::GetHashValue() on the result of the filter's expression.
+ llvm::Value* seed_arg =
+ codegen->GetIntConstant(TYPE_INT, RuntimeFilterBank::DefaultHashSeed());
+ llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
+ llvm::Function* get_hash_value_fn =
+ codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);
+ DCHECK(get_hash_value_fn != nullptr);
+ llvm::Value* hash_value =
+ builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_value");
+
+ // Call Insert() on the bloom filter.
+ llvm::Function* insert_bloom_filter_fn;
+ if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
+ insert_bloom_filter_fn =
+ codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
+ } else {
+ insert_bloom_filter_fn =
+ codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_NO_AVX2, false);
+ }
+ DCHECK(insert_bloom_filter_fn != nullptr);
- // Call RawValue::GetHashValue() on the result of the filter's expression.
- llvm::Value* seed_arg =
- codegen->GetIntConstant(TYPE_INT, RuntimeFilterBank::DefaultHashSeed());
- llvm::Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
- llvm::Function* get_hash_value_fn =
- codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);
- DCHECK(get_hash_value_fn != nullptr);
- llvm::Value* hash_value =
- builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_value");
-
- // Call Insert() on the bloom filter.
- llvm::Value* insert_args[] = {local_bloom_filter_arg, hash_value};
- llvm::Function* insert_bloom_filter_fn;
- if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
- insert_bloom_filter_fn =
- codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
+ llvm::Value* insert_args[] = {local_filter_arg, hash_value};
+ builder.CreateCall(insert_bloom_filter_fn, insert_args);
} else {
- insert_bloom_filter_fn =
- codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_NO_AVX2, false);
+ DCHECK(ctx->filter->is_min_max_filter());
+ // The function for inserting into the min-max filter.
+ llvm::Function* min_max_insert_fn = codegen->GetFunction(
+ MinMaxFilter::GetInsertIRFunctionType(filter_expr->type().type), false);
+ DCHECK(min_max_insert_fn != nullptr);
+
+ llvm::Value* insert_filter_args[] = {local_filter_arg, val_ptr_phi};
+ builder.CreateCall(min_max_insert_fn, insert_filter_args);
}
- DCHECK(insert_bloom_filter_fn != nullptr);
- builder.CreateCall(insert_bloom_filter_fn, insert_args);
builder.CreateRetVoid();
*fn = codegen->FinalizeFunction(insert_filter_fn);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index 0806fd3..e740b80 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -21,13 +21,14 @@
#include <boost/unordered_map.hpp>
#include "exprs/scalar-expr-evaluator.h"
+#include "runtime/runtime-filter.h"
#include "util/runtime-profile.h"
namespace impala {
class BloomFilter;
class LlvmCodeGen;
-class RuntimeFilter;
+class MinMaxFilter;
class ScalarExpr;
class TupleRow;
@@ -94,6 +95,9 @@ struct FilterContext {
/// Working copy of local bloom filter
BloomFilter* local_bloom_filter = nullptr;
+ /// Working copy of local min-max filter
+ MinMaxFilter* local_min_max_filter = nullptr;
+
/// Struct name in LLVM IR.
static const char* LLVM_CLASS_NAME;
@@ -107,10 +111,15 @@ struct FilterContext {
/// a match in 'filter'. Returns false otherwise.
bool Eval(TupleRow* row) const noexcept;
- /// Evaluates 'row' with 'expr_eval' and hashes the resulting value.
- /// The hash value is then used for setting some bits in 'local_bloom_filter'.
+ /// Evaluates 'row' with 'expr_eval' and inserts the value into 'local_bloom_filter'
+ /// or 'local_min_max_filter' as appropriate.
void Insert(TupleRow* row) const noexcept;
+ /// Materialize filter values by copying any values stored by filters into memory owned
+ /// by the filter. Filters may assume that the memory for Insert()-ed values stays valid
+ /// until this is called.
+ void MaterializeValues() const;
+
/// Codegen Eval() by codegen'ing the expression 'filter_expr' and replacing the type
/// argument to RuntimeFilter::Eval() with a constant. On success, 'fn' is set to
/// the generated function. On failure, an error status is returned.
@@ -119,10 +128,14 @@ struct FilterContext {
/// Codegen Insert() by codegen'ing the expression 'filter_expr', replacing the type
/// argument to RawValue::GetHashValue() with a constant, and calling into the correct
- /// version of BloomFilter::Insert(), depending on the presence of AVX. On success,
- /// 'fn' is set to the generated function. On failure, an error status is returned.
+ /// version of BloomFilter::Insert() or MinMaxFilter::Insert(), depending on the filter
+ /// desc and if 'local_bloom_filter' or 'local_min_max_filter' are null.
+ /// For bloom filters, it also selects the correct Insert() based on the presence of
+ /// AVX, and for min-max filters it selects the correct Insert() based on type.
+ /// On success, 'fn' is set to the generated function. On failure, an error status is
+ /// returned.
static Status CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
- llvm::Function** fn) WARN_UNUSED_RESULT;
+ FilterContext* ctx, llvm::Function** fn) WARN_UNUSED_RESULT;
// Returns if there is any always_false filter in ctxs. If there is, the counter stats
// is updated.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/hdfs-parquet-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc b/be/src/exec/hdfs-parquet-scanner-ir.cc
index c1574d1..f2355d8 100644
--- a/be/src/exec/hdfs-parquet-scanner-ir.cc
+++ b/be/src/exec/hdfs-parquet-scanner-ir.cc
@@ -70,7 +70,7 @@ bool HdfsParquetScanner::EvalRuntimeFilter(int i, TupleRow* row) {
LocalFilterStats* stats = &filter_stats_[i];
const FilterContext* ctx = filter_ctxs_[i];
++stats->total_possible;
- if (stats->enabled && ctx->filter->HasBloomFilter()) {
+ if (stats->enabled && ctx->filter->HasFilter()) {
++stats->considered;
if (!ctx->Eval(row)) {
++stats->rejected;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 8ec76e0..9149097 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -661,7 +661,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
continue;
}
- bool has_filter = ctx.filter->HasBloomFilter();
+ bool has_filter = ctx.filter->HasFilter();
bool passed_filter = !has_filter || ctx.Eval(tuple_row_mem);
ctx.stats->IncrCounters(stats_name, 1, has_filter, !passed_filter);
if (!passed_filter) return false;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index feb0af7..0e7cdfa 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -84,7 +84,7 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
}
Status KuduScanNodeBase::Open(RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::Open(state));
+ RETURN_IF_ERROR(ScanNode::Open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
SCOPED_TIMER(runtime_profile_->total_time_counter());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 2cb7619..22f00e7 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -59,7 +59,8 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
RETURN_IF_ERROR(QueryMaintenance(state));
*eos = false;
- if (scan_token_ == nullptr) {
+ bool scan_token_eos = scan_token_ == nullptr;
+ while (scan_token_eos) {
scan_token_ = GetNextScanToken();
if (scan_token_ == nullptr) {
runtime_profile_->StopPeriodicCounters();
@@ -68,7 +69,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
*eos = true;
return Status::OK();
}
- RETURN_IF_ERROR(scanner_->OpenNextScanToken(*scan_token_));
+ RETURN_IF_ERROR(scanner_->OpenNextScanToken(*scan_token_, &scan_token_eos));
}
bool scanner_eos = false;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 7f18710..77fac89 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -75,6 +75,8 @@ Status KuduScanNode::Open(RuntimeState* state) {
state->query_options().num_scanner_threads);
}
+ if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
+
thread_avail_cb_id_ = state->resource_pool()->AddThreadAvailableCb(
bind<void>(mem_fn(&KuduScanNode::ThreadAvailableCb), this, _1));
ThreadAvailableCb(state->resource_pool());
@@ -179,8 +181,9 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
}
Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_token) {
- RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token));
- bool eos = false;
+ bool eos;
+ RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token, &eos));
+ if (eos) return Status::OK();
while (!eos && !done_) {
unique_ptr<RowBatch> row_batch = std::make_unique<RowBatch>(row_desc(),
runtime_state_->batch_size(), mem_tracker());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index deb9c84..7db8878 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -18,6 +18,7 @@
#include "exec/kudu-scanner.h"
#include <kudu/client/row_result.h>
+#include <kudu/client/value.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <vector>
#include <string>
@@ -25,9 +26,11 @@
#include "exec/kudu-util.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
+#include "exprs/slot-ref.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
+#include "runtime/runtime-filter.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
#include "runtime/string-value.h"
@@ -36,15 +39,18 @@
#include "gutil/gscoped_ptr.h"
#include "gutil/strings/substitute.h"
#include "util/jni-util.h"
+#include "util/min-max-filter.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using kudu::client::KuduClient;
+using kudu::client::KuduPredicate;
using kudu::client::KuduScanBatch;
using kudu::client::KuduSchema;
using kudu::client::KuduTable;
+using kudu::client::KuduValue;
DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
"Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT.");
@@ -136,7 +142,7 @@ void KuduScanner::Close() {
expr_results_pool_->FreeAll();
}
-Status KuduScanner::OpenNextScanToken(const string& scan_token) {
+Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
DCHECK(scanner_ == NULL);
kudu::client::KuduScanner* scanner;
KUDU_RETURN_IF_ERROR(kudu::client::KuduScanToken::DeserializeIntoScanner(
@@ -164,10 +170,67 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) {
scanner_->SetRowFormatFlags(row_format_flags);
}
+ if (scan_node_->filter_ctxs_.size() > 0) {
+ for (const FilterContext& ctx : scan_node_->filter_ctxs_) {
+ MinMaxFilter* filter = ctx.filter->get_min_max();
+ if (filter != nullptr && !filter->AlwaysTrue()) {
+ if (filter->AlwaysFalse()) {
+ // We can skip this entire scan.
+ CloseCurrentClientScanner();
+ *eos = true;
+ return Status::OK();
+ } else {
+ auto it = ctx.filter->filter_desc().planid_to_target_ndx.find(scan_node_->id());
+ const TRuntimeFilterTargetDesc& target_desc =
+ ctx.filter->filter_desc().targets[it->second];
+ const string& col_name = target_desc.kudu_col_name;
+ DCHECK(col_name != "");
+ ColumnType col_type = ColumnType::FromThrift(target_desc.kudu_col_type);
+
+ void* min = filter->GetMin();
+ void* max = filter->GetMax();
+ // If the type of the filter is not the same as the type of the target column,
+ // there must be an implicit integer cast and we need to ensure the min/max we
+ // pass to Kudu are within the range of the target column.
+ int64_t int_min;
+ int64_t int_max;
+ if (col_type.type != filter->type()) {
+ DCHECK(col_type.IsIntegerType());
+
+ if (!filter->GetCastIntMinMax(col_type, &int_min, &int_max)) {
+ // The min/max for this filter is outside the range for the target column,
+ // so all rows are filtered out and we can skip the scan.
+ CloseCurrentClientScanner();
+ *eos = true;
+ return Status::OK();
+ }
+ min = &int_min;
+ max = &int_max;
+ }
+
+ KuduValue* min_value;
+ RETURN_IF_ERROR(CreateKuduValue(filter->type(), min, &min_value));
+ KUDU_RETURN_IF_ERROR(
+ scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+ col_name, KuduPredicate::ComparisonOp::GREATER_EQUAL, min_value)),
+ "Failed to add min predicate");
+
+ KuduValue* max_value;
+ RETURN_IF_ERROR(CreateKuduValue(filter->type(), max, &max_value));
+ KUDU_RETURN_IF_ERROR(
+ scanner_->AddConjunctPredicate(scan_node_->table_->NewComparisonPredicate(
+ col_name, KuduPredicate::ComparisonOp::LESS_EQUAL, max_value)),
+ "Failed to add max predicate");
+ }
+ }
+ }
+ }
+
{
SCOPED_TIMER(state_->total_storage_wait_timer());
KUDU_RETURN_IF_ERROR(scanner_->Open(), "Unable to open scanner");
}
+ *eos = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 125881c..e6d4ca9 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -43,8 +43,10 @@ class KuduScanner {
/// Does not actually open a kudu::client::KuduScanner.
Status Open();
- /// Opens a new kudu::client::KuduScanner using 'scan_token'.
- Status OpenNextScanToken(const std::string& scan_token);
+ /// Opens a new kudu::client::KuduScanner using 'scan_token'. If there are no rows to
+ /// scan (e.g. because a runtime filter rejects all rows), 'eos' is set to true;
+ /// otherwise, if the returned status is OK, 'eos' is set to false.
+ Status OpenNextScanToken(const std::string& scan_token, bool* eos);
/// Fetches the next batch from the current kudu::client::KuduScanner.
Status GetNext(RowBatch* row_batch, bool* eos);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index 320a77d..03cb51f 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -35,6 +35,7 @@ using kudu::client::KuduSchema;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
+using kudu::client::KuduValue;
using DataType = kudu::client::KuduColumnSchema::DataType;
DECLARE_bool(disable_kudu);
@@ -111,17 +112,14 @@ void InitKuduLogging() {
kudu::client::SetVerboseLogLevel(std::max(0, FLAGS_v - 1));
}
-Status WriteKuduTimestampValue(int col, const TimestampValue* tv,
- kudu::KuduPartialRow* row) {
- int64_t ts_micros;
- bool success = tv->UtcToUnixTimeMicros(&ts_micros);
+// Converts a TimestampValue to Kudu's representation which is returned in 'ts_micros'.
+static Status ConvertTimestampValue(const TimestampValue* tv, int64_t* ts_micros) {
+ bool success = tv->UtcToUnixTimeMicros(ts_micros);
DCHECK(success); // If the value was invalid the slot should've been null.
if (UNLIKELY(!success)) {
return Status(TErrorCode::RUNTIME_ERROR,
"Invalid TimestampValue: " + tv->ToString());
}
- KUDU_RETURN_IF_ERROR(row->SetUnixTimeMicros(col, ts_micros),
- "Could not add Kudu WriteOp.");
return Status::OK();
}
@@ -170,8 +168,11 @@ Status WriteKuduValue(int col, PrimitiveType type, const void* value,
"Could not set Kudu row value.");
break;
case TYPE_TIMESTAMP:
- RETURN_IF_ERROR(WriteKuduTimestampValue(col,
- reinterpret_cast<const TimestampValue*>(value), row));
+ int64_t ts_micros;
+ RETURN_IF_ERROR(ConvertTimestampValue(
+ reinterpret_cast<const TimestampValue*>(value), &ts_micros));
+ KUDU_RETURN_IF_ERROR(
+ row->SetUnixTimeMicros(col, ts_micros), "Could not add Kudu WriteOp.");
break;
default:
return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
@@ -196,4 +197,47 @@ ColumnType KuduDataTypeToColumnType(DataType type) {
return ColumnType(PrimitiveType::INVALID_TYPE);
}
+Status CreateKuduValue(PrimitiveType type, void* value, KuduValue** out) {
+ switch (type) {
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ const StringValue* sv = reinterpret_cast<const StringValue*>(value);
+ kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
+ *out = KuduValue::CopyString(slice);
+ break;
+ }
+ case TYPE_FLOAT:
+ *out = KuduValue::FromFloat(*reinterpret_cast<const float*>(value));
+ break;
+ case TYPE_DOUBLE:
+ *out = KuduValue::FromDouble(*reinterpret_cast<const double*>(value));
+ break;
+ case TYPE_BOOLEAN:
+ *out = KuduValue::FromBool(*reinterpret_cast<const bool*>(value));
+ break;
+ case TYPE_TINYINT:
+ *out = KuduValue::FromInt(*reinterpret_cast<const int8_t*>(value));
+ break;
+ case TYPE_SMALLINT:
+ *out = KuduValue::FromInt(*reinterpret_cast<const int16_t*>(value));
+ break;
+ case TYPE_INT:
+ *out = KuduValue::FromInt(*reinterpret_cast<const int32_t*>(value));
+ break;
+ case TYPE_BIGINT:
+ *out = KuduValue::FromInt(*reinterpret_cast<const int64_t*>(value));
+ break;
+ case TYPE_TIMESTAMP: {
+ int64_t ts_micros;
+ RETURN_IF_ERROR(ConvertTimestampValue(
+ reinterpret_cast<const TimestampValue*>(value), &ts_micros));
+ *out = KuduValue::FromInt(ts_micros);
+ break;
+ }
+ default:
+ return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
+ }
+ return Status::OK();
+}
+
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 11cf16a..5fd1140 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -23,6 +23,7 @@ struct tm;
#include <kudu/client/callbacks.h>
#include <kudu/client/client.h>
+#include <kudu/client/value.h>
#include "common/status.h"
#include "runtime/string-value.h"
@@ -84,6 +85,11 @@ void LogKuduMessage(kudu::client::KuduLogSeverity severity, const char* filename
Status WriteKuduValue(int col, PrimitiveType type, const void* value,
bool copy_strings, kudu::KuduPartialRow* row) WARN_UNUSED_RESULT;
+/// Casts 'value' according to 'type' and creates a new KuduValue from it, returned in
+/// 'out'. Returns an error if 'type' is unsupported or a TIMESTAMP 'value' is invalid.
+Status CreateKuduValue(
+ PrimitiveType type, void* value, kudu::client::KuduValue** out) WARN_UNUSED_RESULT;
+
/// Takes a Kudu client DataType and returns the corresponding Impala ColumnType.
ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType type);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index 8481212..8d8e42d 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -66,6 +66,7 @@ Status PhjBuilder::ProcessBuildBatch(
return status;
}
}
+ for (const FilterContext& ctx : filter_ctxs_) ctx.MaterializeValues();
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 584b42d..a86b87c 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -34,6 +34,7 @@
#include "runtime/runtime-filter.h"
#include "runtime/runtime-state.h"
#include "util/bloom-filter.h"
+#include "util/min-max-filter.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/PlanNodes_types.h"
@@ -469,9 +470,16 @@ void PhjBuilder::AllocateRuntimeFilters() {
<< "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
DCHECK(ht_ctx_ != NULL);
for (int i = 0; i < filter_ctxs_.size(); ++i) {
- filter_ctxs_[i].local_bloom_filter =
- runtime_state_->filter_bank()->AllocateScratchBloomFilter(
- filter_ctxs_[i].filter->id());
+ if (filter_ctxs_[i].filter->is_bloom_filter()) {
+ filter_ctxs_[i].local_bloom_filter =
+ runtime_state_->filter_bank()->AllocateScratchBloomFilter(
+ filter_ctxs_[i].filter->id());
+ } else {
+ DCHECK(filter_ctxs_[i].filter->is_min_max_filter());
+ filter_ctxs_[i].local_min_max_filter =
+ runtime_state_->filter_bank()->AllocateScratchMinMaxFilter(
+ filter_ctxs_[i].filter->id(), filter_ctxs_[i].expr_eval->root().type());
+ }
}
}
@@ -491,12 +499,22 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
for (const FilterContext& ctx : filter_ctxs_) {
// TODO: Consider checking actual number of bits set in filter to compute FP rate.
// TODO: Consider checking this every few batches or so.
- bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh(
- ctx.filter->filter_size(), num_build_rows);
- runtime_state_->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
- fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
+ BloomFilter* bloom_filter = nullptr;
+ if (ctx.local_bloom_filter != nullptr) {
+ if (runtime_state_->filter_bank()->FpRateTooHigh(
+ ctx.filter->filter_size(), num_build_rows)) {
+ bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+ } else {
+ bloom_filter = ctx.local_bloom_filter;
+ ++num_enabled_filters;
+ }
+ } else if (ctx.local_min_max_filter != nullptr
+ && !ctx.local_min_max_filter->AlwaysTrue()) {
+ ++num_enabled_filters;
+ }
- num_enabled_filters += !fp_rate_too_high;
+ runtime_state_->filter_bank()->UpdateFilterFromLocal(
+ ctx.filter->id(), bloom_filter, ctx.local_min_max_filter);
}
if (filter_ctxs_.size() > 0) {
@@ -959,7 +977,8 @@ Status PhjBuilder::CodegenInsertRuntimeFilters(
int num_filters = filter_exprs.size();
for (int i = 0; i < num_filters; ++i) {
llvm::Function* insert_fn;
- RETURN_IF_ERROR(FilterContext::CodegenInsert(codegen, filter_exprs_[i], &insert_fn));
+ RETURN_IF_ERROR(FilterContext::CodegenInsert(
+ codegen, filter_exprs_[i], &filter_ctxs_[i], &insert_fn));
llvm::PointerType* filter_context_type =
codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
llvm::Value* filter_context_ptr =
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 18fc473..27726f8 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -75,12 +75,16 @@ Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
filter_ctxs_.emplace_back();
FilterContext& filter_ctx = filter_ctxs_.back();
filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
- string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
- PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
- RuntimeProfile* profile =
- RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
- runtime_profile_->AddChild(profile);
- filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile));
+ // TODO: Enable stats for min-max filters when Kudu exposes info about filters
+ // (KUDU-2162).
+ if (filter_ctx.filter->is_bloom_filter()) {
+ string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
+ PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
+ RuntimeProfile* profile =
+ RuntimeProfile::Create(state->obj_pool(), filter_profile_title);
+ runtime_profile_->AddChild(profile);
+ filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile));
+ }
}
return Status::OK();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/coordinator-filter-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 08944b8..2cb0602 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -43,13 +43,14 @@ struct Coordinator::FilterTarget {
fragment_idx(f_idx) {}
};
-/// State of filters that are received for aggregation.
+/// State of runtime filters that are received for aggregation. A runtime filter will
+/// contain a bloom or min-max filter.
///
/// A broadcast join filter is published as soon as the first update is received for it
/// and subsequent updates are ignored (as they will be the same).
-/// Updates for a partitioned join filter are aggregated in 'bloom_filter' and this is
-/// published once 'pending_count' reaches 0 and if the filter was not disabled before
-/// that.
+/// Updates for a partitioned join filter are aggregated (bloom filters with
+/// BloomFilter::Or(), min-max filters with MinMaxFilter::Or()) and then published once
+/// 'pending_count' reaches 0 and if the filter was not disabled before that.
///
/// A filter is disabled if an always_true filter update is received, an OOM is hit,
/// filter aggregation is complete or if the query is complete.
@@ -61,9 +62,11 @@ class Coordinator::FilterState {
completion_time_(0L) {
// bloom_filter_ is a disjunction so the unit value is always_false.
bloom_filter_.always_false = true;
+ min_max_filter_.always_false = true;
}
TBloomFilter& bloom_filter() { return bloom_filter_; }
+ TMinMaxFilter& min_max_filter() { return min_max_filter_; }
boost::unordered_set<int>* src_fragment_instance_idxs() {
return &src_fragment_instance_idxs_;
}
@@ -76,9 +79,18 @@ class Coordinator::FilterState {
int64_t completion_time() const { return completion_time_; }
const TPlanNodeId& src() const { return src_; }
const TRuntimeFilterDesc& desc() const { return desc_; }
+ bool is_bloom_filter() const { return desc_.type == TRuntimeFilterType::BLOOM; }
+ bool is_min_max_filter() const { return desc_.type == TRuntimeFilterType::MIN_MAX; }
int pending_count() const { return pending_count_; }
void set_pending_count(int pending_count) { pending_count_ = pending_count; }
- bool disabled() const { return bloom_filter_.always_true; }
+ bool disabled() const {
+ if (is_bloom_filter()) {
+ return bloom_filter_.always_true;
+ } else {
+ DCHECK(is_min_max_filter());
+ return min_max_filter_.always_true;
+ }
+ }
/// Aggregates partitioned join filters and updates memory consumption.
/// Disables filter if always_true filter is received or OOM is hit.
@@ -100,13 +112,14 @@ class Coordinator::FilterState {
/// Number of remaining backends to hear from before filter is complete.
int pending_count_;
- /// BloomFilter aggregated from all source plan nodes, to be broadcast to all
+ /// Filters aggregated from all source plan nodes, to be broadcast to all
/// destination plan fragment instances. Only set for partitioned joins (broadcast joins
/// need no aggregation).
/// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
/// output structure in the case of a broadcast join. Similarly, for partitioned joins,
/// the filter is moved from the following member to the output structure.
TBloomFilter bloom_filter_;
+ TMinMaxFilter min_max_filter_;
/// Time at which first local filter arrived.
int64_t first_arrival_time_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 772da33..d0e7b90 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -43,6 +43,7 @@
#include "util/hdfs-bulk-ops.h"
#include "util/hdfs-util.h"
#include "util/histogram-metric.h"
+#include "util/min-max-filter.h"
#include "util/table-printer.h"
#include "common/names.h"
@@ -321,7 +322,7 @@ void Coordinator::InitFilterRoutingTable() {
f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end());
// target plan node of filter
- } else if (plan_node.__isset.hdfs_scan_node) {
+ } else if (plan_node.__isset.hdfs_scan_node || plan_node.__isset.kudu_scan_node) {
auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
DCHECK(it != filter.planid_to_target_ndx.end());
const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second];
@@ -1125,16 +1126,23 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
target_fragment_idxs.insert(target.fragment_idx);
}
- // Assign outgoing bloom filter.
- TBloomFilter& aggregated_filter = state->bloom_filter();
- filter_mem_tracker_->Release(aggregated_filter.directory.size());
+ if (state->is_bloom_filter()) {
+ // Assign outgoing bloom filter.
+ TBloomFilter& aggregated_filter = state->bloom_filter();
+ filter_mem_tracker_->Release(aggregated_filter.directory.size());
+
+ // TODO: Track memory used by 'rpc_params'.
+ swap(rpc_params.bloom_filter, aggregated_filter);
+ DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
+ || !rpc_params.bloom_filter.directory.empty());
+ DCHECK(aggregated_filter.directory.empty());
+ rpc_params.__isset.bloom_filter = true;
+ } else {
+ DCHECK(state->is_min_max_filter());
+ MinMaxFilter::Copy(state->min_max_filter(), &rpc_params.min_max_filter);
+ rpc_params.__isset.min_max_filter = true;
+ }
- // TODO: Track memory used by 'rpc_params'.
- swap(rpc_params.bloom_filter, aggregated_filter);
- DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true ||
- !rpc_params.bloom_filter.directory.empty());
- DCHECK(aggregated_filter.directory.empty());
- rpc_params.__isset.bloom_filter = true;
// Filter is complete, and can be released.
state->Disable(filter_mem_tracker_);
}
@@ -1160,27 +1168,40 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
}
--pending_count_;
- if (params.bloom_filter.always_true) {
- Disable(coord->filter_mem_tracker_);
- } else if (bloom_filter_.always_false) {
- int64_t heap_space = params.bloom_filter.directory.size();
- if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
- VLOG_QUERY << "Not enough memory to allocate filter: "
- << PrettyPrinter::Print(heap_space, TUnit::BYTES)
- << " (query: " << coord->query_id() << ")";
- // Disable, as one missing update means a correct filter cannot be produced.
+ if (is_bloom_filter()) {
+ DCHECK(params.__isset.bloom_filter);
+ if (params.bloom_filter.always_true) {
Disable(coord->filter_mem_tracker_);
+ } else if (bloom_filter_.always_false) {
+ int64_t heap_space = params.bloom_filter.directory.size();
+ if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
+ VLOG_QUERY << "Not enough memory to allocate filter: "
+ << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+ << " (query: " << coord->query_id() << ")";
+ // Disable, as one missing update means a correct filter cannot be produced.
+ Disable(coord->filter_mem_tracker_);
+ } else {
+ // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
+ // move the payload from the request rather than copy it and take double the
+ // memory cost. After this point, params.bloom_filter is an empty filter and
+ // should not be read.
+ TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
+ swap(bloom_filter_, *non_const_filter);
+ DCHECK_EQ(non_const_filter->directory.size(), 0);
+ }
} else {
- // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
- // move the payload from the request rather than copy it and take double the memory
- // cost. After this point, params.bloom_filter is an empty filter and should not be
- // read.
- TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
- swap(bloom_filter_, *non_const_filter);
- DCHECK_EQ(non_const_filter->directory.size(), 0);
+ BloomFilter::Or(params.bloom_filter, &bloom_filter_);
}
} else {
- BloomFilter::Or(params.bloom_filter, &bloom_filter_);
+ DCHECK(is_min_max_filter());
+ DCHECK(params.__isset.min_max_filter);
+ if (params.min_max_filter.always_true) {
+ Disable(coord->filter_mem_tracker_);
+ } else if (min_max_filter_.always_false) {
+ MinMaxFilter::Copy(params.min_max_filter, &min_max_filter_);
+ } else {
+ MinMaxFilter::Or(params.min_max_filter, &min_max_filter_);
+ }
}
if (pending_count_ == 0 || disabled()) {
@@ -1189,11 +1210,17 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
}
void Coordinator::FilterState::Disable(MemTracker* tracker) {
- bloom_filter_.always_true = true;
- bloom_filter_.always_false = false;
- tracker->Release(bloom_filter_.directory.size());
- bloom_filter_.directory.clear();
- bloom_filter_.directory.shrink_to_fit();
+ if (is_bloom_filter()) {
+ bloom_filter_.always_true = true;
+ bloom_filter_.always_false = false;
+ tracker->Release(bloom_filter_.directory.size());
+ bloom_filter_.directory.clear();
+ bloom_filter_.directory.shrink_to_fit();
+ } else {
+ DCHECK(is_min_max_filter());
+ min_max_filter_.always_true = true;
+ min_max_filter_.always_false = false;
+ }
}
const TUniqueId& Coordinator::query_id() const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index f957dd1..bf437ba 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -414,13 +414,12 @@ Status FragmentInstanceState::WaitForOpen() {
return opened_promise_.Get();
}
-void FragmentInstanceState::PublishFilter(
- int32_t filter_id, const TBloomFilter& thrift_bloom_filter) {
+void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
- << " filter_id=" << filter_id;
+ << " filter_id=" << params.filter_id;
// Wait until Prepare() is done, so we know that the filter bank is set up.
if (!WaitForPrepare().ok()) return;
- runtime_state_->filter_bank()->PublishGlobalFilter(filter_id, thrift_bloom_filter);
+ runtime_state_->filter_bank()->PublishGlobalFilter(params);
}
const TQueryCtx& FragmentInstanceState::query_ctx() const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index f540b56..4e832f6 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -99,7 +99,7 @@ class FragmentInstanceState {
Status WaitForOpen();
/// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
- void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
+ void PublishFilter(const TPublishFilterParams& params);
/// Returns fragment instance's sink if this is the root fragment instance. Valid after
/// the Prepare phase. May be nullptr.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 6796c82..6bc2591 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -397,12 +397,11 @@ void QueryState::Cancel() {
for (auto entry: fis_map_) entry.second->Cancel();
}
-void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
- const TBloomFilter& thrift_bloom_filter) {
+void QueryState::PublishFilter(const TPublishFilterParams& params) {
if (!instances_prepared_promise_.Get().ok()) return;
- DCHECK_EQ(fragment_map_.count(fragment_idx), 1);
- for (FragmentInstanceState* fis: fragment_map_[fragment_idx]) {
- fis->PublishFilter(filter_id, thrift_bloom_filter);
+ DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
+ for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
+ fis->PublishFilter(params);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 82f2c52..f7b83a7 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -142,8 +142,7 @@ class QueryState {
FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
/// Blocks until all fragment instances have finished their Prepare phase.
- void PublishFilter(int32_t filter_id, int fragment_idx,
- const TBloomFilter& thrift_bloom_filter);
+ void PublishFilter(const TPublishFilterParams& params);
/// Cancels all actively executing fragment instances. Blocks until all fragment
/// instances have finished their Prepare phase. Idempotent.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 2ae65c8..178aef1 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,6 +27,7 @@
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
+#include "util/min-max-filter.h"
#include "common/names.h"
@@ -41,8 +42,12 @@ const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
- : state_(state), closed_(false) {
- memory_allocated_ =
+ : state_(state),
+ filter_mem_tracker_(
+ new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)),
+ mem_pool_(filter_mem_tracker_.get()),
+ closed_(false) {
+ bloom_memory_allocated_ =
state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
// Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
@@ -66,9 +71,6 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
default_filter_size_ =
BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
-
- filter_mem_tracker_.reset(
- new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false));
}
RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -115,27 +117,28 @@ void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params
}
-void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
- BloomFilter* bloom_filter) {
+void RuntimeFilterBank::UpdateFilterFromLocal(
+ int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
<< "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
TUpdateFilterParams params;
// A runtime filter may have both local and remote targets.
bool has_local_target = false;
bool has_remote_target = false;
+ TRuntimeFilterType::type type;
{
lock_guard<mutex> l(runtime_filter_lock_);
RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
<< filter_id;
- it->second->SetBloomFilter(bloom_filter);
+ it->second->SetFilter(bloom_filter, min_max_filter);
has_local_target = it->second->filter_desc().has_local_targets;
has_remote_target = it->second->filter_desc().has_remote_targets;
+ type = it->second->filter_desc().type;
}
if (has_local_target) {
- // Do a short circuit publication by pushing the same BloomFilter to the consumer
- // side.
+ // Do a short circuit publication by pushing the same filter to the consumer side.
RuntimeFilter* filter;
{
lock_guard<mutex> l(runtime_filter_lock_);
@@ -143,7 +146,7 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
if (it == consumed_filters_.end()) return;
filter = it->second;
}
- filter->SetBloomFilter(bloom_filter);
+ filter->SetFilter(bloom_filter, min_max_filter);
state_->runtime_profile()->AddInfoString(
Substitute("Filter $0 arrival", filter_id),
PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
@@ -153,8 +156,14 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
&& state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
params.__set_filter_id(filter_id);
params.__set_query_id(state_->query_id());
- BloomFilter::ToThrift(bloom_filter, ¶ms.bloom_filter);
- params.__isset.bloom_filter = true;
+ if (type == TRuntimeFilterType::BLOOM) {
+ BloomFilter::ToThrift(bloom_filter, ¶ms.bloom_filter);
+ params.__isset.bloom_filter = true;
+ } else {
+ DCHECK(type == TRuntimeFilterType::MIN_MAX);
+ min_max_filter->ToThrift(¶ms.min_max_filter);
+ params.__isset.min_max_filter = true;
+ }
ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
SendFilterToCoordinator, state_->query_ctx().coord_address, params,
@@ -162,32 +171,43 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
}
}
-void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
- const TBloomFilter& thrift_filter) {
+void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
lock_guard<mutex> l(runtime_filter_lock_);
if (closed_) return;
- RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+ RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
- << filter_id;
- if (thrift_filter.always_true) {
- it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
- } else {
- int64_t required_space =
- BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
- // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
- // there's not enough memory for it.
- if (!filter_mem_tracker_->TryConsume(required_space)) {
- VLOG_QUERY << "No memory for global filter: " << filter_id
- << " (fragment instance: " << state_->fragment_instance_id() << ")";
- it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+ << params.filter_id;
+
+ BloomFilter* bloom_filter = nullptr;
+ MinMaxFilter* min_max_filter = nullptr;
+ if (it->second->is_bloom_filter()) {
+ DCHECK(params.__isset.bloom_filter);
+ if (params.bloom_filter.always_true) {
+ bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
- BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
- DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
- memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
- it->second->SetBloomFilter(bloom_filter);
+ int64_t required_space =
+ BloomFilter::GetExpectedHeapSpaceUsed(params.bloom_filter.log_heap_space);
+ // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
+ // there's not enough memory for it.
+ if (!filter_mem_tracker_->TryConsume(required_space)) {
+ VLOG_QUERY << "No memory for global filter: " << params.filter_id
+ << " (fragment instance: " << state_->fragment_instance_id() << ")";
+ bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+ } else {
+ bloom_filter = obj_pool_.Add(new BloomFilter(params.bloom_filter));
+ DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+ bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+ }
}
+ } else {
+ DCHECK(it->second->is_min_max_filter());
+ DCHECK(params.__isset.min_max_filter);
+ min_max_filter = MinMaxFilter::Create(
+ params.min_max_filter, it->second->type(), &obj_pool_, &mem_pool_);
}
- state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
+ it->second->SetFilter(bloom_filter, min_max_filter);
+ state_->runtime_profile()->AddInfoString(
+ Substitute("Filter $0 arrival", params.filter_id),
PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
}
@@ -204,10 +224,21 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
if (!filter_mem_tracker_->TryConsume(required_space)) return NULL;
BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
- memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+ bloom_memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
return bloom_filter;
}
+MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
+ int32_t filter_id, ColumnType type) {
+ lock_guard<mutex> l(runtime_filter_lock_);
+ if (closed_) return nullptr;
+
+ RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+ DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
+
+ return MinMaxFilter::Create(type, &obj_pool_, &mem_pool_);
+}
+
int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
if (ndv == -1) return default_filter_size_;
int64_t required_space =
@@ -227,6 +258,7 @@ void RuntimeFilterBank::Close() {
lock_guard<mutex> l(runtime_filter_lock_);
closed_ = true;
obj_pool_.Clear();
- filter_mem_tracker_->Release(memory_allocated_->value());
+ mem_pool_.FreeAll();
+ filter_mem_tracker_->Release(bloom_memory_allocated_->value());
filter_mem_tracker_->Close();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index d8be8ab..8f6bb42 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
#include "codegen/impala-ir.h"
#include "common/object-pool.h"
+#include "runtime/mem-pool.h"
#include "runtime/types.h"
#include "util/runtime-profile.h"
@@ -31,6 +32,7 @@ namespace impala {
class BloomFilter;
class MemTracker;
+class MinMaxFilter;
class RuntimeFilter;
class RuntimeState;
class TBloomFilter;
@@ -47,9 +49,10 @@ class TQueryCtx;
/// RuntimeFilterBank treats each filter independently.
///
/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
-/// fragments update the bloom filters by calling UpdateFilterFromLocal()
-/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
-/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
+/// fragments update the filters by calling UpdateFilterFromLocal() (which may only be
+/// called once per filter ID per filter bank), with either a bloom filter or a min-max
+/// filter, depending on the filter's type. The 'bloom_filter' or 'min_max_filter' that is
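-/// AllocateScratchBloomFilter(); this allows RuntimeFilterBank to manage all memory
-/// associated with filters.
+/// passed into UpdateFilterFromLocal() must have been allocated by
+/// AllocateScratchBloomFilter() or AllocateScratchMinMaxFilter(), respectively; this
+/// allows RuntimeFilterBank to manage all memory associated with filters.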
///
@@ -58,9 +61,10 @@ class TQueryCtx;
///
/// After PublishGlobalFilter() has been called (and again, it may only be called once per
/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
-/// bloom_filter, and may be used for filter evaluation. This operation occurs without
-/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
-/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
+/// bloom_filter or min_max_filter, and may be used for filter evaluation. This
+/// operation occurs without synchronisation, and neither the thread that calls
+/// PublishGlobalFilter() nor the thread that may call RuntimeFilter::Eval() need to
+/// coordinate in any way.
class RuntimeFilterBank {
public:
RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
@@ -70,14 +74,16 @@ class RuntimeFilterBank {
/// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
- /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
- /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
- /// full filter that contains all elements.
- void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
+ /// Updates a filter's 'bloom_filter' or 'min_max_filter' which has been produced by
+ /// some operator in the local fragment instance. At most one of 'bloom_filter' and
+ /// 'min_max_filter' may be non-NULL, depending on the filter's type. They may both be
+ /// NULL, representing a filter that allows all rows to pass.
+ void UpdateFilterFromLocal(
+ int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
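- /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
- /// consumption by operators that wish to use it for filtering.
+ /// Makes the bloom or min-max filter carried in 'params' (aggregated globally from all
+ /// producer fragments) available for consumption by operators that wish to use it for
+ /// filtering.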
- void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
+ void PublishGlobalFilter(const TPublishFilterParams& params);
/// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
/// 'filter_size' would have an expected false-positive rate which would exceed
@@ -100,6 +106,9 @@ class RuntimeFilterBank {
/// If there is not enough memory, or if Close() has been called first, returns NULL.
BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
+ /// Returns a new MinMaxFilter for 'type', allocated from the bank's pools and freed by
+ /// Close(). Returns NULL if Close() has been called first.
+ MinMaxFilter* AllocateScratchMinMaxFilter(int32_t filter_id, ColumnType type);
+
/// Default hash seed to use when computing hashed values to insert into filters.
static int32_t IR_ALWAYS_INLINE DefaultHashSeed() { return 1234; }
@@ -136,12 +145,15 @@ class RuntimeFilterBank {
/// MemTracker to track Bloom filter memory.
boost::scoped_ptr<MemTracker> filter_mem_tracker_;
+ /// MemPool backing the allocations made by min-max filters. Freed in Close().
+ MemPool mem_pool_;
+
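- /// True iff Close() has been called. Used to prevent races between
- /// AllocateScratchBloomFilter() and Close().
+ /// True iff Close() has been called. Used to prevent races between Close() and
+ /// AllocateScratchBloomFilter() / AllocateScratchMinMaxFilter().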
bool closed_;
/// Total amount of memory allocated to Bloom Filters
- RuntimeProfile::Counter* memory_allocated_;
+ RuntimeProfile::Counter* bloom_memory_allocated_;
/// Precomputed default BloomFilter size.
int64_t default_filter_size_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-ir.cc b/be/src/runtime/runtime-filter-ir.cc
index 4e386cb..6436213 100644
--- a/be/src/runtime/runtime-filter-ir.cc
+++ b/be/src/runtime/runtime-filter-ir.cc
@@ -21,10 +21,9 @@ using namespace impala;
+/// Bloom-filter-only: returns true if the filter passes all rows, otherwise whether the
+/// hash of 'val' (of type 'col_type') is present in the filter.
bool IR_ALWAYS_INLINE RuntimeFilter::Eval(
void* val, const ColumnType& col_type) const noexcept {
- // Safe to read bloom_filter_ concurrently with any ongoing SetBloomFilter() thanks
- // to a) the atomicity of / pointer assignments and b) the x86 TSO memory model.
- if (bloom_filter_ == BloomFilter::ALWAYS_TRUE_FILTER) return true;
+ DCHECK(is_bloom_filter());
+ // Load the atomic pointer once: Eval() runs per row, and a single load guarantees that
+ // the ALWAYS_TRUE_FILTER check and the Find() below operate on the same pointer.
+ BloomFilter* bloom_filter = bloom_filter_.Load();
+ if (bloom_filter == BloomFilter::ALWAYS_TRUE_FILTER) return true;
uint32_t h = RawValue::GetHashValue(val, col_type,
RuntimeFilterBank::DefaultHashSeed());
- return bloom_filter_->Find(h);
+ return bloom_filter->Find(h);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 228094e..a2fd30e 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -29,9 +29,9 @@ const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
+/// Polls HasFilter() every SLEEP_PERIOD_MS ms until the filter has arrived or at least
+/// 'timeout_ms' ms have elapsed since registration (not since this call). Checks at
+/// least once and returns HasFilter() at the end of the wait.
bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
do {
- if (HasBloomFilter()) return true;
+ if (HasFilter()) return true;
SleepForMs(SLEEP_PERIOD_MS);
} while ((MonotonicMillis() - registration_time_) < timeout_ms);
- return HasBloomFilter();
+ return HasFilter();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5d9531b..40c5f23 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -29,33 +29,45 @@ namespace impala {
class BloomFilter;
-/// RuntimeFilters represent set-membership predicates (implemented with bloom filters)
-/// that are computed during query execution (rather than during planning). They can then
-/// be sent to other operators to reduce their output. For example, a RuntimeFilter might
-/// compute a predicate corresponding to set membership, where the members of that set can
-/// only be computed at runtime (for example, the distinct values of the build side of a
-/// hash table). Other plan nodes can use that predicate by testing for membership of that
-/// set to filter rows early on in the plan tree (e.g. the scan that feeds the probe side
-/// of that join node could eliminate rows from consideration for join matching).
+/// RuntimeFilters represent set-membership predicates that are computed during query
+/// execution (rather than during planning). They can then be sent to other operators to
+/// reduce their output. For example, a RuntimeFilter might compute a predicate
+/// corresponding to set membership, where the members of that set can only be computed at
+/// runtime (for example, the distinct values of the build side of a hash table). Other
+/// plan nodes can use that predicate by testing for membership of that set to filter rows
+/// early on in the plan tree (e.g. the scan that feeds the probe side of that join node
+/// could eliminate rows from consideration for join matching).
+///
+/// A RuntimeFilter computes its set-membership predicate as either a bloom filter or a
+/// min-max filter, depending on the type in its filter description.
class RuntimeFilter {
public:
+ /// Keeps a reference to 'filter' rather than a copy, so 'filter' must outlive this
+ /// object. 'filter_size' must be positive; the registration time is taken here.
RuntimeFilter(const TRuntimeFilterDesc& filter, int64_t filter_size)
- : bloom_filter_(NULL), filter_desc_(filter), arrival_time_(0L),
+ : bloom_filter_(nullptr), min_max_filter_(nullptr), filter_desc_(filter),
+ registration_time_(MonotonicMillis()), arrival_time_(0L),
filter_size_(filter_size) {
DCHECK_GT(filter_size_, 0);
- registration_time_ = MonotonicMillis();
}
- /// Returns true if SetBloomFilter() has been called.
- bool HasBloomFilter() const { return arrival_time_ != 0; }
+ /// Returns true if SetFilter() has been called.
+ bool HasFilter() const { return arrival_time_.Load() != 0; }
const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; }
int32_t id() const { return filter_desc().filter_id; }
int64_t filter_size() const { return filter_size_; }
+ /// Returns the column type of the filter's source expression, read from the first
+ /// node of 'filter_desc().src_expr' (assumes that expr has at least one node).
+ ColumnType type() const {
+ return ColumnType::FromThrift(filter_desc().src_expr.nodes[0].type);
+ }
+ /// True iff the descriptor's type marks this as a bloom filter / a min-max filter.
+ bool is_bloom_filter() const { return filter_desc().type == TRuntimeFilterType::BLOOM; }
+ bool is_min_max_filter() const {
+ return filter_desc().type == TRuntimeFilterType::MIN_MAX;
+ }
+
+ /// Returns the min-max filter, or NULL if none has been set via SetFilter().
+ MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
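- /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called
- /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
+ /// Sets the internal filter to 'bloom_filter' or 'min_max_filter', depending on the
+ /// filter's type (the other argument is ignored). Can only legally be called once per
+ /// filter. Does not acquire the memory associated with either filter.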
- inline void SetBloomFilter(BloomFilter* bloom_filter);
+ inline void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
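- /// Returns false iff 'bloom_filter_' has been set via SetBloomFilter() and hash[val] is
- /// not in that 'bloom_filter_'. Otherwise returns true. Is safe to call concurrently
+ /// Only valid for bloom filters. Returns false iff 'bloom_filter_' has been set via
+ /// SetFilter() and hash[val] is not in that 'bloom_filter_'. Otherwise returns true.
+ /// Is safe to call concurrently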
@@ -67,8 +79,8 @@ class RuntimeFilter {
/// Returns the amount of time waited since registration for the filter to
/// arrive. Returns 0 if filter has not yet arrived.
int32_t arrival_delay() const {
- if (arrival_time_ == 0L) return 0L;
- return arrival_time_ - registration_time_;
+ // Snapshot the atomic once; it only ever changes from 0 to the arrival time.
+ const int64_t arrived_at = arrival_time_.Load();
+ return arrived_at == 0L ? 0L : arrived_at - registration_time_;
}
/// Periodically (every 20ms) checks to see if the global filter has arrived. Waits for
@@ -88,21 +100,26 @@ class RuntimeFilter {
static const char* LLVM_CLASS_NAME;
private:
- /// Membership bloom_filter. May be NULL even after arrival_time_ is set. This is a
- /// compact way of representing a full Bloom filter that contains every element.
- BloomFilter* bloom_filter_;
+ /// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
+ /// it does not filter any rows, either because this is not a bloom filter (see
+ /// is_bloom_filter()), there was not enough memory, or the false positive rate was
+ /// determined to be too high.
+ AtomicPtr<BloomFilter> bloom_filter_;
+
+ /// Min-max filter. May be NULL even after arrival_time_ is set if this is not a min-max
+ /// filter (see is_min_max_filter()) or if SetFilter() was passed a NULL 'min_max_filter'.
+ AtomicPtr<MinMaxFilter> min_max_filter_;
/// Reference to the filter's thrift descriptor in the thrift Plan tree.
const TRuntimeFilterDesc& filter_desc_;
/// Time, in ms, that the filter was registered.
- int64_t registration_time_;
+ const int64_t registration_time_;
- /// Time, in ms, that the global fiter arrived. Set in SetBloomFilter().
- int64_t arrival_time_;
+ /// Time, in ms, that the global filter arrived. Set in SetFilter().
+ AtomicInt64 arrival_time_;
/// The size of the Bloom filter, in bytes.
- int64_t filter_size_;
+ const int64_t filter_size_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/runtime-filter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.inline.h b/be/src/runtime/runtime-filter.inline.h
index 128cafd..b2de81d 100644
--- a/be/src/runtime/runtime-filter.inline.h
+++ b/be/src/runtime/runtime-filter.inline.h
@@ -25,6 +25,7 @@
#include "runtime/raw-value.inline.h"
#include "util/bloom-filter.h"
+#include "util/min-max-filter.h"
#include "util/time.h"
namespace impala {
@@ -36,21 +37,35 @@ inline const RuntimeFilter* RuntimeFilterBank::GetRuntimeFilter(int32_t filter_i
return it->second;
}
-inline void RuntimeFilter::SetBloomFilter(BloomFilter* bloom_filter) {
- DCHECK(bloom_filter_ == NULL);
- // TODO: Barrier required here to ensure compiler does not both inline and re-order
- // this assignment. Not an issue for correctness (as assignment is atomic), but
- // potentially confusing.
- bloom_filter_ = bloom_filter;
- arrival_time_ = MonotonicMillis();
+inline void RuntimeFilter::SetFilter(
+ BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
+ DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+ // Only the argument matching this filter's type is stored; the other is ignored.
+ if (is_bloom_filter()) {
+ bloom_filter_.Store(bloom_filter);
+ } else {
+ DCHECK(is_min_max_filter());
+ min_max_filter_.Store(min_max_filter);
+ }
+ // Must come last: readers such as AlwaysTrue() check HasFilter() (i.e. arrival_time_)
+ // before loading the filter pointer. NOTE(review): relies on Store()/Load() giving
+ // release/acquire ordering - confirm in the AtomicPtr/AtomicInt64 wrappers.
+ arrival_time_.Store(MonotonicMillis());
}
-inline bool RuntimeFilter::AlwaysTrue() const {
- return HasBloomFilter() && bloom_filter_ == BloomFilter::ALWAYS_TRUE_FILTER;
+/// Returns true iff the filter has arrived and lets every row pass.
+inline bool RuntimeFilter::AlwaysTrue() const {
+ if (is_bloom_filter()) {
+ return HasFilter() && bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER;
+ } else {
+ DCHECK(is_min_max_filter());
+ // Check arrival before loading: SetFilter() stores the pointer before arrival_time_.
+ if (!HasFilter()) return false;
+ // The min-max filter may be NULL even after arrival (SetFilter() accepts a NULL
+ // 'min_max_filter', which represents a filter that allows all rows to pass), so
+ // treat NULL as always-true instead of dereferencing it.
+ MinMaxFilter* min_max_filter = min_max_filter_.Load();
+ return min_max_filter == nullptr || min_max_filter->AlwaysTrue();
+ }
}
inline bool RuntimeFilter::AlwaysFalse() const {
- return bloom_filter_ != BloomFilter::ALWAYS_TRUE_FILTER && bloom_filter_->AlwaysFalse();
+ // Each pointer is loaded once so the sentinel check and the call see the same value.
+ if (!is_bloom_filter()) {
+ DCHECK(is_min_max_filter());
+ MinMaxFilter* min_max = min_max_filter_.Load();
+ return min_max != nullptr && min_max->AlwaysFalse();
+ }
+ BloomFilter* bloom = bloom_filter_.Load();
+ return bloom != BloomFilter::ALWAYS_TRUE_FILTER && bloom->AlwaysFalse();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/runtime/timestamp-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 556225b..445189a 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -26,6 +26,7 @@
#include <gflags/gflags.h>
#include <string>
+#include "gen-cpp/Data_types.h"
#include "udf/udf.h"
#include "util/hash-util.h"
@@ -150,6 +151,20 @@ class TimestampValue {
*ptp = boost::posix_time::ptime(date_, time_);
}
+ /// Copies the raw in-memory bytes of this TimestampValue (Size() bytes) into
+ /// 'tvalue->timestamp_val' and marks that field as set.
+ void ToTColumnValue(TColumnValue* tvalue) const {
+ const char* raw = reinterpret_cast<const char*>(this);
+ tvalue->timestamp_val.assign(raw, Size());
+ tvalue->__isset.timestamp_val = true;
+ }
+
+ /// Returns a TimestampValue decoded from the raw bytes in 'tvalue.timestamp_val', as
+ /// written by ToTColumnValue(). If the field does not hold exactly Size() bytes (e.g.
+ /// it was never set), returns a default-constructed TimestampValue instead of reading
+ /// past the end of the buffer.
+ static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
+ TimestampValue value;
+ const std::string& bytes = tvalue.timestamp_val;
+ if (bytes.size() != static_cast<size_t>(Size())) return value;
+ memcpy(&value, bytes.data(), Size());
+ return value;
+ }
+
bool HasDate() const { return !date_.is_special(); }
bool HasTime() const { return !time_.is_special(); }
bool HasDateOrTime() const { return HasDate() || HasTime(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index d8a2a4a..5be8765 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -93,7 +93,7 @@ void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
DCHECK(params.__isset.filter_id);
DCHECK(params.__isset.query_id);
- DCHECK(params.__isset.bloom_filter);
+ DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
impala_server_->UpdateFilter(return_val, params);
}
@@ -103,8 +103,8 @@ void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
DCHECK(params.__isset.filter_id);
DCHECK(params.__isset.dst_query_id);
DCHECK(params.__isset.dst_fragment_idx);
- DCHECK(params.__isset.bloom_filter);
+ DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
QueryState::ScopedRef qs(params.dst_query_id);
if (qs.get() == nullptr) return;
- qs->PublishFilter(params.filter_id, params.dst_fragment_idx, params.bloom_filter);
+ qs->PublishFilter(params);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 4ff03d6..08002ed 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -55,6 +55,8 @@ add_library(Util
mem-info.cc
memory-metrics.cc
metrics.cc
+ min-max-filter.cc
+ min-max-filter-ir.cc
minidump.cc
network-util.cc
openssl-util.cc
@@ -120,6 +122,7 @@ ADD_BE_TEST(internal-queue-test)
ADD_BE_TEST(logging-support-test)
ADD_BE_TEST(lru-cache-test)
ADD_BE_TEST(metrics-test)
+ADD_BE_TEST(min-max-filter-test)
ADD_BE_TEST(openssl-util-test)
ADD_BE_TEST(parse-util-test)
#ADD_BE_TEST(perf-counters-test)