You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/18 08:40:27 UTC

[2/2] incubator-impala git commit: IMPALA-2956: Filters should be able to target multiple scan nodes

IMPALA-2956: Filters should be able to target multiple scan nodes

With this commit, runtime filters can be assigned to multiple destination
nodes (scans). For each filter, the destination nodes are determined
using equivalence classes during planning. For each filter, all its
destination nodes are in the left subtree rooted at the join node
that constructs this filter. A runtime filter may have both
local and remote targets. The backend determines how to route each
filter depending on the number and type (local, remote) of its destination
nodes.

With this commit, we also enable runtime filter propagation into all the
operands of UNION [ALL|DISTINCT] nodes.

Change-Id: Iad2ce4e579a30616c469312a4e658140d317507b
Reviewed-on: http://gerrit.cloudera.org:8080/2932
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f992dc7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f992dc7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f992dc7f

Branch: refs/heads/master
Commit: f992dc7f88668ada08783b283025bfc8d22cdc66
Parents: 265e39f
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Mon May 2 11:07:17 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed May 18 01:40:22 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-join-node.cc                   |   4 +-
 be/src/exec/hdfs-scan-node.cc                   |  20 +-
 be/src/exec/partitioned-hash-join-node.cc       |   4 +-
 be/src/runtime/coordinator.cc                   |  82 +++--
 be/src/runtime/coordinator.h                    |  19 +-
 be/src/runtime/runtime-filter.cc                |  22 +-
 common/thrift/PlanNodes.thrift                  |  54 ++-
 .../com/cloudera/impala/analysis/Analyzer.java  |  31 +-
 .../impala/planner/DistributedPlanner.java      |   2 +-
 .../cloudera/impala/planner/HdfsScanNode.java   |   6 -
 .../com/cloudera/impala/planner/PlanNode.java   |   9 +-
 .../impala/planner/RuntimeFilterGenerator.java  | 333 ++++++++++++-------
 .../queries/PlannerTest/data-source-tables.test |   1 -
 .../queries/PlannerTest/hbase.test              |   2 -
 .../queries/PlannerTest/inline-view.test        |   2 +
 .../queries/PlannerTest/join-order.test         |  23 +-
 .../queries/PlannerTest/joins.test              |  22 ++
 .../queries/PlannerTest/kudu-delete.test        |   2 +-
 .../queries/PlannerTest/kudu-update.test        |   4 -
 .../queries/PlannerTest/kudu.test               |  11 +-
 .../queries/PlannerTest/nested-collections.test |  17 +-
 .../queries/PlannerTest/outer-joins.test        |   2 +
 .../PlannerTest/predicate-propagation.test      |   9 +-
 .../PlannerTest/runtime-filter-propagation.test | 124 ++++++-
 .../queries/PlannerTest/subquery-rewrite.test   |   5 +-
 .../queries/PlannerTest/tpcds-all.test          |   6 +
 .../queries/PlannerTest/tpch-all.test           |  24 +-
 .../queries/PlannerTest/tpch-nested.test        |  84 ++---
 .../queries/PlannerTest/union.test              |   2 +
 .../queries/PlannerTest/views.test              |   8 +
 .../queries/PlannerTest/with-clause.test        |   8 +
 .../queries/QueryTest/runtime_filters.test      |  34 ++
 .../queries/QueryTest/runtime_row_filters.test  |  32 ++
 33 files changed, 728 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 9823df6..7163c88 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -86,8 +86,8 @@ Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
         !tfilter.is_broadcast_join) {
       continue;
     }
-    if (state->query_options().disable_row_runtime_filtering &&
-        !tfilter.is_bound_by_partition_columns) {
+    if (state->query_options().disable_row_runtime_filtering
+        && !tfilter.applied_on_partition_columns) {
       continue;
     }
     filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 4435aec..a4ecbc7 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -145,13 +145,20 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 
   const TQueryOptions& query_options = state->query_options();
   for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
+    auto it = filter.planid_to_target_ndx.find(tnode.node_id);
+    DCHECK(it != filter.planid_to_target_ndx.end());
+    const TRuntimeFilterTargetDesc& target = filter.targets[it->second];
+    if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
+        !target.is_local_target) {
+      continue;
+    }
     if (query_options.disable_row_runtime_filtering &&
-        !filter.is_bound_by_partition_columns) {
+        !target.is_bound_by_partition_columns) {
       continue;
     }
 
     FilterContext filter_ctx;
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, filter.target_expr, &filter_ctx.expr));
+    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr));
     filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false);
 
     string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id,
@@ -159,8 +166,8 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     RuntimeProfile* profile = state->obj_pool()->Add(
         new RuntimeProfile(state->obj_pool(), filter_profile_title));
     runtime_profile_->AddChild(profile);
-    filter_ctx.stats = state->obj_pool()->Add(
-        new FilterStats(profile, filter.is_bound_by_partition_columns));
+    filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
+        target.is_bound_by_partition_columns));
 
     filter_ctxs_.push_back(filter_ctx);
   }
@@ -1133,7 +1140,10 @@ bool HdfsScanNode::PartitionPassesFilterPredicates(int32_t partition_id,
   if (template_tuple == NULL) return true;
   TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
   for (const FilterContext& ctx: filter_ctxs) {
-    if (!ctx.filter->filter_desc().is_bound_by_partition_columns) continue;
+    int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
+    if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
+      continue;
+    }
     void* e = ctx.expr->GetValue(tuple_row_mem);
 
     // Not quite right because bitmap could arrive after Eval(), but we're ok with

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 6f29008..8aa48fa 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -100,8 +100,8 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
         !filter.is_broadcast_join) {
       continue;
     }
