You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/04/12 23:18:40 UTC

[06/50] incubator-impala git commit: IMPALA-3141: Send dummy filters when filter production is disabled

IMPALA-3141: Send dummy filters when filter production is disabled

The PHJ may disable runtime filter production for one of several
reasons, including a predicted high false-positive rate. If the filters
are not produced, any scans waiting on them will wait for their entire
timeout before continuing.

This patch changes the filter logic to always send a filter, even if one
wasn't actually produced by the PHJ. To preserve correctness, that
filter must contain every element of the set. Such a filter is
represented by (BloomFilter*)NULL. This allows us to make no changes to
RuntimeFilter::Eval(), which already returns true if the member Bloom
filter is NULL.

In RPCs, a new field is added to TBloomFilter to identify filters that
are always true.

The HdfsParquetScanner checks to see if filters would always return true
for every element, and disables them if so.

There is some miscellaneous cleanup in this patch, particularly the
removal of unused members in BloomFilter.

This patch has been manually tested on queries that would otherwise take
a long time to time out. A unit test was added to ensure that queries do
not wait.

Change-Id: I04b3e6542651c1e7b77a9bab01d0e3d9506af42f
Reviewed-on: http://gerrit.cloudera.org:8080/2475
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0d1eab7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0d1eab7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0d1eab7a

Branch: refs/heads/master
Commit: 0d1eab7a9ebc7e4653f9043ef90672c6f3088c3b
Parents: c06912e
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Mar 4 16:08:58 2016 -0800
Committer: Henry Robinson <he...@cloudera.com>
Committed: Thu Mar 24 23:17:50 2016 +0000

