Posted to commits@impala.apache.org by sa...@apache.org on 2016/09/01 02:41:45 UTC

incubator-impala git commit: IMPALA-3610: Account for memory used by filters in the coordinator

Repository: incubator-impala
Updated Branches:
  refs/heads/master 1350c3476 -> 24869d40f


IMPALA-3610: Account for memory used by filters in the coordinator

Before this patch, Impala would not account for the memory used to
aggregate runtime filters together in the coordinator. Impala's memory
could therefore be silently overcommitted.

This patch accounts for aggregated filter memory in a new filter
memtracker that is attached to the coordinator's query_mem_tracker(). If
the query memory limit is exceeded when a filter update arrives, that
update is discarded. If the filter is from a partitioned join, the
entire filter can therefore be discarded immediately (to alleviate
memory pressure) and a dummy 'always true' filter is sent to backends to
unblock them.
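
As a rough illustration of the accounting described above, here is a
minimal sketch. SimpleTracker and SketchFilterState are hypothetical
stand-ins and not Impala's MemTracker or Coordinator::FilterState; the
sketch only assumes a tracker with a byte limit and a filter payload
held as a byte string.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for a memory tracker: a byte counter with a limit.
class SimpleTracker {
 public:
  explicit SimpleTracker(int64_t limit) : limit_(limit), consumed_(0) {}

  // Reserves 'bytes' if that stays within the limit; otherwise changes nothing.
  bool TryConsume(int64_t bytes) {
    if (consumed_ + bytes > limit_) return false;
    consumed_ += bytes;
    return true;
  }

  // Returns previously reserved bytes to the tracker.
  void Release(int64_t bytes) {
    assert(bytes <= consumed_);
    consumed_ -= bytes;
  }

  int64_t consumed() const { return consumed_; }

 private:
  int64_t limit_;
  int64_t consumed_;
};

// Hypothetical aggregation state for one partitioned-join filter.
struct SketchFilterState {
  std::unique_ptr<std::string> directory;  // aggregated filter bytes, if any
  bool disabled = false;

  // Drops the aggregated payload and gives its memory back to the tracker.
  void Disable(SimpleTracker* tracker) {
    disabled = true;
    if (directory == nullptr) return;
    tracker->Release(static_cast<int64_t>(directory->size()));
    directory.reset();
  }

  // Applies one update. Returns false if the filter is, or has just become, disabled.
  bool ApplyUpdate(std::string update, SimpleTracker* tracker) {
    if (disabled) return false;
    if (directory == nullptr) {
      // First update: account for the memory before keeping the payload.
      if (!tracker->TryConsume(static_cast<int64_t>(update.size()))) {
        Disable(tracker);  // one missing update makes the aggregate unusable
        return false;
      }
      directory.reset(new std::string(std::move(update)));
      return true;
    }
    // Later updates are OR-ed into the existing payload; no new memory is needed.
    assert(directory->size() == update.size());
    for (std::size_t i = 0; i < update.size(); ++i) (*directory)[i] |= update[i];
    return true;
  }
};
```

In this sketch a caller that sees ApplyUpdate() return false would
publish an 'always true' filter instead of an aggregated one.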

If the filter is from a broadcast join, no aggregation is done, so there
is no tracking. The Thrift input and output filter data structures are
not tracked (as we generally don't track RPC objects, but plan to in the
future). The filter payload is moved from the input request structure to
the output broadcast structure without copying.
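
The copy-free hand-off can be pictured with the sketch below. It is an
assumption-laden illustration: SketchFilter and MovePayload are
hypothetical names, and a std::vector stands in for the Thrift
payload. The patch itself uses swap() on the Thrift structures.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a Thrift-generated filter structure.
struct SketchFilter {
  bool always_true = false;
  std::vector<uint8_t> directory;
};

// Transfers the payload of 'in' to 'out' by swapping buffers, so the bytes are never
// duplicated. Afterwards 'in' holds an empty filter and should not be read.
void MovePayload(SketchFilter* in, SketchFilter* out) {
  assert(in != nullptr && out != nullptr);
  // Free whatever 'out' held so that 'in' ends up with an empty, unallocated vector.
  std::vector<uint8_t>().swap(out->directory);
  // vector::swap exchanges ownership of the buffers without copying elements.
  out->directory.swap(in->directory);
  out->always_true = in->always_true;
  in->always_true = false;
}
```

Because only buffer ownership changes hands, peak memory stays at one
copy of the filter rather than two.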

Memory that is added to a memtracker must always be released. To do
this, we need to signal to the coordinator that the query is finished,
and that there is no point trying to process any future updates that
might arrive concurrently. This patch adds Coordinator::TearDown(),
which is called from QueryExecState::Done() and which releases memory
from all in-progress runtime filters.
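
The release-on-completion rule can be sketched as follows.
SketchCoordinator is a hypothetical class, not Impala's Coordinator;
it assumes a single lock guarding all filter state and a plain byte
counter in place of a memtracker.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

// Hypothetical coordinator-side bookkeeping for aggregated filters.
class SketchCoordinator {
 public:
  // Aggregates 'payload' into the filter with id 'filter_id'. Returns false if the
  // update is ignored because teardown has already happened.
  bool UpdateFilter(int filter_id, const std::vector<uint8_t>& payload) {
    std::lock_guard<std::mutex> l(lock_);
    if (torn_down_) return false;
    std::vector<uint8_t>& agg = filters_[filter_id];
    if (agg.empty()) {
      // First update for this filter: keep it and account for its bytes.
      agg = payload;
      tracked_bytes_ += static_cast<int64_t>(agg.size());
    } else {
      // Later updates are OR-ed in place and need no extra memory.
      std::size_t n = std::min(agg.size(), payload.size());
      for (std::size_t i = 0; i < n; ++i) agg[i] |= payload[i];
    }
    return true;
  }

  // Frees every aggregated filter and stops accepting further updates.
  void TearDown() {
    std::lock_guard<std::mutex> l(lock_);
    torn_down_ = true;
    for (auto& entry : filters_) {
      tracked_bytes_ -= static_cast<int64_t>(entry.second.size());
      std::vector<uint8_t>().swap(entry.second);  // actually free the buffer
    }
    assert(tracked_bytes_ == 0);
  }

  int64_t tracked_bytes() const {
    std::lock_guard<std::mutex> l(lock_);
    return tracked_bytes_;
  }

 private:
  mutable std::mutex lock_;
  std::map<int, std::vector<uint8_t>> filters_;
  int64_t tracked_bytes_ = 0;
  bool torn_down_ = false;
};
```

Setting the flag and releasing memory under the same lock means an
update that races with teardown is either fully applied and then
freed, or ignored outright, so no tracked bytes are leaked.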

Finally, this patch increases the upper limit for runtime filters to
512MB. This allows testing on very large datasets. The default maximum
is still 16MB, per RUNTIME_FILTER_MAX_SIZE.

Testing: Added a new test that triggers the OOM condition on the
coordinator. All existing runtime filter tests pass.

Change-Id: I3c52c8a1c2e79ef370c77bf264885fc859678d1b
Reviewed-on: http://gerrit.cloudera.org:8080/4066
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24869d40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24869d40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24869d40

Branch: refs/heads/master
Commit: 24869d40fd6b68ae65f19f613626333a062da6b4
Parents: 1350c34
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Jul 7 17:34:10 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Sep 1 02:35:41 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc                   | 249 +++++++++++++++----
 be/src/runtime/coordinator.h                    |  39 +--
 be/src/runtime/mem-tracker.h                    |  10 +-
 be/src/runtime/plan-fragment-executor.cc        |   1 +
 be/src/runtime/runtime-filter-bank.h            |   2 +-
 be/src/service/impala-server.h                  |   1 -
 be/src/service/query-exec-state.cc              |   1 +
 be/src/util/bloom-filter-test.cc                |  43 +++-
 be/src/util/bloom-filter.cc                     |  17 +-
 be/src/util/bloom-filter.h                      |   4 +-
 .../QueryTest/runtime_row_filters_phj.test      |  45 ++++
 11 files changed, 310 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 66c792c..4728866 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -250,6 +250,80 @@ class Coordinator::FragmentInstanceState {
   int64_t rpc_latency_;
 };
 
+
+/// State of filters that are received for aggregation.
+///
+/// A broadcast join filter is published as soon as the first update is received for it
+/// and subsequent updates are ignored (as they will be the same).
+/// Updates for a partitioned join filter are aggregated in 'bloom_filter' and this is
+/// published once 'pending_count' reaches 0 and if the filter was not disabled before
+/// that.
+///
+/// A filter is disabled if an always_true filter update is received, an OOM is hit,
+/// filter aggregation is complete or if the query is complete.
+/// Once a filter is disabled, subsequent updates for that filter are ignored.
+class Coordinator::FilterState {
+ public:
+  FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src) : desc_(desc),
+      src_(src), pending_count_(0), first_arrival_time_(0L), completion_time_(0L),
+      disabled_(false) { }
+
+  TBloomFilter* bloom_filter() { return bloom_filter_.get(); }
+  boost::unordered_set<int>* src_fragment_instance_idxs() {
+    return &src_fragment_instance_idxs_;
+  }
+  std::vector<FilterTarget>* targets() { return &targets_; }
+  int64_t first_arrival_time() const { return first_arrival_time_; }
+  int64_t completion_time() const { return completion_time_; }
+  const TPlanNodeId& src() const { return src_; }
+  const TRuntimeFilterDesc& desc() const { return desc_; }
+  int pending_count() const { return pending_count_; }
+  void set_pending_count(int pending_count) { pending_count_ = pending_count; }
+  bool disabled() const { return disabled_; }
+
+  /// Aggregates partitioned join filters and updates memory consumption.
+  /// Disables filter if always_true filter is received or OOM is hit.
+  void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
+
+  /// Disables a filter. A disabled filter consumes no memory.
+  void Disable(MemTracker* tracker);
+
+ private:
+  /// Contains the specification of the runtime filter.
+  TRuntimeFilterDesc desc_;
+
+  TPlanNodeId src_;
+  std::vector<FilterTarget> targets_;
+
+  // Index into fragment_instance_states_ for source fragment instances.
+  boost::unordered_set<int> src_fragment_instance_idxs_;
+
+  /// Number of remaining backends to hear from before filter is complete.
+  int pending_count_;
+
+  /// BloomFilter aggregated from all source plan nodes, to be broadcast to all
+  /// destination plan fragment instances. Owned by this object so that it can be
+  /// deallocated once finished with. Only set for partitioned joins (broadcast joins
+  /// need no aggregation).
+  /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
+  /// output structure in the case of a broadcast join. Similarly, for partitioned joins,
+  /// the filter is moved from the following member to the output structure.
+  std::unique_ptr<TBloomFilter> bloom_filter_;
+
+  /// Time at which first local filter arrived.
+  int64_t first_arrival_time_;
+
+  /// Time at which all local filters arrived.
+  int64_t completion_time_;
+
+  /// True if the filter is permanently disabled for this query.
+  bool disabled_;
+
+  /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_
+  /// for every filter update.
+
+};
+
 void Coordinator::FragmentInstanceState::ComputeTotalSplitSize(
     const PerNodeScanRanges& per_node_scan_ranges) {
   total_split_size_ = 0;
@@ -285,10 +359,18 @@ Coordinator::Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env,
     obj_pool_(new ObjectPool()),
     query_events_(events),
     filter_routing_table_complete_(false),
-    filter_mode_(query_options.runtime_filter_mode) {
+    filter_mode_(query_options.runtime_filter_mode),
+    torn_down_(false) {
 }
 
 Coordinator::~Coordinator() {
+  DCHECK(torn_down_) << "TearDown() must be called before Coordinator is destroyed";
+
+  // This may be NULL while executing UDFs.
+  if (filter_mem_tracker_.get() != nullptr) {
+    filter_mem_tracker_->UnregisterFromParent();
+  }
+  filter_mem_tracker_.reset();
   query_mem_tracker_.reset();
 }
 
@@ -425,6 +507,9 @@ Status Coordinator::Exec(QuerySchedule& schedule,
 
     executor_.reset(NULL);
   }
+  filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter (Coordinator)",
+      query_mem_tracker(), false));
+
   // Initialize the execution profile structures.
   InitExecProfile(request);
 
@@ -460,14 +545,15 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
       if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) {
         continue;
       }
-      FilterState* f = &(filter_routing_table_[filter.filter_id]);
+      FilterRoutingTable::iterator i = filter_routing_table_.emplace(
+          filter.filter_id, FilterState(filter, plan_node.node_id)).first;
+      FilterState* f = &(i->second);
       if (plan_node.__isset.hash_join_node) {
-        f->desc = filter;
-        f->src = plan_node.node_id;
-        // Set the 'pending_count' to zero to indicate that for a filter with local-only
+        // Set the 'pending_count_' to zero to indicate that for a filter with local-only
         // targets the coordinator does not expect to receive any filter updates.
-        f->pending_count = filter.is_broadcast_join ?
+        int pending_count = filter.is_broadcast_join ?
             (filter.has_remote_targets ? 1 : 0) : num_hosts;
+        f->set_pending_count(pending_count);
         vector<int> src_idxs;
         for (int i = 0; i < num_hosts; ++i) {
           src_idxs.push_back(start_fragment_instance_idx + i);
@@ -482,7 +568,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
           random_shuffle(src_idxs.begin(), src_idxs.end());
           src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
         }
-        f->src_fragment_instance_idxs.insert(src_idxs.begin(), src_idxs.end());
+        f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end());
       } else if (plan_node.__isset.hdfs_scan_node) {
         auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
         DCHECK(it != filter.planid_to_target_ndx.end());
@@ -494,7 +580,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
         for (int i = 0; i < num_hosts; ++i) {
           target.fragment_instance_idxs.insert(start_fragment_instance_idx + i);
         }
-        f->targets.push_back(target);
+        f->targets()->push_back(target);
       } else {
         DCHECK(false) << "Unexpected plan node with runtime filters: "
             << ThriftDebugString(plan_node);
@@ -605,17 +691,18 @@ string Coordinator::FilterDebugString() {
     table_printer.AddColumn("First arrived", false);
     table_printer.AddColumn("Completed", false);
   }
+  table_printer.AddColumn("Enabled", false);
   lock_guard<SpinLock> l(filter_lock_);
-  for (const FilterRoutingTable::value_type& v: filter_routing_table_) {
+  for (FilterRoutingTable::value_type& v: filter_routing_table_) {
     vector<string> row;
-    const FilterState& state = v.second;
+    FilterState& state = v.second;
     row.push_back(lexical_cast<string>(v.first));
-    row.push_back(lexical_cast<string>(state.src));
+    row.push_back(lexical_cast<string>(state.src()));
     vector<string> target_ids;
     vector<string> num_target_instances;
     vector<string> target_types;
     vector<string> partition_filter;
-    for (const auto& target: state.targets) {
+    for (const FilterTarget& target: *state.targets()) {
       target_ids.push_back(lexical_cast<string>(target.node_id));
       num_target_instances.push_back(
           lexical_cast<string>(target.fragment_instance_idxs.size()));
@@ -628,20 +715,22 @@ string Coordinator::FilterDebugString() {
     row.push_back(join(partition_filter, ", "));
 
     if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
-      int pending_count = state.completion_time != 0L ? 0 : state.pending_count;
+      int pending_count = state.completion_time() != 0L ? 0 : state.pending_count();
       row.push_back(Substitute("$0 ($1)", pending_count,
-          state.src_fragment_instance_idxs.size()));
-      if (state.first_arrival_time == 0L) {
+          state.src_fragment_instance_idxs()->size()));
+      if (state.first_arrival_time() == 0L) {
         row.push_back("N/A");
       } else {
-        row.push_back(PrettyPrinter::Print(state.first_arrival_time, TUnit::TIME_NS));
+        row.push_back(PrettyPrinter::Print(state.first_arrival_time(), TUnit::TIME_NS));
       }
-      if (state.completion_time == 0L) {
+      if (state.completion_time() == 0L) {
         row.push_back("N/A");
       } else {
-        row.push_back(PrettyPrinter::Print(state.completion_time, TUnit::TIME_NS));
+        row.push_back(PrettyPrinter::Print(state.completion_time(), TUnit::TIME_NS));
       }
     }
+
+    row.push_back(!state.disabled() ? "true" : "false");
     table_printer.AddRow(row);
   }
   // Add a line break, as in all contexts this is called we need to start a new line to
@@ -1793,8 +1882,8 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
           if (filter_it == filter_routing_table_.end()) continue;
           FilterState* f = &filter_it->second;
           if (plan_node.__isset.hash_join_node) {
-            if (f->src_fragment_instance_idxs.find(instance_state_idx) ==
-                f->src_fragment_instance_idxs.end()) {
+            if (f->src_fragment_instance_idxs()->find(instance_state_idx) ==
+                f->src_fragment_instance_idxs()->end()) {
               DCHECK(desc.is_broadcast_join);
               continue;
             }
@@ -1931,6 +2020,16 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> params, TNetworkAddress
 
 }
 
+void Coordinator::TearDown() {
+  DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice";
+  torn_down_ = true;
+  lock_guard<SpinLock> l(filter_lock_);
+  for (auto& filter: filter_routing_table_) {
+    FilterState* state = &filter.second;
+    state->Disable(filter_mem_tracker_.get());
+  }
+}
+
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
@@ -1952,53 +2051,59 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     }
     FilterState* state = &it->second;
 
-    DCHECK(state->desc.has_remote_targets)
+    DCHECK(state->desc().has_remote_targets)
           << "Coordinator received filter that has only local targets";
 
-    // Check if the filter has already been sent, which could happen in two cases: if one
-    // local filter had always_true set - no point waiting for other local filters that
-    // can't affect the aggregated global filter, or if this is a broadcast join, and
-    // another local filter was already received.
-    if (state->pending_count == 0) return;
-    DCHECK_EQ(state->completion_time, 0L);
-    if (state->first_arrival_time == 0L) {
-      state->first_arrival_time = query_events_->ElapsedTime();
-    }
+    // Check if the filter has already been sent, which could happen in three cases:
+    //   * if one local filter had always_true set - no point waiting for other local
+    //     filters that can't affect the aggregated global filter
+    //   * if this is a broadcast join, and another local filter was already received
+    //   * if the filter could not be allocated and so an always_true filter was sent
+    //     immediately.
+    if (state->disabled()) return;
 
     if (filter_updates_received_->value() == 0) {
       query_events_->MarkEvent("First dynamic filter received");
     }
     filter_updates_received_->Add(1);
-    if (params.bloom_filter.always_true) {
-      state->bloom_filter = NULL;
-      state->pending_count = 0;
-    } else {
-      if (state->bloom_filter == NULL) {
-        state->bloom_filter = obj_pool()->Add(new BloomFilter(params.bloom_filter));
-      } else {
-        // TODO: Implement BloomFilter::Or(const ThriftBloomFilter&)
-        state->bloom_filter->Or(BloomFilter(params.bloom_filter));
-      }
-      if (--state->pending_count > 0) return;
-    }
 
-    // No more filters are pending on this filter ID. Create a distribution payload and
+    state->ApplyUpdate(params, this);
+
+    if (state->pending_count() > 0 && !state->disabled()) return;
+    // At this point, we either disabled this filter or aggregation is complete.
+    DCHECK(state->disabled() || state->pending_count() == 0);
+
+    // No more updates are pending on this filter ID. Create a distribution payload and
     // offer it to the queue.
-    DCHECK_EQ(state->pending_count, 0);
-    state->completion_time = query_events_->ElapsedTime();
-    for (const auto& target: state->targets) {
+    for (FilterTarget target: *state->targets()) {
       // Don't publish the filter to targets that are in the same fragment as the join
       // that produced it.
       if (target.is_local) continue;
       target_fragment_instance_idxs.insert(target.fragment_instance_idxs.begin(),
           target.fragment_instance_idxs.end());
     }
-    BloomFilter::ToThrift(state->bloom_filter, &rpc_params->bloom_filter);
+
+    // Assign outgoing bloom filter.
+    if (state->bloom_filter() != NULL) {
+      // Complete filter case.
+      // TODO: Replace with move() in Thrift 0.9.3.
+      TBloomFilter* aggregated_filter = state->bloom_filter();
+      filter_mem_tracker_->Release(aggregated_filter->directory.size());
+      swap(rpc_params->bloom_filter, *aggregated_filter);
+      DCHECK_EQ(aggregated_filter->directory.size(), 0);
+    } else {
+      // Disabled filter case (due to OOM or due to receiving an always_true filter).
+      rpc_params->bloom_filter.always_true = true;
+    }
+
+    // Filter is complete, and can be released.
+    state->Disable(filter_mem_tracker_.get());
+    DCHECK_EQ(state->bloom_filter(), reinterpret_cast<TBloomFilter*>(NULL));
   }
 
   rpc_params->filter_id = params.filter_id;
 
-  for (const auto& target_idx: target_fragment_instance_idxs) {
+  for (int32_t target_idx: target_fragment_instance_idxs) {
     FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx];
     DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx;
     exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params,
@@ -2006,4 +2111,52 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   }
 }
 
+
+void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
+    Coordinator* coord) {
+  DCHECK_GT(pending_count_, 0);
+  DCHECK_EQ(completion_time_, 0L);
+  if (first_arrival_time_ == 0L) {
+    first_arrival_time_ = coord->query_events_->ElapsedTime();
+  }
+
+  --pending_count_;
+  if (params.bloom_filter.always_true) {
+    Disable(coord->filter_mem_tracker_.get());
+  } else if (bloom_filter_.get() == NULL) {
+    int64_t heap_space = params.bloom_filter.directory.size();
+    if (!coord->filter_mem_tracker_.get()->TryConsume(heap_space)) {
+      VLOG_QUERY << "Not enough memory to allocate filter: "
+                 << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+                 << " (query: " << PrintId(coord->query_id()) << ")";
+      // Disable, as one missing update means a correct filter cannot be produced.
+      Disable(coord->filter_mem_tracker_.get());
+    } else {
+      bloom_filter_.reset(new TBloomFilter());
+      // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
+      // move the payload from the request rather than copy it and take double the memory
+      // cost. After this point, params.bloom_filter is an empty filter and should not be
+      // read.
+      TBloomFilter* non_const_filter =
+          &const_cast<TBloomFilter&>(params.bloom_filter);
+      swap(*bloom_filter_.get(), *non_const_filter);
+      DCHECK_EQ(non_const_filter->directory.size(), 0);
+    }
+  } else {
+    BloomFilter::Or(params.bloom_filter, bloom_filter_.get());
+  }
+
+  if (pending_count_ == 0 || disabled_) {
+    completion_time_ = coord->query_events_->ElapsedTime();
+  }
+}
+
+void Coordinator::FilterState::Disable(MemTracker* tracker) {
+  disabled_ = true;
+  if (bloom_filter_.get() == NULL) return;
+  int64_t heap_space = bloom_filter_.get()->directory.size();
+  tracker->Release(heap_space);
+  bloom_filter_.reset();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index aa10468..0e0b6d5 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -48,7 +48,6 @@
 namespace impala {
 
 class CountingBarrier;
-class BloomFilter;
 class DataStreamMgr;
 class DataSink;
 class RowBatch;
@@ -191,8 +190,12 @@ class Coordinator {
   /// filter to fragment instances.
   void UpdateFilter(const TUpdateFilterParams& params);
 
+  /// Called once the query is complete to tear down any remaining state.
+  void TearDown();
+
  private:
   class FragmentInstanceState;
+  struct FilterState;
 
   /// Typedef for boost utility to compute averaged stats
   /// TODO: including the median doesn't compile, looks like some includes are missing
@@ -364,7 +367,7 @@ class Coordinator {
   /// returned, successfully or not. Initialised during StartRemoteFragments().
   boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
 
-  // Represents a runtime filter target.
+  /// Represents a runtime filter target.
   struct FilterTarget {
     TPlanNodeId node_id;
     bool is_local;
@@ -378,31 +381,6 @@ class Coordinator {
     }
   };
 
-  struct FilterState {
-    TRuntimeFilterDesc desc;
-
-    TPlanNodeId src;
-    std::vector<FilterTarget> targets;
-
-    // Index into fragment_instance_states_ for source fragment instances.
-    boost::unordered_set<int> src_fragment_instance_idxs;
-
-    /// Number of remaining backends to hear from before filter is complete.
-    int pending_count;
-
-    /// BloomFilter aggregated from all source plan nodes, to be broadcast to all
-    /// destination plan fragment instances. Owned by the coordinator's object pool.
-    BloomFilter* bloom_filter;
-
-    /// Time at which first local filter arrived.
-    int64_t first_arrival_time;
-
-    /// Time at which all local filters arrived.
-    int64_t completion_time;
-
-    FilterState() : bloom_filter(NULL), first_arrival_time(0L), completion_time(0L) { }
-  };
-
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
 
@@ -421,6 +399,13 @@ class Coordinator {
   /// The filtering mode for this query. Set in constructor.
   const TRuntimeFilterMode::type filter_mode_;
 
+  /// Tracks the memory consumed by runtime filters during aggregation. Child of
+  /// query_mem_tracker_.
+  std::unique_ptr<MemTracker> filter_mem_tracker_;
+
+  /// True if and only if TearDown() has been called.
+  bool torn_down_;
+
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 52119a2..6c13af6 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -47,13 +47,13 @@ class QueryResourceMgr;
 /// By default, memory consumption is tracked via calls to Consume()/Release(), either to
 /// the tracker itself or to one of its descendents. Alternatively, a consumption metric
 /// can specified, and then the metric's value is used as the consumption rather than the
-/// tally maintained by Consume() and Release(). A tcmalloc metric is used to track process
-/// memory consumption, since the process memory usage may be higher than the computed
-/// total memory (tcmalloc does not release deallocated memory immediately).
+/// tally maintained by Consume() and Release(). A tcmalloc metric is used to track
+/// process memory consumption, since the process memory usage may be higher than the
+/// computed total memory (tcmalloc does not release deallocated memory immediately).
 //
 /// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
-/// reached. If LimitExceeded() is called and the limit is exceeded, it will first call the
-/// GcFunctions to try to free memory and recheck the limit. For example, the process
+/// reached. If LimitExceeded() is called and the limit is exceeded, it will first call
+/// the GcFunctions to try to free memory and recheck the limit. For example, the process
 /// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
 /// this will be called before the process limit is reported as exceeded. GcFunctions are
 /// called in the order they are added, so expensive functions should be added last.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index c7bc3a6..76d2c02 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -86,6 +86,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
 Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
   lock_guard<mutex> l(prepare_lock_);
   DCHECK(!is_prepared_);
+
   if (is_cancelled_) return Status::CANCELLED;
 
   is_prepared_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 7af9fb9..51408a6 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -106,7 +106,7 @@ class RuntimeFilterBank {
   void Close();
 
   static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
-  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
+  static const int64_t MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;  // 512MB
 
  private:
   /// Returns the the space (in bytes) required for a filter to achieve the configured

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 2b34ffc..f756932 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -51,7 +51,6 @@ namespace impala {
 class ExecEnv;
 class DataSink;
 class CancellationWork;
-class Coordinator;
 class ImpalaHttpHandler;
 class RowDescriptor;
 class TCatalogUpdate;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 50c2bf4..cea24bb 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -550,6 +550,7 @@ void ImpalaServer::QueryExecState::Done() {
       LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
             << " because of error: " << status.GetDetail();
     }
+    coord_->TearDown();
   }
 
   // Update result set cache metrics, and update mem limit accounting.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/util/bloom-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index 1da30e2..f86e2eb 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -247,21 +247,42 @@ TEST(BloomFilter, Thrift) {
   EXPECT_EQ(to_thrift.always_true, true);
 }
 
-TEST(BloomFilter, Or) {
+TEST(BloomFilter, ThriftOr) {
   BloomFilter bf1(BloomFilter::MinLogSpace(100, 0.01));
   BloomFilter bf2(BloomFilter::MinLogSpace(100, 0.01));
-  for (int i = 60; i < 80; ++i) BfInsert(bf2, i);
 
+  for (int i = 60; i < 80; ++i) BfInsert(bf2, i);
   for (int i = 0; i < 10; ++i) BfInsert(bf1, i);
-  bf2.Or(bf1);
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(bf2, i));
-  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf2, i));
-
-  for (int i = 11; i < 50; ++i) BfInsert(bf1, i);
-  bf2.Or(bf1);
-  for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(bf2, i));
-  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf2, i));
-  ASSERT_FALSE(BfFind(bf2, 81));
+
+  TBloomFilter bf1_thrift;
+  TBloomFilter bf2_thrift;
+
+  // Create TBloomFilter with BloomFilter values.
+  BloomFilter::ToThrift(&bf1, &bf1_thrift);
+  BloomFilter::ToThrift(&bf2, &bf2_thrift);
+
+  // Or the TBloomFilters.
+  BloomFilter::Or(bf1_thrift, &bf2_thrift);
+
+  // Apply aggregated TBloomFilter to BloomFilter to verify values with BfFind().
+  BloomFilter bf3(bf2_thrift);
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(bf3, i));
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf3, i));
+
+  // Insert another value to aggregated BloomFilter.
+  for (int i = 11; i < 50; ++i) BfInsert(bf3, i);
+
+  // Convert to TBloomFilter again and do Or().
+  TBloomFilter bf3_thrift;
+  BloomFilter::ToThrift(&bf3, &bf3_thrift);
+
+  BloomFilter::Or(bf1_thrift, &bf3_thrift);
+
+  // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
+  BloomFilter bf4(bf3_thrift);
+  for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(bf4, i));
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(bf4, i));
+  ASSERT_FALSE(BfFind(bf4, 81));
 }
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index bc8a956..7d8c8f7 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -83,13 +83,16 @@ void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
   filter->ToThrift(thrift);
 }
 
-void BloomFilter::Or(const BloomFilter& other) {
-  DCHECK_EQ(log_num_buckets_, other.log_num_buckets_);
-  BucketWord* dir_ptr = reinterpret_cast<BucketWord*>(directory_);
-  const BucketWord* other_dir_ptr = reinterpret_cast<const BucketWord*>(other.directory_);
-  int directory_size_in_words = directory_size() / sizeof(BucketWord);
-  // TODO: use SIMD here:
-  for (int i = 0; i < directory_size_in_words; ++i) dir_ptr[i] |= other_dir_ptr[i];
+void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
+  DCHECK(out != NULL);
+  DCHECK_EQ(in.log_heap_space, out->log_heap_space);
+  out->always_true |= in.always_true;
+  if (out->always_true) {
+    out->directory.resize(0);
+    return;
+  }
+
+  for (int i = 0; i < in.directory.size(); ++i) out->directory[i] |= in.directory[i];
 }
 
 // The following three methods are derived from

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 2f83f9e..7a94995 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -78,8 +78,8 @@ class BloomFilter {
   /// high probabilty) if it is not.
   bool Find(const uint32_t hash) const;
 
-  /// Computes the logical OR of this filter with 'other' and stores the result in 'this'.
-  void Or(const BloomFilter& other);
+  /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
+  static void Or(const TBloomFilter& in, TBloomFilter* out);
 
   /// As more distinct items are inserted into a BloomFilter, the false positive rate
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24869d40/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters_phj.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters_phj.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters_phj.test
index 451018a..8c8f770 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters_phj.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters_phj.test
@@ -46,3 +46,48 @@ select straight_join count(1)
 row_regex: .*1 of 1 Runtime Filter Published.*
 row_regex: .*Rows rejected: 2.43K \(2432\).*
 ====
+
+
+---- QUERY
+####################################################
+# Test case 3: Filters will not be used if they exceed
+# the configured memory limit on the coordinator.
+# To test this, we need to construct a query where memory
+# consumption on the coordinator exceeds MEM_LIMIT, but
+# not on the backends (because otherwise they will disable
+# the filters through another path). We set MEM_LIMIT to
+# the minimum possible then set filter size to be roughly
+# half that: since the coordinator must aggregate two of
+# these filters (and indeed must create one as well), it
+# will exceed the memory limit. This is checked for
+# indirectly by confirming that the filter had no effect
+# (when usually it would be selective).
+####################################################
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=128MB;
+SET RUNTIME_FILTER_MAX_SIZE=500MB;
+SET MEM_LIMIT=140MB;
+select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
+    on a.month = b.id and b.int_col = -3
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 \(128.00 MB\).*
+row_regex: .*Files processed: 8.*
+row_regex: .*Files rejected: 0.*
+====
+---- QUERY
+# Confirm that with broadcast join, memory limit is not hit.
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=128MB;
+SET RUNTIME_FILTER_MAX_SIZE=500MB;
+SET MEM_LIMIT=140MB;
+select STRAIGHT_JOIN * from alltypes a join [BROADCAST] alltypes b
+    on a.month = b.id and b.int_col = -3
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 \(128.00 MB\).*
+row_regex: .*Files processed: 8.*
+row_regex: .*Files rejected: 8.*
+====