-    if (state->query_options().disable_row_runtime_filtering &&
-        !filter.is_bound_by_partition_columns) {
+    if (state->query_options().disable_row_runtime_filtering
+        && !filter.applied_on_partition_columns) {
       continue;
     }
     FilterContext filter_ctx;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 50714ea..fd27a51 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -489,39 +489,46 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
   for (const TPlanNode& plan_node: plan_nodes) {
     if (!plan_node.__isset.runtime_filters) continue;
     for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
-      if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_target) {
+      if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) {
         continue;
       }
       FilterState* f = &(filter_routing_table_[filter.filter_id]);
       if (plan_node.__isset.hash_join_node) {
         f->desc = filter;
         f->src = plan_node.node_id;
-        f->pending_count = filter.is_broadcast_join ? 1 : num_hosts;
+        // Set the 'pending_count' to zero to indicate that for a filter with local-only
+        // targets the coordinator does not expect to receive any filter updates.
+        f->pending_count = filter.is_broadcast_join ?
+            (filter.has_remote_targets ? 1 : 0) : num_hosts;
         vector<int> src_idxs;
         for (int i = 0; i < num_hosts; ++i) {
           src_idxs.push_back(start_fragment_instance_idx + i);
         }
 
-        // If this is a broadcast join with a non-local target, only build and publish it
-        // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is an intra-fragment
-        // filter for a broadcast join, it is short-circuited in every fragment instance
-        // and therefore will not be published globally, and should be generated
-        // everywhere it can be used.
-        if (filter.is_broadcast_join && !filter.has_local_target
+        // If this is a broadcast join with only non-local targets, build and publish it
+        // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
+        // or it is a broadcast join with local targets, it should be generated
+        // everywhere the join is executed.
+        if (filter.is_broadcast_join && !filter.has_local_targets
             && num_hosts > MAX_BROADCAST_FILTER_PRODUCERS) {
           random_shuffle(src_idxs.begin(), src_idxs.end());
           src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
         }
         f->src_fragment_instance_idxs.insert(src_idxs.begin(), src_idxs.end());
       } else if (plan_node.__isset.hdfs_scan_node) {
-        f->target = plan_node.node_id;
+        auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
+        DCHECK(it != filter.planid_to_target_ndx.end());
+        const TRuntimeFilterTargetDesc& tFilterTarget = filter.targets[it->second];
+        if (filter_mode_ == TRuntimeFilterMode::LOCAL && !tFilterTarget.is_local_target) {
+          continue;
+        }
+        FilterTarget target(tFilterTarget);
         for (int i = 0; i < num_hosts; ++i) {
-          f->target_fragment_instance_idxs.insert(start_fragment_instance_idx + i);
+          target.fragment_instance_idxs.insert(start_fragment_instance_idx + i);
         }
+        f->targets.push_back(target);
       } else {
-        // TODO: Frontend should not target filters at HBase scan nodes.
-        DCHECK(plan_node.__isset.hbase_scan_node)
-            << "Unexpected plan node with runtime filters: "
+        DCHECK(false) << "Unexpected plan node with runtime filters: "
             << ThriftDebugString(plan_node);
       }
     }
@@ -619,14 +626,14 @@ string Coordinator::FilterDebugString() {
   TablePrinter table_printer;
   table_printer.AddColumn("ID", false);
   table_printer.AddColumn("Src. Node", false);
-  table_printer.AddColumn("Tgt. Node", false);
+  table_printer.AddColumn("Tgt. Node(s)", false);
   table_printer.AddColumn("Targets", false);
-  table_printer.AddColumn("Type", false);
+  table_printer.AddColumn("Target type", false);
   table_printer.AddColumn("Partition filter", false);
 
   // Distribution metrics are only meaningful if the coordinator is routing the filter.
   if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
-    table_printer.AddColumn("Pending", false);
+    table_printer.AddColumn("Pending (Expected)", false);
     table_printer.AddColumn("First arrived", false);
     table_printer.AddColumn("Completed", false);
   }
@@ -636,16 +643,21 @@ string Coordinator::FilterDebugString() {
     const FilterState& state = v.second;
     row.push_back(lexical_cast<string>(v.first));
     row.push_back(lexical_cast<string>(state.src));
-    row.push_back(lexical_cast<string>(state.target));
-    row.push_back(lexical_cast<string>(state.target_fragment_instance_idxs.size()));
-
-    if (state.desc.has_local_target) {
-      row.push_back("LOCAL");
-    } else {
-      row.push_back(
-          state.desc.is_broadcast_join ? "GLOBAL (Broadcast)" : "GLOBAL (Partition)");
+    vector<string> target_ids;
+    vector<string> num_target_instances;
+    vector<string> target_types;
+    vector<string> partition_filter;
+    for (const auto& target: state.targets) {
+      target_ids.push_back(lexical_cast<string>(target.node_id));
+      num_target_instances.push_back(
+          lexical_cast<string>(target.fragment_instance_idxs.size()));
+      target_types.push_back(target.is_local ? "LOCAL" : "REMOTE");
+      partition_filter.push_back(target.is_bound_by_partition_columns ? "true" : "false");
     }
-    row.push_back(state.desc.is_bound_by_partition_columns ? "true" : "false");
+    row.push_back(join(target_ids, ", "));
+    row.push_back(join(num_target_instances, ", "));
+    row.push_back(join(target_types, ", "));
+    row.push_back(join(partition_filter, ", "));
 
     if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
       int pending_count = state.completion_time != 0L ? 0 : state.pending_count;
@@ -2017,8 +2029,9 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
       return;
     }
     FilterState* state = &it->second;
-    DCHECK(!state->desc.has_local_target)
-        << "Coordinator received filter that has local target";
+
+    DCHECK(state->desc.has_remote_targets)
+          << "Coordinator received filter that has only local targets";
 
     // Check if the filter has already been sent, which could happen in two cases: if one
     // local filter had always_true set - no point waiting for other local filters that
@@ -2051,16 +2064,21 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     // offer it to the queue.
     DCHECK_EQ(state->pending_count, 0);
     state->completion_time = query_events_->ElapsedTime();
-    target_fragment_instance_idxs = state->target_fragment_instance_idxs;
+    for (const auto& target: state->targets) {
+      // Don't publish the filter to targets that are in the same fragment as the join
+      // that produced it.
+      if (target.is_local) continue;
+      target_fragment_instance_idxs.insert(target.fragment_instance_idxs.begin(),
+          target.fragment_instance_idxs.end());
+    }
     BloomFilter::ToThrift(state->bloom_filter, &rpc_params->bloom_filter);
   }
 
   rpc_params->filter_id = params.filter_id;
 
-  for (int idx: target_fragment_instance_idxs) {
-    FragmentInstanceState* fragment_inst = fragment_instance_states_[idx];
-    DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << idx;
-
+  for (const auto& target_idx: target_fragment_instance_idxs) {
+    FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx];
+    DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx;
     exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params,
         fragment_inst->impalad_address(), fragment_inst->fragment_instance_id()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 1235473..048523c 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -361,18 +361,29 @@ class Coordinator {
   /// returned, successfully or not. Initialised during StartRemoteFragments().
   boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
 
+  // Represents a runtime filter target.
+  struct FilterTarget {
+    TPlanNodeId node_id;
+    bool is_local;
+    bool is_bound_by_partition_columns;
+    boost::unordered_set<int> fragment_instance_idxs;
+
+    FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
+      node_id = tFilterTarget.node_id;
+      is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns;
+      is_local = tFilterTarget.is_local_target;
+    }
+  };
+
   struct FilterState {
     TRuntimeFilterDesc desc;
 
     TPlanNodeId src;
-    TPlanNodeId target;
+    std::vector<FilterTarget> targets;
 
     // Index into fragment_instance_states_ for source fragment instances.
     boost::unordered_set<int> src_fragment_instance_idxs;
 
-    // Index into fragment_instance_states_ for target fragment instances.
-    boost::unordered_set<int> target_fragment_instance_idxs;
-
     /// Number of remaining backends to hear from before filter is complete.
     int pending_count;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 7616320..b0126ec 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -72,8 +72,14 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte
     DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
     produced_filters_[filter_desc.filter_id] = ret;
   } else {
-    DCHECK(consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end());
-    consumed_filters_[filter_desc.filter_id] = ret;
+    if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+      consumed_filters_[filter_desc.filter_id] = ret;
+    } else {
+      // The filter has already been registered in this filter bank by another
+      // target node.
+      DCHECK_GT(filter_desc.targets.size(), 1);
+      ret = consumed_filters_[filter_desc.filter_id];
+    }
   }
   return ret;
 }
@@ -104,14 +110,17 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
   TUpdateFilterParams params;
+  // A runtime filter may have both local and remote targets.
   bool has_local_target = false;