----------------------------------------------------------------------
 be/src/benchmarks/bloom-filter-benchmark.cc     |  7 +-
 be/src/exec/blocking-join-node.cc               |  2 +-
 be/src/exec/blocking-join-node.h                |  8 +-
 be/src/exec/hash-join-node.cc                   |  4 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  7 +-
 be/src/exec/hdfs-scan-node.cc                   |  2 +-
 be/src/exec/partitioned-hash-join-node.cc       | 43 +++++-----
 be/src/exec/partitioned-hash-join-node.h        |  2 +-
 be/src/runtime/coordinator.cc                   | 39 +++++----
 be/src/runtime/runtime-filter.cc                | 86 +++++++++-----------
 be/src/runtime/runtime-filter.h                 | 22 +++--
 be/src/runtime/runtime-filter.inline.h          |  6 +-
 be/src/util/bloom-filter-test.cc                | 24 +++---
 be/src/util/bloom-filter.cc                     | 37 ++++-----
 be/src/util/bloom-filter.h                      | 30 +++----
 be/src/util/cpu-info.cc                         | 31 +++++--
 be/src/util/cpu-info.h                          | 15 +++-
 common/thrift/ImpalaInternalService.thrift      |  4 +
 common/thrift/PlanNodes.thrift                  |  4 -
 .../cloudera/impala/planner/HashJoinNode.java   |  7 --
 .../cloudera/impala/planner/PlanFragment.java   | 70 +---------------
 .../queries/QueryTest/runtime_filters_wait.test | 22 +++++
 22 files changed, 226 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/benchmarks/bloom-filter-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index e574dbe..7374ed4 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -99,7 +99,7 @@ namespace initialize {
 void Benchmark(int batch_size, void* data) {
   int * d = reinterpret_cast<int*>(data);
   for (int i = 0; i < batch_size; ++i) {
-    BloomFilter bf(*d, nullptr, nullptr);
+    BloomFilter bf(*d);
   }
 }
 
@@ -109,8 +109,7 @@ void Benchmark(int batch_size, void* data) {
 namespace insert {
 
 struct TestData {
-  explicit TestData(int log_heap_size)
-      : bf(log_heap_size, nullptr, nullptr), data(1ull << 20) {
+  explicit TestData(int log_heap_size) : bf(log_heap_size), data(1ull << 20) {
     for (size_t i = 0; i < data.size(); ++i) {
       data[i] = MakeRand();
     }
@@ -137,7 +136,7 @@ namespace find {
 
 struct TestData {
   TestData(int log_heap_size, size_t size)
-      : bf(log_heap_size, nullptr, nullptr),
+      : bf(log_heap_size),
         vec_mask((1ull << static_cast<int>(floor(log2(size))))-1),
         present(size),
         absent(size),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index c07a856..952c608 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -42,7 +42,7 @@ BlockingJoinNode::BlockingJoinNode(const string& node_name, const TJoinOp::type
     probe_batch_pos_(-1),
     current_probe_row_(NULL),
     semi_join_staging_row_(NULL),
-    can_add_runtime_filters_(false) {
+    runtime_filters_enabled_(false) {
 }
 
 Status BlockingJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 6f18d2b..cbd3b1e 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -93,8 +93,12 @@ class BlockingJoinNode : public ExecNode {
 
   /// If true, this node can build filters from the build side that can be used elsewhere
   /// in the plan to eliminate rows early.
-  /// Note that we disable probe filters if we are inside a subplan.
-  bool can_add_runtime_filters_;
+  /// Filters might be disabled during execution in several cases, including if we are
+  /// inside a subplan or the false-positive rate would be too high.
+  /// TODO: Consider moving into FilterContext, which will allow us to track enabled state
+  /// per-filter, and also alows us to move this state into only HJ nodes that support
+  /// filter production.
+  bool runtime_filters_enabled_;
 
   RuntimeProfile::Counter* build_timer_;   // time to prepare build side
   RuntimeProfile::Counter* probe_timer_;   // time to process the probe (left child) batch

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 2193c9e..7f8f410 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -58,7 +58,7 @@ HashJoinNode::HashJoinNode(
   match_one_build_ = (join_op_ == TJoinOp::LEFT_SEMI_JOIN);
   match_all_build_ =
     (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN);
-  can_add_runtime_filters_ = FLAGS_enable_probe_side_filtering;
+  runtime_filters_enabled_ = FLAGS_enable_probe_side_filtering;
 }
 
 Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
@@ -233,7 +233,7 @@ Status HashJoinNode::ConstructBuildSide(RuntimeState* state) {
   // We only do this if the build side is sufficiently small.
   // TODO: Better heuristic? Currently we simply compare the size of the HT with a
   // constant value.
-  if (can_add_runtime_filters_) {
+  if (runtime_filters_enabled_) {
     if (!state->filter_bank()->ShouldDisableFilter(hash_tbl_->size())) {
       AddRuntimeExecOption("Build-Side Filter Built");
       hash_tbl_->AddBloomFilters();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 5aa67fa..5adc97f 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -877,7 +877,7 @@ Status HdfsParquetScanner::Prepare(ScannerContext* context) {
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
     DCHECK(ctx->filter != NULL);
-    filter_ctxs_.push_back(ctx);
+    if (!ctx->filter->AlwaysTrue()) filter_ctxs_.push_back(ctx);
   }
   filter_stats_.resize(filter_ctxs_.size());
   return Status::OK();
@@ -1748,18 +1748,19 @@ inline bool HdfsParquetScanner::ReadRow(const vector<ColumnReader*>& column_read
     for (int i = 0; i < num_filters; ++i) {
       LocalFilterStats* stats = &filter_stats_[i];
       if (!stats->enabled) continue;
+      const RuntimeFilter* filter = filter_ctxs_[i]->filter;
       ++stats->total_possible;
       // Check filter effectiveness every ROWS_PER_FILTER_SELECTIVITY_CHECK rows.
       if (UNLIKELY(
           !(stats->total_possible & (ROWS_PER_FILTER_SELECTIVITY_CHECK - 1)))) {
         double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
-        if (reject_ratio < FLAGS_parquet_min_filter_reject_ratio) {
+        if (filter->AlwaysTrue() ||
+            reject_ratio < FLAGS_parquet_min_filter_reject_ratio) {
           stats->enabled = 0;
           continue;
         }
       }
       ++stats->considered;
-      const RuntimeFilter* filter = filter_ctxs_[i]->filter;
       void* e = filter_ctxs_[i]->expr->GetValue(tuple_row_mem);
       if (!filter->Eval<void>(e, filter_ctxs_[i]->expr->root()->type())) {
         ++stats->rejected;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index c43bdcb..5947ac1 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -1136,7 +1136,7 @@ bool HdfsScanNode::PartitionPassesFilterPredicates(int32_t partition_id,
 
     // Not quite right because bitmap could arrive after Eval(), but we're ok with
     // off-by-one errors.
-    bool processed = ctx.filter->GetBloomFilter();
+    bool processed = ctx.filter->HasBloomFilter();
     bool passed_filter = ctx.filter->Eval<void>(e, ctx.expr->root()->type());
     ctx.stats->IncrCounters(stats_name, 1, processed, !passed_filter);
     if (!passed_filter)  return false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index b796acf..151b0c6 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -66,7 +66,7 @@ PartitionedHashJoinNode::PartitionedHashJoinNode(
     process_probe_batch_fn_level0_(NULL) {
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
 
-  can_add_runtime_filters_ =
+  runtime_filters_enabled_ =
       FLAGS_enable_phj_probe_side_filtering && tnode.runtime_filters.size() > 0;
 }
 
@@ -128,7 +128,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   // Disable probe-side filters if we are inside a subplan because no node
   // inside the subplan can use them.
-  if (IsInSubplan()) can_add_runtime_filters_ = false;
+  if (IsInSubplan()) runtime_filters_enabled_ = false;
 
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
@@ -400,10 +400,10 @@ Status PartitionedHashJoinNode::Partition::Spill(bool unpin_all_build) {
     }
   }
 
-  if (parent_->can_add_runtime_filters_) {
+  if (parent_->runtime_filters_enabled_) {
     // Disabling runtime filter push down because not all rows will be included in the
     // filter due to a spilled partition.
-    parent_->can_add_runtime_filters_ = false;
+    parent_->runtime_filters_enabled_ = false;
     parent_->AddRuntimeExecOption("Build-Side Runtime-Filter Disabled (Spilling)");
     VLOG(2) << "Disabling runtime filter construction because a partition will spill.";
   }
@@ -505,7 +505,7 @@ not_built:
 }
 
 bool PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
-  if (!can_add_runtime_filters_) return false;
+  if (!runtime_filters_enabled_) return false;
   DCHECK(ht_ctx_.get() != NULL);
   for (int i = 0; i < filters_.size(); ++i) {
     filters_[i].local_bloom_filter = state->filter_bank()->AllocateScratchBloomFilter();
@@ -513,19 +513,20 @@ bool PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
   return true;
 }
 
-bool PartitionedHashJoinNode::PublishRuntimeFilters(RuntimeState* state) {
-  if (can_add_runtime_filters_) {
-    // Add all the bloom filters to the runtime state.
-    // TODO: DCHECK(!is_in_subplan());
+void PartitionedHashJoinNode::PublishRuntimeFilters(RuntimeState* state) {
+  if (runtime_filters_enabled_) {
     AddRuntimeExecOption("Build-Side Runtime-Filter Produced");
-    BOOST_FOREACH(const FilterContext& ctx, filters_) {
-      if (ctx.local_bloom_filter == NULL) continue;
-      state->filter_bank()->UpdateFilterFromLocal(ctx.filter->filter_desc().filter_id,
-          ctx.local_bloom_filter);
-    }
-    return true;
   }
-  return false;
+
+  // Add all the bloom filters to the runtime state. If runtime filters are disabled,
+  // publish a complete Bloom filter (which rejects no values) to allow plan nodes that
+  // are waiting for these filters to make progress.
+  BOOST_FOREACH(const FilterContext& ctx, filters_) {
+    BloomFilter* filter = runtime_filters_enabled_ ?
+        ctx.local_bloom_filter : BloomFilter::ALWAYS_TRUE_FILTER;
+    state->filter_bank()->UpdateFilterFromLocal(
+        ctx.filter->filter_desc().filter_id, filter);
+  }
 }
 
 bool PartitionedHashJoinNode::AppendRowStreamFull(BufferedTupleStream* stream,
@@ -1195,7 +1196,7 @@ Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
   DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
 
   // Decide whether probe filters will be built.
-  if (input_partition_ == NULL && can_add_runtime_filters_) {
+  if (input_partition_ == NULL && runtime_filters_enabled_) {
     uint64_t num_build_rows = 0;
     BOOST_FOREACH(Partition* partition, hash_partitions_) {
       DCHECK(!partition->is_spilled()) << "Runtime filters enabled despite spilling";
@@ -1209,10 +1210,10 @@ Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
     // TODO: Better heuristic.
     if (state->filter_bank()->ShouldDisableFilter(num_build_rows)) {
       AddRuntimeExecOption("Build-Side Runtime-Filter Disabled (FP Rate Too High)");
-      can_add_runtime_filters_ = false;
+      runtime_filters_enabled_ = false;
     }
   } else {
-    can_add_runtime_filters_ = false;
+    runtime_filters_enabled_ = false;
   }
 
   // First loop over the partitions and build hash tables for the partitions that did
@@ -1227,13 +1228,13 @@ Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
     if (!partition->is_spilled()) {
       bool built = false;
       DCHECK(partition->build_rows()->is_pinned());
-      RETURN_IF_ERROR(partition->BuildHashTable(state, &built, can_add_runtime_filters_));
+      RETURN_IF_ERROR(partition->BuildHashTable(state, &built, runtime_filters_enabled_));
       // If we did not have enough memory to build this hash table, we need to spill this
       // partition (clean up the hash table, unpin build).
       if (!built) RETURN_IF_ERROR(partition->Spill(true));
     }
 
-    DCHECK(!can_add_runtime_filters_ || !partition->is_spilled())
+    DCHECK(!runtime_filters_enabled_ || !partition->is_spilled())
         << "Runtime filters enabled despite spilling";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index e03f05d..119dd61 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -257,7 +257,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   bool AllocateRuntimeFilters(RuntimeState* state);
 
   /// Publish the runtime filters to the fragment-local RuntimeFilterBank.
-  bool PublishRuntimeFilters(RuntimeState* state);
+  void PublishRuntimeFilters(RuntimeState* state);
 
   /// Codegen function to create output row. Assumes that the probe row is non-NULL.
   Status CodegenCreateOutputRow(LlvmCodeGen* codegen, llvm::Function** fn);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index b970333..bb49166 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -638,8 +638,9 @@ string Coordinator::FilterDebugString() {
     row.push_back(state.desc.is_bound_by_partition_columns ? "true" : "false");
 
     if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
-      row.push_back(Substitute("$0 ($1)", state.pending_count,
-              state.src_fragment_instance_idxs.size()));
+      int pending_count = state.completion_time != 0L ? 0 : state.pending_count;
+      row.push_back(Substitute("$0 ($1)", pending_count,
+          state.src_fragment_instance_idxs.size()));
       if (state.first_arrival_time == 0L) {
         row.push_back("N/A");
       } else {
@@ -1966,7 +1967,6 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
   shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
   unordered_set<int32_t> target_fragment_instance_idxs;
-  unique_ptr<BloomFilter> bloom_filter(new BloomFilter(params.bloom_filter, NULL, NULL));
   {
     lock_guard<SpinLock> l(filter_lock_);
     FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
@@ -1978,36 +1978,39 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     DCHECK(!state->desc.has_local_target)
         << "Coordinator received filter that has local target";
 
-    // Receiving unnecessary updates for a broadcast.
-    if (state->pending_count == 0) {
-      DCHECK(state->desc.is_broadcast_join)
-          << "Received more updates than expected for partition filter: "
-          << params.filter_id;
-      return;
-    }
+    // Check if the filter has already been sent, which could happen in two cases: if one
+    // local filter had always_true set - no point waiting for other local filters that
+    // can't affect the aggregated global filter, or if this is a broadcast join, and
+    // another local filter was already received.
+    if (state->pending_count == 0) return;
+    DCHECK_EQ(state->completion_time, 0L);
     if (state->first_arrival_time == 0L) {
       state->first_arrival_time = query_events_->ElapsedTime();
     }
-    --state->pending_count;
 
     if (filter_updates_received_->value() == 0) {
       query_events_->MarkEvent("First dynamic filter received");
     }
     filter_updates_received_->Add(1);
-    if (state->bloom_filter == NULL) {
-      state->bloom_filter =
-          obj_pool()->Add(bloom_filter.release());
+    if (params.bloom_filter.always_true) {
+      state->bloom_filter = NULL;
+      state->pending_count = 0;
     } else {
-      // TODO: Implement BloomFilter::Or(const ThriftBloomFilter&)
-      state->bloom_filter->Or(*bloom_filter);
+      if (state->bloom_filter == NULL) {
+        state->bloom_filter = obj_pool()->Add(new BloomFilter(params.bloom_filter));
+      } else {
+        // TODO: Implement BloomFilter::Or(const ThriftBloomFilter&)
+        state->bloom_filter->Or(BloomFilter(params.bloom_filter));
+      }
+      if (--state->pending_count > 0) return;
     }
 
-    if (state->pending_count > 0) return;
     // No more filters are pending on this filter ID. Create a distribution payload and
     // offer it to the queue.
+    DCHECK_EQ(state->pending_count, 0);
     state->completion_time = query_events_->ElapsedTime();
     target_fragment_instance_idxs = state->target_fragment_instance_idxs;
-    state->bloom_filter->ToThrift(&rpc_params->bloom_filter);
+    BloomFilter::ToThrift(state->bloom_filter, &rpc_params->bloom_filter);
   }
 
   rpc_params->filter_id = params.filter_id;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index fee8566..675e0f3 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -49,7 +49,7 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
     bool is_producer) {
   RuntimeFilter* ret = obj_pool_.Add(new RuntimeFilter(filter_desc));
-  lock_guard<SpinLock> l(runtime_filter_lock_);
+  lock_guard<mutex> l(runtime_filter_lock_);
   if (is_producer) {
     DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
     produced_filters_[filter_desc.filter_id] = ret;
@@ -88,7 +88,7 @@ void RuntimeFilterBank::UpdateFilterFromLocal(uint32_t filter_id,
   TUpdateFilterParams params;
   bool has_local_target = false;
   {
-    lock_guard<SpinLock> l(runtime_filter_lock_);
+    lock_guard<mutex> l(runtime_filter_lock_);
     RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
     DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
                                           << filter_id;
@@ -101,72 +101,65 @@ void RuntimeFilterBank::UpdateFilterFromLocal(uint32_t filter_id,
     // side.
     RuntimeFilter* filter;
     {
-      lock_guard<SpinLock> l(runtime_filter_lock_);
+      lock_guard<mutex> l(runtime_filter_lock_);
       RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
       if (it == consumed_filters_.end()) return;
       filter = it->second;
       // Check if the filter already showed up.
-      if (filter->GetBloomFilter() != NULL) return;
-    }
-    // TODO: Avoid need for this copy.
-    BloomFilter* copy = AllocateScratchBloomFilter();
-    if (copy == NULL) return;
-    copy->Or(*bloom_filter);
-    {
-      // Take lock only to ensure no race with PublishGlobalFilter() - there's no need for
-      // coordination with readers of the filter.
-      lock_guard<SpinLock> l(runtime_filter_lock_);
-      if (filter->GetBloomFilter() == NULL) {
-        filter->SetBloomFilter(copy);
-        state_->runtime_profile()->AddInfoString(
-            Substitute("Filter $0 arrival", filter_id),
-            PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
-      }
+      DCHECK(!filter->HasBloomFilter());
     }
+    filter->SetBloomFilter(bloom_filter);
+    state_->runtime_profile()->AddInfoString(
+        Substitute("Filter $0 arrival", filter_id),
+        PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
   } else if (state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
-      bloom_filter->ToThrift(&params.bloom_filter);
-      params.filter_id = filter_id;
-      params.query_id = query_ctx_.query_id;
+    BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
+    params.filter_id = filter_id;
+    params.query_id = query_ctx_.query_id;
 
-      ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
-          SendFilterToCoordinator, query_ctx_.coord_address, params,
-          ExecEnv::GetInstance()->impalad_client_cache()));
+    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
+        SendFilterToCoordinator, query_ctx_.coord_address, params,
+        ExecEnv::GetInstance()->impalad_client_cache()));
   }
 }
 
 void RuntimeFilterBank::PublishGlobalFilter(uint32_t filter_id,
     const TBloomFilter& thrift_filter) {
-  lock_guard<SpinLock> l(runtime_filter_lock_);
+  lock_guard<mutex> l(runtime_filter_lock_);
   if (closed_) return;
   RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
   DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
                                         << filter_id;
-  if (it->second->filter_desc().is_broadcast_join &&
-      it->second->GetBloomFilter() != NULL) {
-    // Already showed up from local filter.
-    return;
+  if (thrift_filter.always_true) {
+    it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+  } else {
+    uint32_t required_space =
+        BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
+    // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
+    // there's not enough memory for it.
+    if (!state_->query_mem_tracker()->TryConsume(required_space)) {
+      VLOG_QUERY << "No memory for global filter: " << filter_id
+                 << " (fragment instance: " << state_->fragment_instance_id() << ")";
+      it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+    } else {
+      BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
+      DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+      memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+      it->second->SetBloomFilter(bloom_filter);
+    }
   }
-  uint32_t required_space =
-      BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
-  // Silently fail to publish the filter if there's not enough memory for it.
-  if (!state_->query_mem_tracker()->TryConsume(required_space)) return;
-  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter, NULL, NULL));
-  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-  memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
-  it->second->SetBloomFilter(bloom_filter);
   state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
       PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
 }
 
 BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter() {
-  lock_guard<SpinLock> l(runtime_filter_lock_);
+  lock_guard<mutex> l(runtime_filter_lock_);
   if (closed_) return NULL;
 
   // Track required space
   uint32_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size_);
   if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
-  BloomFilter* bloom_filter =
-      obj_pool_.Add(new BloomFilter(log_filter_size_, NULL, NULL));
+  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size_));
   DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
   memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
   return bloom_filter;
@@ -178,18 +171,17 @@ bool RuntimeFilterBank::ShouldDisableFilter(uint64_t max_ndv) {
 }
 
 void RuntimeFilterBank::Close() {
-  lock_guard<SpinLock> l(runtime_filter_lock_);
+  lock_guard<mutex> l(runtime_filter_lock_);
   closed_ = true;
   obj_pool_.Clear();
   state_->query_mem_tracker()->Release(memory_allocated_->value());
 }
 
 bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
-  if (GetBloomFilter() != NULL) return true;
-  while ((MonotonicMillis() - registration_time_) < timeout_ms) {
+  do {
+    if (HasBloomFilter()) return true;
     SleepForMs(SLEEP_PERIOD_MS);
-    if (GetBloomFilter() != NULL) return true;
-  }
+  } while ((MonotonicMillis() - registration_time_) < timeout_ms);
 
-  return false;
+  return HasBloomFilter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 34aa476..e41c97a 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -44,8 +44,8 @@ class RuntimeState;
 /// fragments update the bloom filters by calling UpdateFilterFromLocal()
 /// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
 /// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
-/// AllocateScratchBloomFilter(); this allows RuntimeFilterBank to manage all memory
-/// associated with filters.
+/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all
+/// memory associated with filters.
 ///
 /// Filters are aggregated at the coordinator, and then made available to consumers after
 /// PublishGlobalFilter() has been called.
@@ -65,7 +65,8 @@ class RuntimeFilterBank {
   RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
 
   /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
-  /// operator in the local fragment instance.
+  /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
+  /// full filter that contains all elements.
   void UpdateFilterFromLocal(uint32_t filter_id, BloomFilter* bloom_filter);
 
   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
@@ -104,7 +105,7 @@ class RuntimeFilterBank {
   const TQueryCtx query_ctx_;
 
   /// Lock protecting produced_filters_ and consumed_filters_.
-  SpinLock runtime_filter_lock_;
+  boost::mutex runtime_filter_lock_;
 
   /// Map from filter id to a RuntimeFilter.
   typedef boost::unordered_map<uint32_t, RuntimeFilter*> RuntimeFilterMap;
@@ -147,8 +148,8 @@ class RuntimeFilter {
     registration_time_ = MonotonicMillis();
   }
 
-  /// Returns NULL if no calls to SetBloomFilter() have been made yet.
-  const BloomFilter* GetBloomFilter() const { return bloom_filter_; }
+  /// Returns true if SetBloomFilter() has been called.
+  bool HasBloomFilter() const { return arrival_time_ != 0; }
 
   const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; }
 
@@ -176,11 +177,16 @@ class RuntimeFilter {
   /// false otherwise.
   bool WaitForArrival(int32_t timeout_ms) const;
 
+  /// Returns true if the filter returns true for all elements, i.e. Eval(v) returns true
+  /// for all v.
+  inline bool AlwaysTrue() const;
+
   /// Frequency with which to check for filter arrival in WaitForArrival()
   static const int SLEEP_PERIOD_MS;
 
  private:
-  /// Membership bloom_filter.
+  /// Membership bloom_filter. May be NULL even after arrival_time_ is set. This is a
+  /// compact way of representing a full Bloom filter that contains every element.
   BloomFilter* bloom_filter_;
 
   /// Descriptor of the filter.
@@ -189,7 +195,7 @@ class RuntimeFilter {
   /// Time, in ms, that the filter was registered.
   int64_t registration_time_;
 
-  /// Time, in ms, that the global fiter arrived.
+  /// Time, in ms, that the global fiter arrived. Set in SetBloomFilter().
   int64_t arrival_time_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/runtime/runtime-filter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.inline.h b/be/src/runtime/runtime-filter.inline.h
index 6cae673..f7141a8 100644
--- a/be/src/runtime/runtime-filter.inline.h
+++ b/be/src/runtime/runtime-filter.inline.h
@@ -27,7 +27,7 @@
 namespace impala {
 
 inline const RuntimeFilter* RuntimeFilterBank::GetRuntimeFilter(uint32_t filter_id) {
-  boost::lock_guard<SpinLock> l(runtime_filter_lock_);
+  boost::lock_guard<boost::mutex> l(runtime_filter_lock_);
   RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
   if (it == consumed_filters_.end()) return NULL;
   return it->second;
@@ -53,6 +53,10 @@ inline bool RuntimeFilter::Eval(T* val, const ColumnType& col_type) const {
   return bloom_filter_->Find(h);
 }
 
+inline bool RuntimeFilter::AlwaysTrue() const  {
+  return HasBloomFilter() && bloom_filter_ == BloomFilter::ALWAYS_TRUE_FILTER;
+}
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/util/bloom-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index e9dcca8..d8b5818 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -38,7 +38,7 @@ namespace impala {
 // We can construct (and destruct) Bloom filters with different spaces.
 TEST(BloomFilter, Constructor) {
   for (int i = 0; i < 30; ++i) {
-    BloomFilter bf(i, nullptr, nullptr);
+    BloomFilter bf(i);
   }
 }
 
@@ -46,7 +46,7 @@ TEST(BloomFilter, Constructor) {
 TEST(BloomFilter, Insert) {
   srand(0);
   for (int i = 13; i < 17; ++i) {
-    BloomFilter bf(i, nullptr, nullptr);
+    BloomFilter bf(i);
     for (int k = 0; k < (1 << 15); ++k) {
       bf.Insert(MakeRand());
     }
@@ -57,7 +57,7 @@ TEST(BloomFilter, Insert) {
 TEST(BloomFilter, Find) {
   srand(0);
   for (int i = 13; i < 17; ++i) {
-    BloomFilter bf(i, nullptr, nullptr);
+    BloomFilter bf(i);
     for (int k = 0; k < (1 << 15); ++k) {
       const uint64_t to_insert = MakeRand();
       bf.Insert(to_insert);
@@ -71,7 +71,7 @@ TEST(BloomFilter, CumulativeFind) {
   srand(0);
   for (int i = 5; i < 11; ++i) {
     std::vector<uint32_t> inserted;
-    BloomFilter bf(i, nullptr, nullptr);
+    BloomFilter bf(i);
     for (int k = 0; k < (1 << 10); ++k) {
       const uint32_t to_insert = MakeRand();
       inserted.push_back(to_insert);
@@ -104,7 +104,7 @@ TEST(BloomFilter, FindInvalid) {
       double fpp = 1.0 / (1 << log_fpp);
       const size_t ndv = 1 << log_ndv;
       const int log_heap_space = BloomFilter::MinLogSpace(ndv, fpp);
-      BloomFilter bf(log_heap_space, nullptr, nullptr);
+      BloomFilter bf(log_heap_space);
       // Fill up a BF with exactly as much ndv as we planned for it:
       for (size_t i = 0; i < ndv; ++i) {
         bf.Insert(shuffled_insert[i]);
@@ -187,7 +187,7 @@ TEST(BloomFilter, MinSpaceForFpp) {
 }
 
 TEST(BloomFilter, Thrift) {
-  BloomFilter bf(BloomFilter::MinLogSpace(100, 0.01), NULL, NULL);
+  BloomFilter bf(BloomFilter::MinLogSpace(100, 0.01));
   for (int i = 0; i < 10; ++i) bf.Insert(i);
   // Check no unexpected new false positives.
   set<int> missing_ints;
@@ -196,16 +196,20 @@ TEST(BloomFilter, Thrift) {
   }
 
   TBloomFilter to_thrift;
-  bf.ToThrift(&to_thrift);
+  BloomFilter::ToThrift(&bf, &to_thrift);
+  EXPECT_EQ(to_thrift.always_true, false);
 
-  BloomFilter from_thrift(to_thrift, NULL, NULL);
+  BloomFilter from_thrift(to_thrift);
   for (int i = 0; i < 10; ++i) ASSERT_TRUE(from_thrift.Find(i));
   for (int missing: missing_ints) ASSERT_FALSE(from_thrift.Find(missing));
+
+  BloomFilter::ToThrift(NULL, &to_thrift);
+  EXPECT_EQ(to_thrift.always_true, true);
 }
 
 TEST(BloomFilter, Or) {
-  BloomFilter bf1(BloomFilter::MinLogSpace(100, 0.01), NULL, NULL);
-  BloomFilter bf2(BloomFilter::MinLogSpace(100, 0.01), NULL, NULL);
+  BloomFilter bf1(BloomFilter::MinLogSpace(100, 0.01));
+  BloomFilter bf2(BloomFilter::MinLogSpace(100, 0.01));
   for (int i = 60; i < 80; ++i) bf2.Insert(i);
 
   for (int i = 0; i < 10; ++i) bf1.Insert(i);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index a58e0fa..8a37a76 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -26,31 +26,22 @@ using namespace std;
 
 namespace impala {
 
-BloomFilter::BloomFilter(const int log_heap_space, RuntimeState* state,
-    BufferedBlockMgr::Client* client)
-    :  // Since log_heap_space is in bytes, we need to convert it to cache lines. There
-       // are 64 = 2^6 bytes in a cache line.
+BloomFilter* BloomFilter::ALWAYS_TRUE_FILTER = NULL;
+
+BloomFilter::BloomFilter(const int log_heap_space)
+    : // Since log_heap_space is in bytes, we need to convert it to cache lines. There
+      // are 64 = 2^6 bytes in a cache line.
       log_num_buckets_(std::max(1, log_heap_space - LOG_BUCKET_WORD_BITS)),
       // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
       // that is too large.
       directory_mask_((1ull << std::min(63, log_num_buckets_)) - 1),
-      directory_(NULL),
-      state_(state),
-      client_(client) {
+      directory_(NULL) {
   // Since we use 32 bits in the arguments of Insert() and Find(), log_num_buckets_
   // must be limited.
   DCHECK(log_num_buckets_ <= 32)
       << "Bloom filter too large. log_heap_space: " << log_heap_space;
-  DCHECK_EQ(client_ == NULL, state_ == NULL);
   // Each bucket has 64 = 2^6 bytes:
   const size_t alloc_size = directory_size();
-  if (state_) {
-    const bool consume_success = state_->block_mgr()->ConsumeMemory(client_, alloc_size);
-    DCHECK(consume_success) << "ConsumeMemory failed. log_heap_space: "
-                            << log_heap_space
-                            << " log_num_buckets_: " << log_num_buckets_
-                            << " alloc_size: " << alloc_size;
-  }
   const int malloc_failed =
       posix_memalign(reinterpret_cast<void**>(&directory_), 64, alloc_size);
   DCHECK_EQ(malloc_failed, 0) << "Malloc failed. log_heap_space: " << log_heap_space
@@ -59,16 +50,14 @@ BloomFilter::BloomFilter(const int log_heap_space, RuntimeState* state,
   memset(directory_, 0, alloc_size);
 }
 
-BloomFilter::BloomFilter(const TBloomFilter& thrift, RuntimeState* state,
-    BufferedBlockMgr::Client* client)
-    : BloomFilter(thrift.log_heap_space, state, client) {
+BloomFilter::BloomFilter(const TBloomFilter& thrift)
+    : BloomFilter(thrift.log_heap_space) {
   DCHECK_EQ(thrift.directory.size(), directory_size());
   memcpy(directory_, &thrift.directory[0], thrift.directory.size());
 }
 
 BloomFilter::~BloomFilter() {
   if (directory_) {
-    if (state_) state_->block_mgr()->ReleaseMemory(client_, directory_size());
     free(directory_);
     directory_ = NULL;
   }
@@ -78,6 +67,16 @@ void BloomFilter::ToThrift(TBloomFilter* thrift) const {
   thrift->log_heap_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
   string tmp(reinterpret_cast<const char*>(directory_), directory_size());
   thrift->directory.swap(tmp);
+  thrift->always_true = false;
+}
+
+void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
+  DCHECK(thrift != NULL);
+  if (filter == NULL) {
+    thrift->always_true = true;
+    return;
+  }
+  filter->ToThrift(thrift);
 }
 
 void BloomFilter::Or(const BloomFilter& other) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index edb9083..32f6ee6 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -27,8 +27,6 @@
 
 namespace impala {
 
-class RuntimeState;
-
 /// A BloomFilter stores sets of items and offers a query operation indicating whether or
 /// not that item is in the set.  BloomFilters use much less space than other compact data
 /// structures, but they are less accurate: for a small percentage of elements, the query
@@ -50,17 +48,17 @@ class RuntimeState;
 /// bits).
 class BloomFilter {
  public:
-  /// Consumes at most (1 << log_heap_space) bytes on the heap. If state is non-NULL,
-  /// client is also non-NULL and the constructor and destructor call
-  /// BufferedBlockMgr::ConsumeMemory() and ReleaseMemory().
-  BloomFilter(
-      const int log_heap_space, RuntimeState* state, BufferedBlockMgr::Client* client);
-  BloomFilter(const TBloomFilter& thrift, RuntimeState* state,
-      BufferedBlockMgr::Client* client);
+  /// Consumes at most (1 << log_heap_space) bytes on the heap.
+  BloomFilter(const int log_heap_space);
+  BloomFilter(const TBloomFilter& thrift);
   ~BloomFilter();
 
-  /// Serializes this filter as Thrift.
-  void ToThrift(TBloomFilter* thrift) const;
+  /// Representation of a filter which allows all elements to pass.
+  static BloomFilter* ALWAYS_TRUE_FILTER;
+
+  /// Converts 'filter' to its corresponding Thrift representation. If the first argument
+  /// is NULL, it is interpreted as a complete filter which contains all elements.
+  static void ToThrift(const BloomFilter* filter, TBloomFilter* thrift);
 
   /// Adds an element to the BloomFilter. The function used to generate 'hash' need not
   /// have good uniformity, but it should have low collision probability. For instance, if
@@ -126,17 +124,13 @@ class BloomFilter {
   typedef BucketWord Bucket[BUCKET_WORDS];
   Bucket* directory_;
 
-  /// Used only for tracking memory. If both are non-NULL,
-  /// BufferedBlockMgr::{Acquire,Release}Memory() are called when this object allocates
-  /// and frees heap memory. These objects pointed to by state_ and client_ are not owned
-  /// by this BloomFilter.
-  RuntimeState* const state_;
-  BufferedBlockMgr::Client* const client_;
-
   int64_t directory_size() const {
     return 1uLL << (log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
   }
 
+  /// Serializes this filter as Thrift.
+  void ToThrift(TBloomFilter* thrift) const;
+
   DISALLOW_COPY_AND_ASSIGN(BloomFilter);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index 2e1af59..5a7d2bc 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -21,6 +21,7 @@
 #include <boost/algorithm/string.hpp>
 #include <iostream>
 #include <fstream>
+#include <gutil/strings/substitute.h>
 #include <mmintrin.h>
 #include <sstream>
 #include <stdlib.h>
@@ -34,6 +35,7 @@
 using boost::algorithm::contains;
 using boost::algorithm::trim;
 using std::max;
+using strings::Substitute;
 
 DECLARE_bool(abort_on_config_error);
 DEFINE_int32(num_cores, 0, "(Advanced) If > 0, it sets the number of cores available to"
@@ -46,6 +48,7 @@ bool CpuInfo::initialized_ = false;
 int64_t CpuInfo::hardware_flags_ = 0;
 int64_t CpuInfo::original_hardware_flags_;
 long CpuInfo::cache_sizes_[L3_CACHE + 1];
+long CpuInfo::cache_line_sizes_[L3_CACHE + 1];
 int64_t CpuInfo::cycles_per_ms_;
 int CpuInfo::num_cores_ = 1;
 string CpuInfo::model_name_ = "unknown";
@@ -124,11 +127,21 @@ void CpuInfo::Init() {
   for (size_t i = 0; i < 3; ++i) {
     cache_sizes_[i] = data[i];
   }
+  size_t linesize;
+  size_t sizeof_linesize = sizeof(linesize);
+  sysctlbyname("hw.cachelinesize", &linesize, &sizeof_linesize, NULL, 0);
+  for (size_t i = 0; i < 3; ++i) cache_line_sizes_[i] = linesize;
 #else
   // Call sysconf to query for the cache sizes
   cache_sizes_[0] = sysconf(_SC_LEVEL1_DCACHE_SIZE);
   cache_sizes_[1] = sysconf(_SC_LEVEL2_CACHE_SIZE);
   cache_sizes_[2] = sysconf(_SC_LEVEL3_CACHE_SIZE);
+
+  cache_line_sizes_[0] = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
+  // See bloom-filter.cc for one dependency.
+  DCHECK_EQ(cache_line_sizes_[0], 64) << "Impala expects 64-byte L1 cache lines";
+  cache_line_sizes_[1] = sysconf(_SC_LEVEL2_CACHE_LINESIZE);
+  cache_line_sizes_[2] = sysconf(_SC_LEVEL3_CACHE_LINESIZE);
 #endif
 
   if (max_mhz != 0) {
@@ -171,15 +184,21 @@ void CpuInfo::EnableFeature(long flag, bool enable) {
 string CpuInfo::DebugString() {
   DCHECK(initialized_);
   stringstream stream;
-  int64_t L1 = CacheSize(L1_CACHE);
-  int64_t L2 = CacheSize(L2_CACHE);
-  int64_t L3 = CacheSize(L3_CACHE);
+  string L1 = Substitute("L1 Cache: $0 (Line: $1)",
+      PrettyPrinter::Print(CacheSize(L1_CACHE), TUnit::BYTES),
+      PrettyPrinter::Print(CacheLineSize(L1_CACHE), TUnit::BYTES));
+  string L2 = Substitute("L2 Cache: $0 (Line: $1)",
+      PrettyPrinter::Print(CacheSize(L2_CACHE), TUnit::BYTES),
+      PrettyPrinter::Print(CacheLineSize(L2_CACHE), TUnit::BYTES));
+  string L3 = Substitute("L3 Cache: $0 (Line: $1)",
+      PrettyPrinter::Print(CacheSize(L3_CACHE), TUnit::BYTES),
+      PrettyPrinter::Print(CacheLineSize(L3_CACHE), TUnit::BYTES));
   stream << "Cpu Info:" << endl
          << "  Model: " << model_name_ << endl
          << "  Cores: " << num_cores_ << endl
-         << "  L1 Cache: " << PrettyPrinter::Print(L1, TUnit::BYTES) << endl
-         << "  L2 Cache: " << PrettyPrinter::Print(L2, TUnit::BYTES) << endl
-         << "  L3 Cache: " << PrettyPrinter::Print(L3, TUnit::BYTES) << endl
+         << "  " << L1 << endl
+         << "  " << L2 << endl
+         << "  " << L3 << endl
          << "  Hardware Supports:" << endl;
   for (int i = 0; i < num_flags; ++i) {
     if (IsSupported(flag_mappings[i].flag)) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/be/src/util/cpu-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h
index 860498e..c3597f3 100644
--- a/be/src/util/cpu-info.h
+++ b/be/src/util/cpu-info.h
@@ -53,7 +53,7 @@ class CpuInfo {
     DCHECK(initialized_);
     return hardware_flags_;
   }
-  
+
   /// Returns whether of not the cpu supports this flag
   inline static bool IsSupported(long flag) {
     DCHECK(initialized_);
@@ -70,6 +70,12 @@ class CpuInfo {
     return cache_sizes_[level];
   }
 
+  /// Returns the size of a line in the cache at this level.
+  static long CacheLineSize(CacheLevel level) {
+    DCHECK(initialized_);
+    return cache_line_sizes_[level];
+  }
+
   /// Returns the number of cpu cycles per millisecond
   static int64_t cycles_per_ms() {
     DCHECK(initialized_);
@@ -77,13 +83,13 @@ class CpuInfo {
   }
 
   /// Returns the number of cores (including hyper-threaded) on this machine.
-  static int num_cores() { 
+  static int num_cores() {
     DCHECK(initialized_);
-    return num_cores_; 
+    return num_cores_;
   }
 
   /// Returns the model name of the cpu (e.g. Intel i7-2600)
-  static std::string model_name() { 
+  static std::string model_name() {
     DCHECK(initialized_);
     return model_name_;
   }
@@ -95,6 +101,7 @@ class CpuInfo {
   static int64_t hardware_flags_;
   static int64_t original_hardware_flags_;
   static long cache_sizes_[L3_CACHE + 1];
+  static long cache_line_sizes_[L3_CACHE + 1];
   static int64_t cycles_per_ms_;
   static int num_cores_;
   static std::string model_name_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 70fd2f1..d3d3080 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -556,6 +556,10 @@ struct TBloomFilter {
   // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
   // BloomFilter::directory_.
   2: binary directory
+
+  // If true, this filter allows all elements to pass (i.e. its selectivity is 1). If
+  // true, 'directory' and 'log_heap_space' are not meaningful.
+  4: required bool always_true
 }
 
 struct TUpdateFilterResult {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index b71291e..eabf171 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -253,10 +253,6 @@ struct THashJoinNode {
 
   // non equi-join predicates
   3: optional list<Exprs.TExpr> other_join_conjuncts
-
-  // If true, this join node can (but may choose not to) generate slot filters
-  // after constructing the build side that can be applied to the probe side.
-  4: optional bool add_probe_filters
 }
 
 struct TNestedLoopJoinNode {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
index 645d39c..1caae3b 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
@@ -46,11 +46,6 @@ import com.google.common.collect.Lists;
 public class HashJoinNode extends JoinNode {
   private final static Logger LOG = LoggerFactory.getLogger(HashJoinNode.class);
 
-  // If true, this node can add filters for the probe side that can be generated
-  // after reading the build side. This can be very helpful if the join is selective and
-  // there are few build rows.
-  private boolean addProbeFilters_;
-
   public HashJoinNode(
       PlanNode outer, PlanNode inner, DistributionMode distrMode, JoinOperator joinOp,
       List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
@@ -61,7 +56,6 @@ public class HashJoinNode extends JoinNode {
   }
 
   public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; }
-  public void setAddProbeFilters(boolean b) { addProbeFilters_ = b; }
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
@@ -137,7 +131,6 @@ public class HashJoinNode extends JoinNode {
     for (Expr e: otherJoinConjuncts_) {
       msg.hash_join_node.addToOther_join_conjuncts(e.treeToThrift());
     }
-    msg.hash_join_node.setAdd_probe_filters(addProbeFilters_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
index 53ca1f3..f702271 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
@@ -127,8 +127,6 @@ public class PlanFragment {
    */
   public void finalize(Analyzer analyzer)
       throws InternalException, NotImplementedException {
-    if (planRoot_ != null) computeCanAddSlotFilters(planRoot_);
-
     if (destNode_ != null) {
       Preconditions.checkState(sink_ == null);
       // we're streaming to an exchange node
@@ -180,73 +178,7 @@ public class PlanFragment {
     return dataPartition_ == DataPartition.UNPARTITIONED ? 1 : planRoot_.getNumNodes();
   }
 
-  /**
-   * Returns true and sets node.canAddPredicate, if we can add single-slot filters at
-   * execution time (i.e. after Prepare() to the plan tree rooted at this node.
-   * That is, 'node' can add filters that can be evaluated at nodes below.
-   *
-   * We compute this by walking the tree bottom up.
-   *
-   * TODO: move this to PlanNode.init() which is normally responsible for computing
-   * internal state of PlanNodes. We cannot do this currently since we need the
-   * distrubutionMode() set on HashJoin nodes. Once we call init() properly for
-   * repartitioned joins, this logic can move to init().
-   */
-  private boolean computeCanAddSlotFilters(PlanNode node) {
-    if (node instanceof HashJoinNode) {
-      HashJoinNode hashJoinNode = (HashJoinNode)node;
-      boolean childResult = computeCanAddSlotFilters(node.getChild(0));
-      if (!childResult) return false;
-      if (hashJoinNode.getJoinOp().equals(JoinOperator.FULL_OUTER_JOIN) ||
-          hashJoinNode.getJoinOp().equals(JoinOperator.LEFT_OUTER_JOIN) ||
-          hashJoinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN) ||
-          hashJoinNode.getJoinOp().equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
-        // It is not correct to push through an outer or anti join on the probe side.
-        // We cannot filter those rows out.
-        return false;
-      }
-      // We can't push down predicates for partitioned joins yet.
-      // TODO: this can be hugely helpful to avoid network traffic. Implement this.
-      if (hashJoinNode.getDistributionMode() == DistributionMode.PARTITIONED) {
-        return false;
-      }
-
-      List<BinaryPredicate> joinConjuncts = hashJoinNode.getEqJoinConjuncts();
-      // We can only add these filters for conjuncts of the form:
-      // <probe_slot> = *. If the hash join has any equal join conjuncts in this form,
-      // mark the hash join node.
-      for (Expr c: joinConjuncts) {
-        if (c.getChild(0) instanceof SlotRef) {
-          hashJoinNode.setAddProbeFilters(true);
-          break;
-        }
-      }
-      // Even if this join cannot add predicates, return true so the parent node can.
-      return true;
-    } else if (node instanceof HdfsScanNode) {
-      // Since currently only the Parquet scanner employs the slot filter optimization,
-      // we enable it only if the majority format is Parquet. Otherwise we are adding
-      // the overhead of creating the SlotFilters in the build side in queries not on
-      // Parquet data.
-      // TODO: Modify the other scanners to exploit the slot filter optimization.
-      HdfsScanNode scanNode = (HdfsScanNode) node;
-      Preconditions.checkNotNull(scanNode.desc_);
-      Preconditions.checkNotNull(scanNode.desc_.getTable() instanceof HdfsTable);
-      HdfsTable table = (HdfsTable) scanNode.desc_.getTable();
-      if (table.getMajorityFormat() == HdfsFileFormat.PARQUET) {
-        return true;
-      } else {
-        return false;
-      }
-    } else {
-      for (PlanNode child : node.getChildren()) {
-        computeCanAddSlotFilters(child);
-      }
-      return false;
-    }
-  }
-
-  /**
+ /**
    * Estimates the per-node number of distinct values of exprs based on the data
    * partition of this fragment and its number of nodes. Returns -1 for an invalid
    * estimate, e.g., because getNumDistinctValues() failed on one of the exprs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d1eab7a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
index 73ea1ae..f2f49f6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
@@ -14,3 +14,25 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RESULTS
 620
 ====
+
+
+---- QUERY
+####################################################
+# Regression test for IMPALA-3141: Disabled filters should send dummy filters
+# to unblock waiters. Filters are disabled by setting the filter size to be very
+# small, and having a large enough build input to trigger the false-positive check.
+# Note: build side has a low NDV, so if the FP estimate changes to take NDV into
+# account this test will need to be updated.
+####################################################
+
+SET RUNTIME_FILTER_WAIT_TIME_MS=600000;
+SET RUNTIME_FILTER_MODE=GLOBAL;
+set RUNTIME_BLOOM_FILTER_SIZE=4096;
+select STRAIGHT_JOIN count(*) from alltypes a JOIN [BROADCAST]
+  (select c.id from alltypes c CROSS JOIN alltypes d LIMIT 40000) b
+  on a.id = b.id
+---- RESULTS
+40000
+---- RUNTIME_PROFILE
+row_regex: .*Build-Side Runtime-Filter Disabled \(FP Rate Too High\).*
+====