+  bool has_remote_target = false;
   {
     lock_guard<mutex> l(runtime_filter_lock_);
     RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
     DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
                                           << filter_id;
     it->second->SetBloomFilter(bloom_filter);
-    has_local_target = it->second->filter_desc().has_local_target;
+    has_local_target = it->second->filter_desc().has_local_targets;
+    has_remote_target = it->second->filter_desc().has_remote_targets;
   }
 
   if (has_local_target) {
@@ -123,14 +132,15 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
       RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
       if (it == consumed_filters_.end()) return;
       filter = it->second;
-      // Check if the filter already showed up.
-      DCHECK(!filter->HasBloomFilter());
     }
     filter->SetBloomFilter(bloom_filter);
     state_->runtime_profile()->AddInfoString(
         Substitute("Filter $0 arrival", filter_id),
         PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
-  } else if (state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+  }
+
+  if (has_remote_target
+      && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
     BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
     params.filter_id = filter_id;
     params.query_id = query_ctx_.query_id;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 5429f34..d095047 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -73,35 +73,57 @@ enum TReplicaPreference {
   REMOTE
 }
 
-// Specification of a runtime filter
+// Specification of a runtime filter target.
+struct TRuntimeFilterTargetDesc {
+  // Target node id
+  1: Types.TPlanNodeId node_id
+
+  // Expr on which the filter is applied
+  2: required Exprs.TExpr target_expr
+
+  // Indicates if 'target_expr' is bound only by partition columns
+  3: required bool is_bound_by_partition_columns
+
+  // Slot ids on which 'target_expr' is bound on
+  4: required list<Types.TSlotId> target_expr_slotids
+
+  // Indicates if this target is on the same fragment as the join that
+  // produced the runtime filter
+  5: required bool is_local_target
+}
+
+// Specification of a runtime filter.
 struct TRuntimeFilterDesc {
   // Filter unique id (within a query)
   1: required i32 filter_id
 
-  // Expr on which the filter is applied; it is fully bound by a scan node.
-  2: required Exprs.TExpr target_expr
-
   // Expr on which the filter is built on a hash join.
-  3: required Exprs.TExpr src_expr
+  2: required Exprs.TExpr src_expr
+
+  // List of targets for this runtime filter
+  3: required list<TRuntimeFilterTargetDesc> targets
+
+  // Map of target node id to the corresponding index in 'targets'
+  4: required map<Types.TPlanNodeId, i32> planid_to_target_ndx
 
-  // If set, indicates if the source join node of this filter is a broadcast or
+  // Indicates if the source join node of this filter is a broadcast or
   // a partitioned join.
-  4: optional bool is_broadcast_join
+  5: required bool is_broadcast_join
 
-  // If set, indicates if filter_expr is bound only by partition columns
-  5: optional bool is_bound_by_partition_columns
+  // Indicates if there is at least one target scan node that is in the
+  // same fragment as the broadcast join that produced the runtime filter
+  6: required bool has_local_targets
 
-  // SlotIds on which the target expr is bound on
-  6: optional list<Types.TSlotId> target_expr_slotids
+  // Indicates if there is at least one target scan node that is not in the same
+  // fragment as the broadcast join that produced the runtime filter
+  7: required bool has_remote_targets
 
-  // If set, indicates that the filter should not be sent to the coordinator because it
-  // is produced by a broadcast join and the target scan node is in the same fragment
-  // as the join.
-  7: optional bool has_local_target
+  // Indicates if this filter is applied only on partition columns
+  8: required bool applied_on_partition_columns
 
   // The estimated number of distinct values that the planner expects the filter to hold.
   // Used to compute the size of the filter.
-  8: optional i64 ndv_estimate
+  9: optional i64 ndv_estimate
 }
 
 // The information contained in subclasses of ScanNode captured in two separate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java b/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
index 03f9881..7c77351 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
@@ -1988,6 +1988,21 @@ public class Analyzer {
     return result;
   }
 
+  /**
+   * Returns the ids of all the slots for which there is a value transfer from 'srcSid'.
+   */
+  public List<SlotId> getValueTransferTargets(SlotId srcSid) {
+    List<SlotId> result = Lists.newArrayList();
+    int maxSlotId = globalState_.valueTransferGraph.getNumSlots();
+    if (srcSid.asInt() >= maxSlotId) return result;
+    for (SlotDescriptor slot: globalState_.descTbl.getSlotDescs()) {
+      SlotId targetSid = slot.getId();
+      if (targetSid.asInt() >= maxSlotId) continue;
+      if (hasValueTransfer(srcSid, targetSid)) result.add(targetSid);
+    }
+    return result;
+  }
+
   public EquivalenceClassId getEquivClassId(SlotId slotId) {
     return globalState_.equivClassBySlotId.get(slotId);
   }
@@ -2434,6 +2449,13 @@ public class Analyzer {
     return globalState_.valueTransferGraph.hasValueTransfer(a, b);
   }
 
+  public boolean validateValueTransferGraph() {
+    StringBuilder actual = new StringBuilder();
+    StringBuilder expected = new StringBuilder();
+    boolean res = globalState_.valueTransferGraph.validate(actual, expected);
+    return res;
+  }
+
   public EventSequence getTimeline() { return globalState_.timeline; }
 
   /**
@@ -2488,6 +2510,12 @@ public class Analyzer {
     // Condensed DAG of value transfers in the new slot domain.
     private boolean[][] valueTransfer_;
 
+    // Number of slots registered at the time when the value transfer graph was
+    // computed.
+    private int numSlots_ = globalState_.descTbl.getMaxSlotId().asInt() + 1;
+
+    public int getNumSlots() { return numSlots_; }
+
     /**
      * Computes all direct and transitive value transfers based on the registered
      * conjuncts of the form <slotref> = <slotref>. The high-level steps are:
@@ -2512,8 +2540,7 @@ public class Analyzer {
       partitionValueTransfers(completeSubGraphs_, origValueTransfers);
 
       // Coalesce complete subgraphs into a single slot and assign new slot ids.
-      int origNumSlots = globalState_.descTbl.getMaxSlotId().asInt() + 1;
-      coalescedSlots_ = new int[origNumSlots];
+      coalescedSlots_ = new int[numSlots_];
       Arrays.fill(coalescedSlots_, -1);
       for (Set<SlotId> equivClass: completeSubGraphs_.getSets()) {
         int representative = nextCoalescedSlotId_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
index bdf32e9..5dc262c 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
@@ -513,7 +513,7 @@ public class DistributedPlanner {
 
     for (RuntimeFilter filter: node.getRuntimeFilters()) {
       filter.setIsBroadcast(doBroadcast);
-      filter.computeHasLocalTarget();
+      filter.computeHasLocalTargets();
       // Work around IMPALA-3450, where cardinalities might be wrong in single-node plans
       // with UNION and LIMITs.
       // TODO: Remove.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
index 60dccc5..611bb77 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
@@ -660,10 +660,4 @@ public class HdfsScanNode extends ScanNode {
 
   @Override
   public boolean hasCorruptTableStats() { return hasCorruptTableStats_; }
-
-  @Override
-  protected void addRuntimeFilter(RuntimeFilter filter) {
-    Preconditions.checkState(filter.getTargetExpr().isBoundByTupleIds(tupleIds_));
-    super.addRuntimeFilter(filter);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
index 895df78..fbbcd0d 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
@@ -626,8 +626,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     StringBuilder output = new StringBuilder();
     List<String> filtersStr = Lists.newArrayList();
     for (RuntimeFilter filter: runtimeFilters_) {
-      Expr expr =
-          (isBuildNode) ? filter.getSrcExpr() : filter.getTargetExpr();
+      Expr expr = null;
+      if (isBuildNode) {
+        expr = filter.getSrcExpr();
+      } else {
+        expr = filter.getTargetExpr(getId());
+      }
+      Preconditions.checkNotNull(expr);
       filtersStr.add(String.format(format, filter.getFilterId(), expr.toSql()));
     }
     output.append(Joiner.on(", ").join(filtersStr) + "\n");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java
index dd27051..8bdd126 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/RuntimeFilterGenerator.java
@@ -19,6 +19,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.cloudera.impala.analysis.Analyzer;
 import com.cloudera.impala.analysis.BinaryPredicate;
@@ -34,9 +35,12 @@ import com.cloudera.impala.catalog.Table;
 import com.cloudera.impala.common.IdGenerator;
 import com.cloudera.impala.planner.PlanNode;
 import com.cloudera.impala.thrift.TRuntimeFilterDesc;
+import com.cloudera.impala.thrift.TRuntimeFilterTargetDesc;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +50,9 @@ import org.slf4j.LoggerFactory;
  * runtime filter propagation. Runtime filter propagation is an optimization technique
  * used to filter scanned tuples or scan ranges based on information collected at
  * runtime. A runtime filter is constructed during the build phase of a join node, and is
- * applied at a scan node on the probe side of that join node. Runtime filters are
- * generated from equi-join predicates but they do not replace the original predicates.
+ * applied at, potentially, multiple scan nodes on the probe side of that join node.
+ * Runtime filters are generated from equi-join predicates but they do not replace the
+ * original predicates.
  *
  * Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1';
  * Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a
@@ -69,9 +74,6 @@ public final class RuntimeFilterGenerator {
   private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid_ =
       Maps.newHashMap();
 
-  // List of runtime filters generated
-  private final List<RuntimeFilter> runtimeFilters_ = Lists.newArrayList();
-
   // Generator for filter ids
   private final IdGenerator<RuntimeFilterId> filterIdGenerator =
       RuntimeFilterId.createGenerator();
@@ -85,43 +87,92 @@ public final class RuntimeFilterGenerator {
    * the left plan subtree of the associated join node, while rhs_expr is the expr on
    * which the filter is built and can be bound by any number of tuple ids from the
    * right plan subtree. Every runtime filter must record the join node that constructs
-   * the filter and the scan node that applies the filter (destination node).
+   * the filter and the scan nodes that apply the filter (destination nodes).
    */
   public static class RuntimeFilter {
     // Identifier of the filter (unique within a query)
     private final RuntimeFilterId id_;
     // Join node that builds the filter
     private final JoinNode src_;
-    // Scan node that applies the filter
-    private ScanNode target_;
     // Expr (rhs of join predicate) on which the filter is built
     private final Expr srcExpr_;
-    // Expr (lhs of join predicate) on which the filter is applied
-    private Expr targetExpr_;
-    // Slots from base table tuples that are in the same equivalent classes as the slots
-    // of 'targetExpr_'. The slots are grouped by tuple id.
-    private Map<TupleId, List<SlotId>> targetSlotsByTid_;
+    // Expr (lhs of join predicate) from which the exprs of the filter targets are generated.
+    private final Expr origTargetExpr_;
+    // Runtime filter targets
+    private final List<RuntimeFilterTarget> targets_ = Lists.newArrayList();
+    // Slots from base table tuples that have value transfer from the slots
+    // of 'origTargetExpr_'. The slots are grouped by tuple id.
+    private final Map<TupleId, List<SlotId>> targetSlotsByTid_;
     // If true, the join node building this filter is executed using a broadcast join;
     // set in the DistributedPlanner.createHashJoinFragment()
     private boolean isBroadcastJoin_;
-    // If true, targetExpr_ is bound by only partition columns
-    private boolean isBoundByPartitionColumns_;
-    // If true, the filter is applied by a scan in the same fragment as the join node
-    // which produced it. Set in DistributedPlanner.createHashJoinFragment() if there is a
-    // distributed plan, otherwise always true by definition.
-    private boolean hasLocalTarget_ = true;
     // Estimate of the number of distinct values that will be inserted into this filter,
     // globally across all instances of the source node. Used to compute an optimal size
     // for the filter. A value of -1 means no estimate is available, and default filter
     // parameters should be used.
     private long ndvEstimate_ = -1;
+    // If true, the filter is produced by a broadcast join and there is at least one
+    // destination scan node which is in the same fragment as the join; set in
+    // DistributedPlanner.createHashJoinFragment().
+    private boolean hasLocalTargets_ = false;
+    // If true, there is at least one destination scan node which is not in the same
+    // fragment as the join that produced the filter; set in
+    // DistributedPlanner.createHashJoinFragment().
+    private boolean hasRemoteTargets_ = false;
+    // If set, indicates that the filter can't be assigned to another scan node.
+    // Once set, it can't be unset.
+    private boolean finalized_ = false;
+
+    /**
+     * Internal representation of a runtime filter target.
+     */
+    private static class RuntimeFilterTarget {
+      // Scan node that applies the filter
+      public ScanNode node;
+      // Expr on which the filter is applied
+      public Expr expr;
+      // Indicates if 'expr' is bound only by partition columns
+      public boolean isBoundByPartitionColumns = false;
+      // Indicates if 'node' is in the same fragment as the join that produces the
+      // filter
+      public boolean isLocalTarget = false;
+
+      public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr) {
+        node = targetNode;
+        expr = targetExpr;
+      }
+
+      public TRuntimeFilterTargetDesc toThrift() {
+        TRuntimeFilterTargetDesc tFilterTarget = new TRuntimeFilterTargetDesc();
+        tFilterTarget.setNode_id(node.getId().asInt());
+        tFilterTarget.setTarget_expr(expr.treeToThrift());
+        List<SlotId> sids = Lists.newArrayList();
+        expr.getIds(null, sids);
+        List<Integer> tSlotIds = Lists.newArrayListWithCapacity(sids.size());
+        for (SlotId sid: sids) tSlotIds.add(sid.asInt());
+        tFilterTarget.setTarget_expr_slotids(tSlotIds);
+        tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
+        tFilterTarget.setIs_local_target(isLocalTarget);
+        return tFilterTarget;
+      }
+
+      @Override
+      public String toString() {
+        StringBuilder output = new StringBuilder();
+        return output.append("Target Id: " + node.getId() + " ")
+            .append("Target expr: " + expr.debugString() + " ")
+            .append("Partition columns: " + isBoundByPartitionColumns)
+            .append("Is local: " + isLocalTarget)
+            .toString();
+      }
+    }
 
     private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode,
-        Expr srcExpr, Expr targetExpr, Map<TupleId, List<SlotId>> targetSlots) {
+        Expr srcExpr, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots) {
       id_ = filterId;
       src_ = filterSrcNode;
       srcExpr_ = srcExpr;
-      targetExpr_ = targetExpr;
+      origTargetExpr_ = origTargetExpr;
       targetSlotsByTid_ = targetSlots;
       computeNdvEstimate();
     }
@@ -135,20 +186,29 @@ public final class RuntimeFilterGenerator {
     @Override
     public int hashCode() { return id_.hashCode(); }
 
+    public void markFinalized() { finalized_ = true; }
+    public boolean isFinalized() { return finalized_; }
+
+    /**
+     * Serializes a runtime filter to Thrift.
+     */
     public TRuntimeFilterDesc toThrift() {
       TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
       tFilter.setFilter_id(id_.asInt());
-      tFilter.setTarget_expr(targetExpr_.treeToThrift());
       tFilter.setSrc_expr(srcExpr_.treeToThrift());
-      tFilter.setIs_bound_by_partition_columns(isBoundByPartitionColumns_);
       tFilter.setIs_broadcast_join(isBroadcastJoin_);
-      tFilter.setHas_local_target(hasLocalTarget_);
       tFilter.setNdv_estimate(ndvEstimate_);
-      List<SlotId> sids = Lists.newArrayList();
-      targetExpr_.getIds(null, sids);
-      List<Integer> tSlotIds = Lists.newArrayList();
-      for (SlotId sid: sids) tSlotIds.add(sid.asInt());
-      tFilter.setTarget_expr_slotids(tSlotIds);
+      tFilter.setHas_local_targets(hasLocalTargets_);
+      tFilter.setHas_remote_targets(hasRemoteTargets_);
+      boolean appliedOnPartitionColumns = true;
+      for (int i = 0; i < targets_.size(); ++i) {
+        RuntimeFilterTarget target = targets_.get(i);
+        tFilter.addToTargets(target.toThrift());
+        tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i);
+        appliedOnPartitionColumns =
+            appliedOnPartitionColumns && target.isBoundByPartitionColumns;
+      }
+      tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
       return tFilter;
     }
 
@@ -228,31 +288,36 @@ public final class RuntimeFilterGenerator {
 
     /**
      * Static function that returns the ids of slots bound by base table tuples for which
-     * there is a value transfer from 'slotId'. The slots are grouped by tuple id.
+     * there is a value transfer from 'srcSid'. The slots are grouped by tuple id.
      */
     private static Map<TupleId, List<SlotId>> getBaseTblEquivSlots(Analyzer analyzer,
-        SlotId slotId) {
+        SlotId srcSid) {
       Map<TupleId, List<SlotId>> slotsByTid = Maps.newHashMap();
-      for (SlotId equivSlotId: analyzer.getEquivSlots(slotId)) {
-        TupleDescriptor tupleDesc = analyzer.getSlotDesc(equivSlotId).getParent();
-        if (tupleDesc.getTable() == null
-            || !analyzer.hasValueTransfer(slotId, equivSlotId)) {
-          continue;
-        }
+      for (SlotId targetSid: analyzer.getValueTransferTargets(srcSid)) {
+        TupleDescriptor tupleDesc = analyzer.getSlotDesc(targetSid).getParent();
+        if (tupleDesc.getTable() == null) continue;
         List<SlotId> sids = slotsByTid.get(tupleDesc.getId());
         if (sids == null) {
           sids = Lists.newArrayList();
           slotsByTid.put(tupleDesc.getId(), sids);
         }
-        sids.add(equivSlotId);
+        sids.add(targetSid);
       }
       return slotsByTid;
     }
 
-    public ScanNode getTarget() { return target_; }
-    public boolean hasTarget() { return target_ != null; }
+    public Expr getTargetExpr(PlanNodeId targetPlanNodeId) {
+      for (RuntimeFilterTarget target: targets_) {
+        if (target.node.getId() != targetPlanNodeId) continue;
+        return target.expr;
+      }
+      return null;
+    }
+
+    public List<RuntimeFilterTarget> getTargets() { return targets_; }
+    public boolean hasTargets() { return !targets_.isEmpty(); }
     public Expr getSrcExpr() { return srcExpr_; }
-    public Expr getTargetExpr() { return targetExpr_; }
+    public Expr getOrigTargetExpr() { return origTargetExpr_; }
     public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid_; }
     public RuntimeFilterId getFilterId() { return id_; }
 
@@ -268,61 +333,60 @@ public final class RuntimeFilterGenerator {
       return src_.getCardinality() / (double) src_.getChild(0).getCardinality();
     }
 
-    public void setFilterTarget(ScanNode node, Analyzer analyzer) {
-      target_ = node;
+    public void addTarget(ScanNode node, Analyzer analyzer, Expr targetExpr) {
+      Preconditions.checkState(targetExpr.isBoundByTupleIds(node.getTupleIds()));
+      RuntimeFilterTarget target = new RuntimeFilterTarget(node, targetExpr);
+      targets_.add(target);
       // Check if all the slots of targetExpr_ are bound by partition columns
       TupleDescriptor baseTblDesc = node.getTupleDesc();
       Table tbl = baseTblDesc.getTable();
-      if (tbl.getNumClusteringCols() == 0) {
-        isBoundByPartitionColumns_ = false;
-        return;
-      }
+      if (tbl.getNumClusteringCols() == 0) return;
       List<SlotId> sids = Lists.newArrayList();
-      targetExpr_.getIds(null, sids);
+      targetExpr.getIds(null, sids);
       for (SlotId sid: sids) {
         SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
         if (slotDesc.getColumn() == null
             || slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) {
-          isBoundByPartitionColumns_ = false;
           return;
         }
       }
-      isBoundByPartitionColumns_ = true;
-    }
-
-    public void setTargetExpr(Expr expr) {
-      Preconditions.checkNotNull(expr);
-      targetExpr_ = expr;
+      target.isBoundByPartitionColumns = true;
     }
 
     public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin_ = isBroadcast; }
 
     public void computeNdvEstimate() { ndvEstimate_ = src_.getChild(1).getCardinality(); }
 
-    public void computeHasLocalTarget() {
+    public void computeHasLocalTargets() {
       Preconditions.checkNotNull(src_.getFragment());
-      Preconditions.checkNotNull(target_.getFragment());
-      hasLocalTarget_ = src_.getFragment().getId().equals(target_.getFragment().getId());
+      Preconditions.checkState(hasTargets());
+      for (RuntimeFilterTarget target: targets_) {
+        Preconditions.checkNotNull(target.node.getFragment());
+        boolean isLocal =
+            src_.getFragment().getId().equals(target.node.getFragment().getId());
+        target.isLocalTarget = isLocal;
+        hasLocalTargets_ = hasLocalTargets_ || isLocal;
+        hasRemoteTargets_ = hasRemoteTargets_ || !isLocal;
+      }
     }
 
     /**
      * Assigns this runtime filter to the corresponding plan nodes.
      */
     public void assignToPlanNodes() {
-      Preconditions.checkNotNull(target_);
+      Preconditions.checkState(hasTargets());
       src_.addRuntimeFilter(this);
-      target_.addRuntimeFilter(this);
+      for (RuntimeFilterTarget target: targets_) target.node.addRuntimeFilter(this);
     }
 
     public String debugString() {
       StringBuilder output = new StringBuilder();
       return output.append("FilterID: " + id_ + " ")
           .append("Source: " + src_.getId() + " ")
-          .append("Target: " + target_.getId() + " ")
           .append("SrcExpr: " + getSrcExpr().debugString() +  " ")
-          .append("TargetExpr: " + getTargetExpr().debugString())
-          .append("Selectivity: " + getSelectivity())
-          .toString();
+          .append("Target(s): ")
+          .append(Joiner.on(", ").join(targets_) + " ")
+          .append("Selectivity: " + getSelectivity()).toString();
     }
   }
 
@@ -334,7 +398,7 @@ public final class RuntimeFilterGenerator {
     Preconditions.checkArgument(maxNumFilters >= 0);
     RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator();
     filterGenerator.generateFilters(analyzer, plan);
-    List<RuntimeFilter> filters = filterGenerator.getRuntimeFilters();
+    List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
     if (filters.size() > maxNumFilters) {
       // If more than 'maxNumFilters' were generated, sort them by increasing selectivity
       // and keep the 'maxNumFilters' most selective.
@@ -357,7 +421,16 @@ public final class RuntimeFilterGenerator {
     }
   }
 
-  public List<RuntimeFilter> getRuntimeFilters() { return runtimeFilters_; }
+  /**
+   * Returns a set of all the registered runtime filters.
+   */
+  public Set<RuntimeFilter> getRuntimeFilters() {
+    Set<RuntimeFilter> result = Sets.newHashSet();
+    for (List<RuntimeFilter> filters: runtimeFiltersByTid_.values()) {
+      result.addAll(filters);
+    }
+    return result;
+  }
 
   /**
    * Generates the runtime filters for a query by recursively traversing the single-node
@@ -388,13 +461,10 @@ public final class RuntimeFilterGenerator {
         filters.add(filter);
       }
       generateFilters(analyzer, root.getChild(0));
-      // Unregister all the runtime filters for which no destination scan node could be
-      // found in the left subtree of the join node. This is to ensure that we don't
+      // Finalize every runtime filter of that join. This is to ensure that we don't
       // assign a filter to a scan node from the right subtree of joinNode or ancestor
       // join nodes in case we don't find a destination node in the left subtree.
-      for (RuntimeFilter runtimeFilter: filters) {
-        if (!runtimeFilter.hasTarget()) unregisterRuntimeFilter(runtimeFilter);
-      }
+      for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
       generateFilters(analyzer, root.getChild(1));
     } else if (root instanceof ScanNode) {
       assignRuntimeFilters(analyzer, (ScanNode) root);
@@ -413,70 +483,101 @@ public final class RuntimeFilterGenerator {
     Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots();
     Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
     for (TupleId tupleId: targetSlotsByTid.keySet()) {
-      List<RuntimeFilter> filters = runtimeFiltersByTid_.get(tupleId);
-      if (filters == null) {
-        filters = Lists.newArrayList();
-        runtimeFiltersByTid_.put(tupleId, filters);
-      }
-      filters.add(filter);
+      registerRuntimeFilter(filter, tupleId);
     }
   }
 
-  private void unregisterRuntimeFilter(RuntimeFilter runtimeFilter) {
+  /**
+   * Registers a runtime filter with a specific target tuple id.
+   */
+  private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
+    Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+    List<RuntimeFilter> filters = runtimeFiltersByTid_.get(targetTid);
+    if (filters == null) {
+      filters = Lists.newArrayList();
+      runtimeFiltersByTid_.put(targetTid, filters);
+    }
+    Preconditions.checkState(!filter.isFinalized());
+    filters.add(filter);
+  }
+
+  /**
+   * Finalizes a runtime filter by disassociating it from all the candidate target scan
+   * nodes that haven't been used as destinations for that filter. Also sets the
+   * finalized_ flag of that filter so that it can't be assigned to any other scan nodes.
+   */
+  private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) {
+    Set<TupleId> targetTupleIds = Sets.newHashSet();
+    for (RuntimeFilter.RuntimeFilterTarget target: runtimeFilter.getTargets()) {
+      targetTupleIds.addAll(target.node.getTupleIds());
+    }
     for (TupleId tupleId: runtimeFilter.getTargetSlots().keySet()) {
-      runtimeFiltersByTid_.get(tupleId).remove(runtimeFilter);
+      if (!targetTupleIds.contains(tupleId)) {
+        runtimeFiltersByTid_.get(tupleId).remove(runtimeFilter);
+      }
     }
+    runtimeFilter.markFinalized();
   }
 
   /**
    * Assigns runtime filters to a specific scan node 'scanNode'.
    * The assigned filters are the ones for which 'scanNode' can be used a destination
    * node. A scan node may be used as a destination node for multiple runtime filters.
+   * Currently, runtime filters can only be assigned to HdfsScanNodes.
    */
   private void assignRuntimeFilters(Analyzer analyzer, ScanNode scanNode) {
-    Preconditions.checkNotNull(scanNode);
+    if (!(scanNode instanceof HdfsScanNode)) return;
     TupleId tid = scanNode.getTupleIds().get(0);
-    // Return if no runtime filter is associated with this scan tuple.
     if (!runtimeFiltersByTid_.containsKey(tid)) return;
     for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
-      if (filter.getTarget() != null) continue;
-      if (!filter.getTargetExpr().isBound(tid)) {
-        Preconditions.checkState(filter.getTargetSlots().containsKey(tid));
-        // Modify the filter target expr using the equivalent slots from the scan node
-        // on which the filter will be applied.
-        ExprSubstitutionMap smap = new ExprSubstitutionMap();
-        Expr targetExpr = filter.getTargetExpr();
-        List<SlotRef> exprSlots = Lists.newArrayList();
-        targetExpr.collect(SlotRef.class, exprSlots);
-        List<SlotId> sids = filter.getTargetSlots().get(tid);
-        for (SlotRef slotRef: exprSlots) {
-          for (SlotId sid: sids) {
-            if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
-              SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid));
-              newSlotRef.analyzeNoThrow(analyzer);
-              smap.put(slotRef, newSlotRef);
-              break;
-            }
+      if (filter.isFinalized()) continue;
+      Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
+      if (targetExpr == null) continue;
+      filter.addTarget(scanNode, analyzer, targetExpr);
+    }
+  }
+
+  /**
+   * Computes the target expr for a specified runtime filter 'filter' to be applied at
+   * the scan node whose target tuple id is 'targetTid'.
+   */
+  private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid,
+      Analyzer analyzer) {
+    Expr targetExpr = filter.getOrigTargetExpr();
+    if (!targetExpr.isBound(targetTid)) {
+      Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+      // Modify the filter target expr using the equivalent slots from the scan node
+      // on which the filter will be applied.
+      ExprSubstitutionMap smap = new ExprSubstitutionMap();
+      List<SlotRef> exprSlots = Lists.newArrayList();
+      targetExpr.collect(SlotRef.class, exprSlots);
+      List<SlotId> sids = filter.getTargetSlots().get(targetTid);
+      for (SlotRef slotRef: exprSlots) {
+        for (SlotId sid: sids) {
+          if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
+            SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid));
+            newSlotRef.analyzeNoThrow(analyzer);
+            smap.put(slotRef, newSlotRef);
+            break;
           }
         }
-        Preconditions.checkState(exprSlots.size() == smap.size());
-        try {
-          filter.setTargetExpr(targetExpr.substitute(smap, analyzer, true));
-        } catch (Exception e) {
-          // An exception is thrown if we cannot generate a target expr from this
-          // scan node that has the same type as the lhs expr of the join predicate
-          // from which the runtime filter was generated. We skip that scan node and will
-          // try to assign the filter to a different scan node.
-          //
-          // TODO: Investigate if we can generate a type-compatible source/target expr
-          // pair from that scan node instead of skipping it.
-          continue;
-        }
       }
-      Preconditions.checkState(
-          filter.getTargetExpr().getType().matchesType(filter.getSrcExpr().getType()));
-      filter.setFilterTarget(scanNode, analyzer);
-      runtimeFilters_.add(filter);
+      Preconditions.checkState(exprSlots.size() == smap.size());
+      try {
+        targetExpr = targetExpr.substitute(smap, analyzer, true);
+      } catch (Exception e) {
+        // An exception is thrown if we cannot generate a target expr from this
+        // scan node that has the same type as the lhs expr of the join predicate
+        // from which the runtime filter was generated. We skip that scan node and will
+        // try to assign the filter to a different scan node.
+        //
+        // TODO: Investigate if we can generate a type-compatible source/target expr
+        // pair from that scan node instead of skipping it.
+        return null;
+      }
     }
+    Preconditions.checkState(
+        targetExpr.getType().matchesType(filter.getSrcExpr().getType()));
+    return targetExpr;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
index 270c8f6..a7bdf36 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
@@ -52,7 +52,6 @@ where a.tinyint_col = a.smallint_col and a.int_col = a.bigint_col
 ---- PLAN
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
-|  runtime filters: RF000 <- b.id
 |
 |--01:SCAN DATA SOURCE [functional.alltypes_datasource b]
 |--predicates: b.id = b.int_col, b.id = b.bigint_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test b/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
index 1f9e7ca..7c0c6e5 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
@@ -494,14 +494,12 @@ where
 ---- PLAN
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = b.int_col
-|  runtime filters: RF000 <- b.int_col
 |
 |--00:SCAN HBASE [functional_hbase.alltypessmall b]
 |     predicates: b.bool_col = FALSE
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col
-|  runtime filters: RF001 <- c.int_col
 |
 |--02:SCAN HBASE [functional_hbase.alltypessmall c]
 |     predicates: c.month = 4

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
index 2332d4e..9eb32d4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
@@ -1116,6 +1116,7 @@ on (aid < bid and aid = c.id)
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.id
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
@@ -1145,6 +1146,7 @@ on (t1.id = v.id and v.int_col is null and v.int_col < 10 and v.id < 10)
 |--02:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
 |     predicates: b.int_col < 10
+|     runtime filters: RF000 -> b.int_col
 |
 01:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
index ff10ac7..ab2b23c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
@@ -248,6 +248,7 @@ limit 100
 |
 |--03:SCAN HDFS [tpch.supplier s]
 |     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF001 -> s_nationkey
 |
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: o_custkey = c_custkey
@@ -315,6 +316,7 @@ limit 100
 |  |
 |  03:SCAN HDFS [tpch.supplier s]
 |     partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF001 -> s_nationkey
 |
 07:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: o_custkey = c_custkey
@@ -802,6 +804,7 @@ inner join functional.alltypestiny t6 on (t5.id = t6.id)
 |
 |--04:SCAN HDFS [functional.alltypes t5]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> t5.id
 |
 08:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t4.id = t3.id
@@ -812,6 +815,7 @@ inner join functional.alltypestiny t6 on (t5.id = t6.id)
 |  |
 |  |--02:SCAN HDFS [functional.alltypessmall t3]
 |  |     partitions=4/4 files=4 size=6.32KB
+|  |     runtime filters: RF000 -> t3.id, RF001 -> t3.id
 |  |
 |  06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t1.id
@@ -819,10 +823,11 @@ inner join functional.alltypestiny t6 on (t5.id = t6.id)
 |  |
 |  |--00:SCAN HDFS [functional.alltypestiny t1]
 |  |     partitions=4/4 files=4 size=460B
+|  |     runtime filters: RF000 -> t1.id, RF001 -> t1.id, RF002 -> t1.id
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
-|     runtime filters: RF002 -> t2.id, RF003 -> t2.id
+|     runtime filters: RF000 -> t2.id, RF001 -> t2.id, RF002 -> t2.id, RF003 -> t2.id
 |
 03:SCAN HDFS [functional.alltypesagg t4]
    partitions=11/11 files=11 size=814.73KB
@@ -861,6 +866,7 @@ right join functional.alltypestiny t6 on (t5.id = t6.id)
 |  |  |
 |  |  |--02:SCAN HDFS [functional.alltypessmall t3]
 |  |  |     partitions=4/4 files=4 size=6.32KB
+|  |  |     runtime filters: RF000 -> t3.id
 |  |  |
 |  |  06:HASH JOIN [RIGHT OUTER JOIN]
 |  |  |  hash predicates: t2.id = t1.id
@@ -871,7 +877,7 @@ right join functional.alltypestiny t6 on (t5.id = t6.id)
 |  |  |
 |  |  01:SCAN HDFS [functional.alltypes t2]
 |  |     partitions=24/24 files=24 size=478.45KB
-|  |     runtime filters: RF002 -> t2.id, RF003 -> t2.id
+|  |     runtime filters: RF000 -> t2.id, RF002 -> t2.id, RF003 -> t2.id
 |  |
 |  03:SCAN HDFS [functional.alltypesagg t4]
 |     partitions=11/11 files=11 size=814.73KB
@@ -910,6 +916,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |
 |  |--03:SCAN HDFS [functional.alltypessmall t4]
 |  |     partitions=4/4 files=4 size=6.32KB
+|  |     runtime filters: RF000 -> t4.id
 |  |
 |  08:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t3.id
@@ -917,6 +924,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |
 |  |--02:SCAN HDFS [functional.alltypesagg t3]
 |  |     partitions=11/11 files=11 size=814.73KB
+|  |     runtime filters: RF000 -> t3.id, RF002 -> t3.id
 |  |
 |  07:HASH JOIN [RIGHT OUTER JOIN]
 |  |  hash predicates: t2.id = t1.id
@@ -927,7 +935,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
-|     runtime filters: RF002 -> t2.id, RF003 -> t2.id, RF004 -> t2.id
+|     runtime filters: RF000 -> t2.id, RF002 -> t2.id, RF003 -> t2.id, RF004 -> t2.id
 |
 04:SCAN HDFS [functional.alltypes t5]
    partitions=24/24 files=24 size=478.45KB
@@ -961,6 +969,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |
 |  |--03:SCAN HDFS [functional.alltypessmall t4]
 |  |     partitions=4/4 files=4 size=6.32KB
+|  |     runtime filters: RF000 -> t4.id
 |  |
 |  08:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t3.id
@@ -968,6 +977,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |
 |  |--02:SCAN HDFS [functional.alltypesagg t3]
 |  |     partitions=11/11 files=11 size=814.73KB
+|  |     runtime filters: RF000 -> t3.id, RF001 -> t3.id
 |  |
 |  07:HASH JOIN [RIGHT OUTER JOIN]
 |  |  hash predicates: t2.id = t1.id
@@ -978,7 +988,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |
 |  01:SCAN HDFS [functional.alltypes t2]
 |     partitions=24/24 files=24 size=478.45KB
-|     runtime filters: RF001 -> t2.id, RF002 -> t2.id, RF003 -> t2.id
+|     runtime filters: RF000 -> t2.id, RF001 -> t2.id, RF002 -> t2.id, RF003 -> t2.id
 |
 04:SCAN HDFS [functional.alltypes t5]
    partitions=24/24 files=24 size=478.45KB
@@ -1163,10 +1173,11 @@ where t2.month = 1
 |  |
 |  |--05:SCAN HDFS [functional.alltypestiny a]
 |  |     partitions=4/4 files=4 size=460B
+|  |     runtime filters: RF000 -> a.id, RF002 -> a.id
 |  |
 |  06:SCAN HDFS [functional.alltypes b]
 |     partitions=2/24 files=2 size=40.32KB
-|     runtime filters: RF002 -> b.id, RF003 -> b.id
+|     runtime filters: RF000 -> b.id, RF002 -> b.id, RF003 -> b.id
 |
 08:SCAN HDFS [functional.alltypes t3]
    partitions=24/24 files=24 size=478.45KB
@@ -1224,6 +1235,7 @@ where t2.month = 1
 |  |
 |  |--05:SCAN HDFS [functional.alltypestiny a]
 |  |     partitions=4/4 files=4 size=460B
+|  |     runtime filters: RF000 -> a.id, RF001 -> a.id
 |  |
 |  06:SCAN HDFS [functional.alltypes b]
 |     partitions=2/24 files=2 size=40.32KB
@@ -1231,6 +1243,7 @@ where t2.month = 1
 |
 08:SCAN HDFS [functional.alltypes t3]
    partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> t3.id
 ====
 # Regression test for IMPALA-1343.
 SELECT sum(t4.tinyint_col)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 2605470..6280bf5 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -80,6 +80,7 @@ from (select * from functional.alltypestiny) t1
 |
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     runtime filters: RF000 -> coalesce(functional.alltypestiny.id, functional.alltypestiny.id)
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
@@ -104,6 +105,7 @@ from (select * from functional.alltypestiny) t1
 |  |
 |  01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
+|     runtime filters: RF000 -> coalesce(functional.alltypestiny.id, functional.alltypestiny.id)
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
@@ -390,6 +392,7 @@ limit 5
 |
 |--01:SCAN HDFS [functional.alltypessmall b]
 |     partitions=4/4 files=4 size=6.32KB
+|     runtime filters: RF000 -> b.id, RF001 -> b.id
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
@@ -440,6 +443,7 @@ limit 5
 |  |
 |  01:SCAN HDFS [functional.alltypessmall b]
 |     partitions=4/4 files=4 size=6.32KB
+|     runtime filters: RF000 -> b.id, RF001 -> b.id
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=11/11 files=11 size=814.73KB
@@ -498,6 +502,7 @@ where t1.id = t3.id and t2.id = t3.id
 |
 |--01:SCAN HDFS [functional.testtbl t2]
 |     partitions=1/1 files=0 size=0B
+|     runtime filters: RF000 -> t2.id
 |
 00:SCAN HDFS [functional.testtbl t1]
    partitions=1/1 files=0 size=0B
@@ -522,6 +527,7 @@ where t1.id = t3.id and t2.id = t3.id
 |  |
 |  01:SCAN HDFS [functional.testtbl t2]
 |     partitions=1/1 files=0 size=0B
+|     runtime filters: RF000 -> t2.id
 |
 00:SCAN HDFS [functional.testtbl t1]
    partitions=1/1 files=0 size=0B
@@ -629,6 +635,7 @@ on (b.id = c.id and c.int_col = b.int_col)
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.id, RF001 -> b.int_col
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
@@ -653,6 +660,7 @@ on (b.id = c.id and c.int_col = b.int_col)
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.id, RF001 -> b.int_col
 |
 05:EXCHANGE [HASH(a.id,a.int_col)]
 |
@@ -735,6 +743,7 @@ on (b.int_col = c.int_col and c.bool_col = b.bool_col)
 |
 |--01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.int_col, RF001 -> b.bool_col
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
@@ -767,6 +776,7 @@ on (b.int_col = c.int_col and c.bool_col = b.bool_col)
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.int_col, RF001 -> b.bool_col
 |
 06:EXCHANGE [HASH(a.int_col,a.bool_col)]
 |
@@ -847,6 +857,7 @@ where a.id = c.id and a.int_col = c.int_col
 |
 |--01:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
+|     runtime filters: RF000 -> b.id, RF001 -> b.int_col
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
@@ -876,6 +887,7 @@ where a.id = c.id and b.int_col = c.int_col and b.int_col = c.id
 |--02:SCAN HDFS [functional.alltypessmall c]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: c.id = c.int_col
+|     runtime filters: RF000 -> c.id
 |
 01:SCAN HDFS [functional.alltypes b]
    partitions=24/24 files=24 size=478.45KB
@@ -944,6 +956,7 @@ and t3.bigint_col = t2.bigint_col
 |
 |--00:SCAN HDFS [functional.alltypes t1]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> t1.smallint_col, RF001 -> t1.smallint_col
 |
 01:SCAN HDFS [functional.alltypesagg t2]
    partitions=11/11 files=11 size=814.73KB
@@ -969,6 +982,7 @@ and t3.bigint_col = t2.bigint_col
 |  |
 |  00:SCAN HDFS [functional.alltypes t1]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> t1.smallint_col, RF001 -> t1.smallint_col
 |
 05:EXCHANGE [HASH(t2.smallint_col)]
 |
@@ -1118,6 +1132,7 @@ on t2.string_col = t1.string_col and t3.date_string_col = t2.string_col
 |
 |--01:SCAN HDFS [functional.alltypestiny t3]
 |     partitions=4/4 files=4 size=460B
+|     runtime filters: RF000 -> t3.date_string_col
 |
 00:SCAN HDFS [functional.alltypesagg t1]
    partitions=11/11 files=11 size=814.73KB
@@ -1205,6 +1220,7 @@ where b.id < 5
 |--02:SCAN HDFS [functional.alltypessmall]
 |     partitions=1/4 files=1 size=1.58KB
 |     predicates: functional.alltypessmall.id < 5
+|     runtime filters: RF000 -> functional.alltypessmall.id
 |
 01:SCAN HDFS [functional.alltypessmall]
    partitions=1/4 files=1 size=1.57KB
@@ -1246,6 +1262,7 @@ where b.id < 5
 |--03:SCAN HDFS [functional.alltypessmall]
 |     partitions=1/4 files=1 size=1.58KB
 |     predicates: functional.alltypessmall.id < 5
+|     runtime filters: RF000 -> functional.alltypessmall.id
 |
 02:SCAN HDFS [functional.alltypessmall]
    partitions=1/4 files=1 size=1.57KB
@@ -1397,6 +1414,7 @@ inner join functional.JoinTbl k on j.test_id = k.test_id and j.alltypes_id = 500
 |
 |--01:SCAN HDFS [functional.dimtbl d]
 |     partitions=1/1 files=1 size=171B
+|     runtime filters: RF000 -> d.id
 |
 00:SCAN HDFS [functional.jointbl j]
    partitions=1/1 files=1 size=433B
@@ -1429,6 +1447,7 @@ inner join [shuffle] functional.alltypessmall c on b.id = c.id
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.id
 |
 00:SCAN HDFS [functional.alltypestiny a]
    partitions=4/4 files=4 size=460B
@@ -1460,6 +1479,7 @@ inner join /* +shuffle */ functional.alltypessmall c on b.id = c.id
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.id
 |
 00:SCAN HDFS [functional.alltypestiny a]
    partitions=4/4 files=4 size=460B
@@ -1497,6 +1517,7 @@ functional.alltypessmall c on b.id = c.id
 |  |
 |  01:SCAN HDFS [functional.alltypes b]
 |     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF000 -> b.id
 |
 00:SCAN HDFS [functional.alltypestiny a]
    partitions=4/4 files=4 size=460B
@@ -1790,6 +1811,7 @@ on (b.int_col = c.smallint_col and b.string_col = c.string_col)
 |  02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
 |     predicates: functional.alltypestiny.int_col = functional.alltypestiny.smallint_col
+|     runtime filters: RF000 -> functional.alltypestiny.smallint_col, RF001 -> functional.alltypestiny.string_col
 |
 09:AGGREGATE [FINALIZE]
 |  group by: string_col, int_col, smallint_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
index d3b23f8..dc6cefe 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-delete.test
@@ -82,4 +82,4 @@ DELETE FROM KUDU [functional_kudu.testtbl]
 01:SCAN HDFS [functional.alltypes b]
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.id, RF001 -> b.id
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
index 1207027..0c21b36 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
@@ -52,7 +52,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
-|  runtime filters: RF000 <- b.id
 |
 |--01:SCAN HDFS [functional.testtbl b]
 |     partitions=1/1 files=0 size=0B
@@ -66,7 +65,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id
-|  runtime filters: RF000 <- b.id
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
@@ -115,7 +113,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: a.zip = zip
-|  runtime filters: RF000 <- zip
 |
 |--01:SCAN HDFS [functional.testtbl]
 |     partitions=1/1 files=0 size=0B
@@ -128,7 +125,6 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: a.zip = zip
-|  runtime filters: RF000 <- zip
 |
 |--04:EXCHANGE [BROADCAST]
 |  |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 6abf6b4..6118c00 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -1,4 +1,3 @@
----- QUERY
 select * from functional_kudu.testtbl
 ---- PLAN
 00:SCAN KUDU [functional_kudu.testtbl]
@@ -10,11 +9,10 @@ NODE 0:
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ====
----- QUERY
 select * from functional_kudu.testtbl where name = '10'
 ---- PLAN
 00:SCAN KUDU [functional_kudu.testtbl]
-  kudu predicates: name = '10'
+   kudu predicates: name = '10'
 ---- SCANRANGELOCATIONS
 NODE 0:
   KUDU KEYRANGE []:[]
@@ -22,9 +20,8 @@ NODE 0:
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
-  kudu predicates: name = '10'
+   kudu predicates: name = '10'
 ====
----- QUERY
 insert into functional_kudu.testtbl(id) values (10)
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
@@ -39,7 +36,6 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 00:UNION
    constant-operands=1
 ====
----- QUERY
 insert into functional_kudu.testtbl(id) select int_col from functional_kudu.tinyinttable
 ---- PLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
@@ -52,7 +48,6 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.tinyinttable]
 ====
----- QUERY
 insert into functional_kudu.testtbl(id, name)
 select count(distinct id), name from functional_kudu.dimtbl
 group by name
@@ -151,4 +146,4 @@ NODE 0:
 |
 00:SCAN KUDU [functional_kudu.testtbl]
    predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
index a05c91b..4ab84b3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
@@ -80,6 +80,7 @@ where c_nationkey = n_nationkey and s_nationkey = n_nationkey
 |
 |--06:SCAN HDFS [tpch_nested_parquet.supplier s]
 |     partitions=1/1 files=1 size=111.08MB
+|     runtime filters: RF000 -> s.s_nationkey, RF001 -> s_comment
 |
 05:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=554.13MB
@@ -1522,7 +1523,7 @@ where c.c_custkey in
 |  04:UNNEST [c.c_orders o2]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
 ====
 # IMPALA-2412: Test join ordering in nested subplans. Same as above
 # but with a few inner joins.
@@ -1566,7 +1567,7 @@ where c.c_custkey in
 |  04:UNNEST [c.c_orders o2]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
    predicates: !empty(c.c_orders)
 ====
 # IMPALA-2412: Test join ordering in nested subplans.
@@ -1608,7 +1609,7 @@ where c.c_custkey in
 |  04:UNNEST [c.c_orders o2]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
 ====
 # IMPALA-2446: Test predicate assignment when outer join has no conjuncts in
 # the ON clause and there are predicates in the WHERE clause that can be assigned to
@@ -1715,7 +1716,7 @@ inner join o.o_lineitems
 |  03:UNNEST [c.c_orders o]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
 ====
 # IMPALA-3065/IMPALA-3062: Test correct assignment of !empty() predicates.
 # Predicates should not be generated if the parent tuple is outer joined.
@@ -1729,7 +1730,7 @@ right outer join tpch_nested_parquet.customer c2
 |  runtime filters: RF000 <- c2.c_custkey
 |
 |--05:SCAN HDFS [tpch_nested_parquet.customer c2]
-|     partitions=1/1 files=4 size=554.13MB
+|     partitions=1/1 files=4 size=577.87MB
 |
 01:SUBPLAN
 |
@@ -1740,7 +1741,7 @@ right outer join tpch_nested_parquet.customer c2
 |  03:UNNEST [c1.c_orders]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c1]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
    runtime filters: RF000 -> c1.c_custkey
 ====
 # IMPALA-3065/IMPALA-3062: Test correct assignment of !empty() predicates.
@@ -1767,8 +1768,8 @@ left semi join c2.c_orders o2
 |  hash predicates: c1.c_custkey = c2.c_custkey
 |
 |--01:SCAN HDFS [tpch_nested_parquet.customer c2]
-|     partitions=1/1 files=4 size=554.13MB
+|     partitions=1/1 files=4 size=577.87MB
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c1]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
index 3d1d911..9e2acdf 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
@@ -159,6 +159,7 @@ from functional.testtbl t1
 |--01:SCAN HDFS [functional.testtbl t2]
 |     partitions=1/1 files=0 size=0B
 |     predicates: t2.id = 17
+|     runtime filters: RF000 -> t2.id
 |
 00:SCAN HDFS [functional.testtbl t1]
    partitions=1/1 files=0 size=0B
@@ -758,6 +759,7 @@ inner join functional.alltypestiny c
 |--01:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
 |     predicates: b.int_col < 0
+|     runtime filters: RF000 -> b.id
 |
 00:SCAN HDFS [functional.alltypestiny a]
    partitions=4/4 files=4 size=460B

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f992dc7f/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
index a596868..3ffdbd4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
@@ -204,6 +204,7 @@ where a.year = 2009 and b.month + 2 <= 4 and b.id = 17
 |--01:SCAN HDFS [functional.alltypessmall b]
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: b.id = 17, CAST(sin(b.smallint_col) AS BOOLEAN) = TRUE
+|     runtime filters: RF000 -> b.id, RF001 -> b.year, RF002 -> b.month, RF003 -> b.smallint_col
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=2/24 files=2 size=38.07KB
@@ -241,6 +242,7 @@ NODE 2:
 |  01:SCAN HDFS [functional.alltypessmall b]
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: b.id = 17, CAST(sin(b.smallint_col) AS BOOLEAN) = TRUE
+|     runtime filters: RF000 -> b.id, RF001 -> b.year, RF002 -> b.month, RF003 -> b.smallint_col
 |
 05:EXCHANGE [HASH(a.id,a.year,a.month,a.tinyint_col)]
 |
@@ -275,6 +277,7 @@ where a.year = 2009 and b.month + 2 <= 4 and b.id = 17
 |--01:SCAN HDFS [functional.alltypessmall]
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.id = 17, CAST(sin(functional.alltypessmall.smallint_col) AS BOOLEAN) = TRUE
+|     runtime filters: RF000 -> functional.alltypessmall.id, RF001 -> functional.alltypessmall.year, RF002 -> functional.alltypessmall.month, RF003 -> functional.alltypessmall.smallint_col
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=2/24 files=2 size=38.07KB
@@ -312,6 +315,7 @@ NODE 2:
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=2/4 files=2 size=3.16KB
 |     predicates: functional.alltypessmall.id = 17, CAST(sin(functional.alltypessmall.smallint_col) AS BOOLEAN) = TRUE
+|     runtime filters: RF000 -> functional.alltypessmall.id, RF001 -> functional.alltypessmall.year, RF002 -> functional.alltypessmall.month, RF003 -> functional.alltypessmall.smallint_col
 |
 05:EXCHANGE [HASH(functional.alltypes.id,functional.alltypes.year,functional.alltypes.month,functional.alltypes.tinyint_col)]
 |
@@ -874,6 +878,7 @@ where isnull(a.id, 0) = 0 and b.id is null  and b.int_col = 17
 |  03:SCAN HDFS [functional.alltypessmall]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: functional.alltypessmall.int_col = 17
+|     runtime filters: RF000 -> functional.alltypessmall.id
 |
 02:AGGREGATE [FINALIZE]
 |  group by: id, int_col
@@ -966,7 +971,7 @@ and t2.id + t3.int_col > 40
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: t2.id = functional.alltypestiny.id, t2.bigint_col = functional.alltypestiny.bigint_col, t2.smallint_col = functional.alltypestiny.int_col
 |  other predicates: t2.id + functional.alltypestiny.int_col > 40
-|  runtime filters: RF000 <- functional.alltypestiny.id, RF002 <- functional.alltypestiny.int_col, RF001 <- functional.alltypestiny.bigint_col
+|  runtime filters: RF000 <- functional.alltypestiny.id, RF001 <- functional.alltypestiny.bigint_col, RF002 <- functional.alltypestiny.int_col
 |
 |--02:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
@@ -979,7 +984,7 @@ and t2.id + t3.int_col > 40
 |--01:SCAN HDFS [functional.alltypessmall t2]
 |     partitions=4/4 files=4 size=6.32KB
 |     predicates: t2.id + t2.smallint_col > 10, t2.id + t2.bigint_col > 20, t2.id + t2.smallint_col + t2.bigint_col > 30
-|     runtime filters: RF001 -> t2.bigint_col
+|     runtime filters: RF000 -> t2.id, RF001 -> t2.bigint_col, RF002 -> t2.smallint_col
 |
 00:SCAN HDFS [functional.alltypes t1]
    partitions=24/24 files=24 size=478.45KB