You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/01/21 00:50:12 UTC

[impala] branch master updated (cfe6085 -> 79aae23)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from cfe6085  IMPALA-9158: Support loading primary key/foreign key constraints in LocalCatalog Mode.
     new 195bd76  IMPALA-4224: part 1: schedule join builds
     new 702e6c4  IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
     new 79aae23  IMPALA-9154: Make runtime filter propagation asynchronous

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/benchmarks/bloom-filter-benchmark.cc        |  37 +-
 be/src/runtime/backend-client.h                    |  23 --
 be/src/runtime/client-cache.cc                     |   1 -
 be/src/runtime/coordinator-backend-state.cc        |  64 +++-
 be/src/runtime/coordinator-backend-state.h         |   6 +-
 be/src/runtime/coordinator-filter-state.h          |  64 +++-
 be/src/runtime/coordinator.cc                      | 226 ++++++-----
 be/src/runtime/coordinator.h                       |  13 +-
 be/src/runtime/data-stream-test.cc                 |   6 +
 be/src/runtime/decimal-value.h                     |  12 +-
 be/src/runtime/decimal-value.inline.h              |   4 +-
 be/src/runtime/exec-env.cc                         |   1 -
 be/src/runtime/fragment-instance-state.cc          |   9 +-
 be/src/runtime/fragment-instance-state.h           |   9 +-
 be/src/runtime/query-state.cc                      |  11 +-
 be/src/runtime/query-state.h                       |   9 +-
 be/src/runtime/runtime-filter-bank.cc              | 168 +++++---
 be/src/runtime/runtime-filter-bank.h               |  29 +-
 be/src/runtime/runtime-filter.h                    |   3 +-
 be/src/runtime/timestamp-value.h                   |  14 +-
 be/src/scheduling/query-schedule.cc                |   7 +-
 be/src/scheduling/query-schedule.h                 |   9 +-
 be/src/scheduling/request-pool-service.h           |   1 -
 be/src/scheduling/scheduler.cc                     |  77 +++-
 be/src/scheduling/scheduler.h                      |   9 +-
 be/src/service/client-request-state.cc             |   5 +-
 be/src/service/client-request-state.h              |   2 +-
 be/src/service/data-stream-service.cc              |  34 ++
 be/src/service/data-stream-service.h               |  10 +
 be/src/service/frontend.h                          |   1 -
 be/src/service/impala-internal-service.cc          |  21 -
 be/src/service/impala-internal-service.h           |   4 -
 be/src/service/impala-server.cc                    |  18 +-
 be/src/service/impala-server.h                     |  15 +-
 be/src/util/bloom-filter-test.cc                   | 108 ++++--
 be/src/util/bloom-filter.cc                        | 101 +++--
 be/src/util/bloom-filter.h                         |  81 +++-
 be/src/util/min-max-filter-test.cc                 | 247 ++++++------
 be/src/util/min-max-filter.cc                      | 426 ++++++++++-----------
 be/src/util/min-max-filter.h                       |  44 +--
 common/protobuf/common.proto                       |  17 +
 common/protobuf/data_stream_service.proto          |  79 ++++
 common/thrift/DataSinks.thrift                     |   3 +-
 common/thrift/Frontend.thrift                      |   3 +-
 common/thrift/ImpalaInternalService.thrift         |  95 +----
 .../org/apache/impala/planner/JoinBuildSink.java   |   6 +-
 46 files changed, 1307 insertions(+), 825 deletions(-)


[impala] 01/03: IMPALA-4224: part 1: schedule join builds

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 195bd7635f326061e999fe2a5e4a20ce6a4a9874
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sun Dec 22 01:13:26 2019 -0800

    IMPALA-4224: part 1: schedule join builds
    
    This adds the scheduler logic for fragments with join builds at their
    root. These fragments need to be co-located with the fragment that
    contains the join node.
    
    The new code is not active yet because the planner does not generate
    plans with join builds (except for planner tests). This change
    was validated in the context of a larger patch that enables the join
    build plans via the planner and makes query execution work.
    
    Change-Id: I779463cfa2ea9b372607d2be6d5d2252a6469e34
    Reviewed-on: http://gerrit.cloudera.org:8080/14944
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/query-schedule.cc                |  7 +-
 be/src/scheduling/query-schedule.h                 |  9 ++-
 be/src/scheduling/scheduler.cc                     | 77 ++++++++++++++++------
 be/src/scheduling/scheduler.h                      |  9 ++-
 common/thrift/DataSinks.thrift                     |  3 +-
 common/thrift/Frontend.thrift                      |  3 +-
 common/thrift/ImpalaInternalService.thrift         | 12 ++++
 .../org/apache/impala/planner/JoinBuildSink.java   |  6 +-
 8 files changed, 98 insertions(+), 28 deletions(-)

diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 857a182..25fe3b2 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -124,7 +124,7 @@ void QuerySchedule::Init() {
       PlanNodeId dest_node_id = fragment.output_sink.stream_sink.dest_node_id;
       FragmentIdx dest_idx = plan_node_to_fragment_idx_[dest_node_id];
       FragmentExecParams& dest_params = fragment_exec_params_[dest_idx];
-      dest_params.input_fragments.push_back(fragment.idx);
+      dest_params.exchange_input_fragments.push_back(fragment.idx);
     }
   }
 }
@@ -181,6 +181,11 @@ void QuerySchedule::Validate() const {
     }
   }
 
+  // Check that all fragments have instances.
+  for (const FragmentExecParams& fp: fragment_exec_params_) {
+    DCHECK_GT(fp.instance_exec_params.size(), 0) << fp.fragment;
+  }
+
   for (const auto& elem: per_backend_exec_params_) {
     const BackendExecParams& bp = elem.second;
     DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index d019eb2..a57ebb7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -106,6 +106,9 @@ struct FInstanceExecParams {
   /// uniquely identify it to a receiver. -1 = invalid.
   int sender_id;
 
+  // List of input join build finstances for joins in this finstance.
+  std::vector<TJoinBuildInput> join_build_inputs;
+
   /// The parent FragmentExecParams
   const FragmentExecParams& fragment_exec_params;
   const TPlanFragment& fragment() const;
@@ -136,7 +139,9 @@ struct FragmentExecParams {
 
   bool is_coord_fragment;
   const TPlanFragment& fragment;
-  std::vector<FragmentIdx> input_fragments;
+
+  // Fragments that are inputs to an ExchangeNode of this fragment.
+  std::vector<FragmentIdx> exchange_input_fragments;
   std::vector<FInstanceExecParams> instance_exec_params;
 
   FragmentExecParams(const TPlanFragment& fragment)
@@ -372,7 +377,7 @@ class QuerySchedule {
   string executor_group_;
 
   /// Populate fragment_exec_params_ from request_.plan_exec_info.
-  /// Sets is_coord_fragment and input_fragments.
+  /// Sets is_coord_fragment and exchange_input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 626afde..b458fa6 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -202,7 +202,10 @@ void Scheduler::ComputeFragmentExecParams(
     const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   const TQueryExecRequest& exec_request = schedule->request();
 
-  // for each plan, compute the FInstanceExecParams for the tree of fragments
+  // for each plan, compute the FInstanceExecParams for the tree of fragments.
+  // The plans are in dependency order, so we compute parameters for each plan
+  // *before* its input join build plans. This allows the join build plans to
+  // be easily co-located with the plans consuming their output.
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     // set instance_id, host, per_node_scan_ranges
     ComputeFragmentExecParams(executor_config, plan_exec_info,
@@ -210,13 +213,11 @@ void Scheduler::ComputeFragmentExecParams(
 
     // Set destinations, per_exch_num_senders, sender_id.
     for (const TPlanFragment& src_fragment : plan_exec_info.fragments) {
+      VLOG(3) << "Computing exec params for fragment " << src_fragment.display_name;
       if (!src_fragment.output_sink.__isset.stream_sink) continue;
       FragmentIdx dest_idx =
           schedule->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id);
-      DCHECK_LT(dest_idx, plan_exec_info.fragments.size());
-      const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx];
-      FragmentExecParams* dest_params =
-          schedule->GetFragmentExecParams(dest_fragment.idx);
+      FragmentExecParams* dest_params = schedule->GetFragmentExecParams(dest_idx);
       FragmentExecParams* src_params = schedule->GetFragmentExecParams(src_fragment.idx);
 
       // populate src_params->destinations
@@ -254,15 +255,24 @@ void Scheduler::ComputeFragmentExecParams(
 void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
     const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
-  // traverse input fragments
-  for (FragmentIdx input_fragment_idx : fragment_params->input_fragments) {
+  // Create exec params for child fragments connected by an exchange. Instance creation
+  // for this fragment depends on where the input fragment instances are scheduled.
+  for (FragmentIdx input_fragment_idx : fragment_params->exchange_input_fragments) {
     ComputeFragmentExecParams(executor_config, plan_exec_info,
         schedule->GetFragmentExecParams(input_fragment_idx), schedule);
   }
 
   const TPlanFragment& fragment = fragment_params->fragment;
-  // case 1: single instance executed at coordinator
-  if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+  if (fragment.output_sink.__isset.join_build_sink) {
+    // case 0: join build fragment, co-located with its parent fragment. Join build
+    // fragments may be unpartitioned if they are co-located with the root fragment.
+    VLOG(3) << "Computing exec params for collocated join build fragment "
+            << fragment_params->fragment.display_name;
+    CreateCollocatedJoinBuildInstances(fragment_params, schedule);
+  } else if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+    // case 1: root fragment instance executed at coordinator
+    VLOG(3) << "Computing exec params for coordinator fragment "
+            << fragment_params->fragment.display_name;
     const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
     const TNetworkAddress& coord = local_be_desc.address;
     DCHECK(local_be_desc.__isset.krpc_address);
@@ -282,20 +292,18 @@ void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
       auto first_entry = fragment_params->scan_range_assignment.begin();
       instance_params.per_node_scan_ranges = first_entry->second;
     }
-
-    return;
-  }
-
-  if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
+  } else if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)
       || ContainsScanNode(fragment.plan)) {
+    VLOG(3) << "Computing exec params for scan and/or union fragment.";
     // case 2: leaf fragment (i.e. no input fragments) with a single scan node.
     // case 3: union fragment, which may have scan nodes and may have input fragments.
     CreateCollocatedAndScanInstances(executor_config, fragment_params, schedule);
   } else {
+    VLOG(3) << "Computing exec params for interior fragment.";
     // case 4: interior (non-leaf) fragment without a scan or union.
     // We assign the same hosts as those of our leftmost input fragment (so that a
     // merge aggregation fragment runs on the hosts that provide the input data).
-    CreateCollocatedInstances(fragment_params, schedule);
+    CreateInputCollocatedInstances(fragment_params, schedule);
   }
 }
 
@@ -367,7 +375,7 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
   // the input scan, for consistency with the previous behaviour of only using
   // the parallelism of the scan.
   if (has_union) {
-    for (FragmentIdx idx : fragment_params->input_fragments) {
+    for (FragmentIdx idx : fragment_params->exchange_input_fragments) {
       std::unordered_map<TNetworkAddress, int> input_fragment_instances_per_host;
       const FragmentExecParams& input_params = *schedule->GetFragmentExecParams(idx);
       for (const FInstanceExecParams& instance_params :
@@ -503,20 +511,47 @@ vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances(
   return per_instance_ranges;
 }
 
-void Scheduler::CreateCollocatedInstances(
+void Scheduler::CreateInputCollocatedInstances(
     FragmentExecParams* fragment_params, QuerySchedule* schedule) {
-  DCHECK_GE(fragment_params->input_fragments.size(), 1);
-  const FragmentExecParams* input_fragment_params =
-      schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
+  DCHECK_GE(fragment_params->exchange_input_fragments.size(), 1);
+  const FragmentExecParams& input_fragment_params =
+      *schedule->GetFragmentExecParams(fragment_params->exchange_input_fragments[0]);
   int per_fragment_instance_idx = 0;
   for (const FInstanceExecParams& input_instance_params :
-      input_fragment_params->instance_exec_params) {
+      input_fragment_params.instance_exec_params) {
     fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
         input_instance_params.host, input_instance_params.krpc_host,
         per_fragment_instance_idx++, *fragment_params);
   }
 }
 
+void Scheduler::CreateCollocatedJoinBuildInstances(
+    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
+  const TPlanFragment& fragment = fragment_params->fragment;
+  DCHECK(fragment.output_sink.__isset.join_build_sink);
+  const TJoinBuildSink& sink = fragment.output_sink.join_build_sink;
+  int join_fragment_idx = schedule->GetFragmentIdx(sink.dest_node_id);
+  FragmentExecParams* join_fragment_params =
+      schedule->GetFragmentExecParams(join_fragment_idx);
+  DCHECK(!join_fragment_params->instance_exec_params.empty())
+      << "Parent fragment instances must already be created.";
+  int per_fragment_instance_idx = 0;
+  for (FInstanceExecParams& parent_exec_params :
+      join_fragment_params->instance_exec_params) {
+    TUniqueId instance_id = schedule->GetNextInstanceId();
+    fragment_params->instance_exec_params.emplace_back(instance_id,
+        parent_exec_params.host, parent_exec_params.krpc_host,
+        per_fragment_instance_idx++, *fragment_params);
+    TJoinBuildInput build_input;
+    build_input.__set_join_node_id(sink.dest_node_id);
+    build_input.__set_input_finstance_id(instance_id);
+    parent_exec_params.join_build_inputs.emplace_back(build_input);
+    VLOG(3) << "Linked join build for node id=" << sink.dest_node_id
+            << " build finstance=" << PrintId(instance_id)
+            << " dst finstance=" << PrintId(parent_exec_params.instance_id);
+  }
+}
+
 Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
     PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index a2da46f..f5f5c2e 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -395,7 +395,14 @@ class Scheduler {
   /// For each instance of fragment_params's input fragment, create a collocated
   /// instance for fragment_params's fragment.
   /// Expects that fragment_params only has a single input fragment.
-  void CreateCollocatedInstances(
+  void CreateInputCollocatedInstances(
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// Create instances for a fragment that has a join build sink as its root.
+  /// These instances will be collocated with the fragment instances that consume
+  /// the join build. Therefore, those instances must have already been created
+  /// by the scheduler.
+  void CreateCollocatedJoinBuildInstances(
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// Add all hosts that the scans identified by 'scan_ids' are executed on to
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 19dd86b..fdf8f6c 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -100,7 +100,8 @@ struct TKuduTableSink {
 
 // Sink to create the build side of a JoinNode.
 struct TJoinBuildSink {
-  1: required Types.TJoinTableId join_table_id
+  // destination join node id
+  1: required Types.TPlanNodeId dest_node_id
 
   // only set for hash join build sinks
   2: required list<Exprs.TExpr> build_exprs
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index e6e1fd8..716818b 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -406,7 +406,8 @@ struct TPlanExecInfo {
 
 // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
 struct TQueryExecRequest {
-  // exec info for all plans; the first one materializes the query result
+  // exec info for all plans; the first one materializes the query result and subsequent
+  // ones materialize join builds that are input for preceding plans in the list.
   1: optional list<TPlanExecInfo> plan_exec_info
 
   // Metadata of the query result set (only for select)
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 2285d4b..55eb0cf 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -613,6 +613,15 @@ struct TRuntimeFilterSource {
   2: required i32 filter_id
 }
 
+// Information about the input fragment instance of a join node.
+struct TJoinBuildInput {
+  // The join node id that will consume this join build.
+  1: required Types.TPlanNodeId join_node_id
+
+  // Fragment instance id of the input fragment instance.
+  2: required Types.TUniqueId input_finstance_id
+}
+
 // Execution parameters of a single fragment instance.
 struct TPlanFragmentInstanceCtx {
   // TPlanFragment.idx
@@ -647,6 +656,9 @@ struct TPlanFragmentInstanceCtx {
 
   // List of runtime filters produced by nodes in the finstance.
   8: optional list<TRuntimeFilterSource> filters_produced
+
+  // List of input join build finstances for joins in this finstance.
+  9: optional list<TJoinBuildInput> join_build_inputs
 }
 
 
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 5610646..63e733f 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -38,6 +38,9 @@ public class JoinBuildSink extends DataSink {
   // id of join's build-side table assigned during planning
   private final JoinTableId joinTableId_;
 
+  // Reference to the join node that consumes the build side.
+  private final JoinNode joinNode_;
+
   private final List<Expr> buildExprs_ = new ArrayList<>();
 
   /**
@@ -46,6 +49,7 @@ public class JoinBuildSink extends DataSink {
   public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) {
     Preconditions.checkState(joinTableId.isValid());
     joinTableId_ = joinTableId;
+    joinNode_ = joinNode;
     Preconditions.checkNotNull(joinNode);
     Preconditions.checkState(joinNode instanceof JoinNode);
     if (!(joinNode instanceof HashJoinNode)) return;
@@ -61,7 +65,7 @@ public class JoinBuildSink extends DataSink {
   @Override
   protected void toThriftImpl(TDataSink tsink) {
     TJoinBuildSink tBuildSink = new TJoinBuildSink();
-    tBuildSink.setJoin_table_id(joinTableId_.asInt());
+    tBuildSink.setDest_node_id(joinNode_.getId().asInt());
     for (Expr buildExpr: buildExprs_) {
       tBuildSink.addToBuild_exprs(buildExpr.treeToThrift());
     }


[impala] 02/03: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 702e6c4fa8b7a71ea7f8444245466fd117bf98b1
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Sat Jun 8 09:06:57 2019 -0700

    IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
    
    Previously the aggregation and propagation of a runtime filter in Impala were
    implemented using Thrift RPC, which has the disadvantage that the number
    of connections in a cluster grows with both the number of queries and cluster
    size. This patch ports the functions that implement the aggregation and
    propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
    respectively, to KRPC, which requires only one connection per direction between
    every pair of hosts, thus reducing the number of connections in a cluster.
    
    In addition, this patch also uses KRPC sidecars when the runtime filter is a
    Bloom filter. A KRPC sidecar eliminates the need for an extra copy of the
    Bloom filter contents when a Bloom filter is serialized for transmission, and
    hence reduces the serialization overhead. Because of the use of KRPC
    sidecars, a SpinLock is also added to prevent a BloomFilter from being
    deallocated before its associated KRPC call finishes.
    
    The related BE test bloom-filter-test.cc and the benchmark
    bloom-filter-benchmark.cc are also modified accordingly because of the
    changes to the signatures of some functions in BloomFilter.
    
    Testing:
    This patch has passed the exhaustive tests.
    
    Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8
    Reviewed-on: http://gerrit.cloudera.org:8080/13882
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/14974
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/benchmarks/bloom-filter-benchmark.cc |  37 ++-
 be/src/runtime/backend-client.h             |  23 --
 be/src/runtime/client-cache.cc              |   1 -
 be/src/runtime/coordinator-backend-state.cc |  36 ++-
 be/src/runtime/coordinator-backend-state.h  |   3 +-
 be/src/runtime/coordinator-filter-state.h   |  27 +-
 be/src/runtime/coordinator.cc               | 135 +++++----
 be/src/runtime/coordinator.h                |   9 +-
 be/src/runtime/data-stream-test.cc          |   6 +
 be/src/runtime/decimal-value.h              |  12 +-
 be/src/runtime/decimal-value.inline.h       |   4 +-
 be/src/runtime/exec-env.cc                  |   1 -
 be/src/runtime/fragment-instance-state.cc   |   9 +-
 be/src/runtime/fragment-instance-state.h    |   9 +-
 be/src/runtime/query-state.cc               |  11 +-
 be/src/runtime/query-state.h                |   9 +-
 be/src/runtime/runtime-filter-bank.cc       | 168 +++++++----
 be/src/runtime/runtime-filter-bank.h        |  29 +-
 be/src/runtime/runtime-filter.h             |   3 +-
 be/src/runtime/timestamp-value.h            |  14 +-
 be/src/scheduling/request-pool-service.h    |   1 -
 be/src/service/client-request-state.cc      |   5 +-
 be/src/service/client-request-state.h       |   2 +-
 be/src/service/data-stream-service.cc       |  34 +++
 be/src/service/data-stream-service.h        |  10 +
 be/src/service/frontend.h                   |   1 -
 be/src/service/impala-internal-service.cc   |  21 --
 be/src/service/impala-internal-service.h    |   4 -
 be/src/service/impala-server.cc             |  18 +-
 be/src/service/impala-server.h              |  15 +-
 be/src/util/bloom-filter-test.cc            | 108 +++++--
 be/src/util/bloom-filter.cc                 | 101 ++++---
 be/src/util/bloom-filter.h                  |  81 +++++-
 be/src/util/min-max-filter-test.cc          | 247 ++++++++--------
 be/src/util/min-max-filter.cc               | 426 +++++++++++++---------------
 be/src/util/min-max-filter.h                |  44 +--
 common/protobuf/common.proto                |  17 ++
 common/protobuf/data_stream_service.proto   |  79 ++++++
 common/thrift/ImpalaInternalService.thrift  |  83 ------
 39 files changed, 1090 insertions(+), 753 deletions(-)

diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index f216911..1e4938d 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -21,6 +21,7 @@
 #include <iostream>
 #include <vector>
 
+#include "kudu/rpc/rpc_controller.h"
 #include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/buffer-pool.h"
@@ -36,6 +37,8 @@
 using namespace std;
 using namespace impala;
 
+using kudu::rpc::RpcController;
+
 // Tests Bloom filter performance on:
 //
 // 1. Construct/destruct pairs
@@ -283,18 +286,44 @@ struct TestData {
   explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
     BloomFilter bf(client);
     CHECK(bf.Init(log_bufferpool_size).ok());
-    BloomFilter::ToThrift(&bf, &tbf1);
-    BloomFilter::ToThrift(&bf, &tbf2);
+
+    RpcController controller1;
+    RpcController controller2;
+    BloomFilter::ToProtobuf(&bf, &controller1, &pbf1);
+    BloomFilter::ToProtobuf(&bf, &controller2, &pbf2);
+
+    // Need to set 'always_false_' of pbf2 to false because
+    // (i) 'always_false_' of a BloomFilter is set to true when the Bloom filter
+    // hasn't had any elements inserted (since nothing is inserted to the
+    /// BloomFilter bf),
+    // (ii) ToProtobuf() will set 'always_false_' of a BloomFilterPB
+    // to true, and
+    // (iii) Or() will check 'always_false_' of the output BloomFilterPB is not true
+    /// before performing the corresponding bit operations.
+    /// The field 'always_false_' was added by IMPALA-5789, which aims to allow
+    /// an HdfsScanner to early terminate the scan at file and split granularities.
+    pbf2.set_always_false(false);
+
+    int64_t directory_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_size);
+    string d1(reinterpret_cast<const char*>(bf.directory_), directory_size);
+    string d2(reinterpret_cast<const char*>(bf.directory_), directory_size);
+
+    directory1 = d1;
+    directory2 = d2;
+
     bf.Close();
   }
 
-  TBloomFilter tbf1, tbf2;
+  BloomFilterPB pbf1, pbf2;
+  string directory1, directory2;
 };
 
 void Benchmark(int batch_size, void* data) {
   TestData* d = reinterpret_cast<TestData*>(data);
   for (int i = 0; i < batch_size; ++i) {
-    BloomFilter::Or(d->tbf1, &d->tbf2);
+    BloomFilter::Or(d->pbf1, reinterpret_cast<const uint8_t*>((d->directory1).data()),
+        &(d->pbf2), reinterpret_cast<uint8_t*>(const_cast<char*>((d->directory2).data())),
+        d->directory1.size());
   }
 }
 
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index fee4d39..656bbc3 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -39,29 +39,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     : ImpalaInternalServiceClient(iprot, oprot) {
   }
 
-/// We intentionally disable this clang warning as we intend to hide the
-/// the same-named functions defined in the base class.
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-
-  void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    ImpalaInternalServiceClient::send_UpdateFilter(params);
-    *send_done = true;
-    ImpalaInternalServiceClient::recv_UpdateFilter(_return);
-  }
-
-  void PublishFilter(TPublishFilterResult& _return, const TPublishFilterParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    ImpalaInternalServiceClient::send_PublishFilter(params);
-    *send_done = true;
-    ImpalaInternalServiceClient::recv_PublishFilter(_return);
-  }
-
-#pragma clang diagnostic pop
-
 };
 
 }
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 26d0de3..fdf44c1 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -29,7 +29,6 @@
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "rpc/thrift-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
 
 #include "common/names.h"
 using namespace apache::thrift;
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 062a97d..72e245e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -23,6 +23,7 @@
 #include "exec/exec-node.h"
 #include "exec/kudu-util.h"
 #include "exec/scan-node.h"
+#include "gen-cpp/data_stream_service.proxy.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
@@ -37,6 +38,7 @@
 #include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "service/control-service.h"
+#include "service/data-stream-service.h"
 #include "util/counting-barrier.h"
 #include "util/error-util-internal.h"
 #include "util/network-util.h"
@@ -549,21 +551,35 @@ bool Coordinator::BackendState::Cancel() {
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
-  DCHECK(rpc_params.dst_query_id == query_id_);
+void Coordinator::BackendState::PublishFilter(
+    const PublishFilterParamsPB& rpc_params, RpcController& controller) {
+  DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
   if (IsDone()) return;
 
-  if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;
+  if (fragments_.count(rpc_params.dst_fragment_idx()) == 0) return;
   Status status;
-  ImpalaBackendConnection backend_client(
-      ExecEnv::GetInstance()->impalad_client_cache(), host_, &status);
-  if (!status.ok()) return;
-  TPublishFilterResult res;
-  status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, rpc_params, &res);
-  if (!status.ok()) {
-    LOG(WARNING) << "Error publishing filter, continuing..." << status.GetDetail();
+
+  std::unique_ptr<DataStreamServiceProxy> proxy;
+  Status get_proxy_status =
+      DataStreamService::GetProxy(krpc_host_, host_.hostname, &proxy);
+  if (!get_proxy_status.ok()) {
+    // Failing to send a filter is not a query-wide error - the remote fragment will
+    // continue regardless.
+    LOG(ERROR) << "Couldn't get proxy: " << get_proxy_status.msg().msg();
+    return;
+  }
+
+  PublishFilterResultPB res;
+  kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
+  if (!rpc_status.ok()) {
+    LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
+    return;
+  }
+  if (res.status().status_code() != TErrorCode::OK) {
+    LOG(ERROR) << "PublishFilter() operation failed: "
+               << Status(res.status()).GetDetail();
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9d390e2..fd80b86 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -106,7 +106,8 @@ class Coordinator::BackendState {
 
   /// Make a PublishFilter rpc with given params if this backend has instances of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  void PublishFilter(const TPublishFilterParams& rpc_params);
+  void PublishFilter(
+      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index e16abea..55c6b3d 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -61,12 +61,13 @@ class Coordinator::FilterState {
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
     : desc_(desc), src_(src) {
     // bloom_filter_ is a disjunction so the unit value is always_false.
-    bloom_filter_.always_false = true;
-    min_max_filter_.always_false = true;
+    bloom_filter_.set_always_false(true);
+    min_max_filter_.set_always_false(true);
   }
 
-  TBloomFilter& bloom_filter() { return bloom_filter_; }
-  TMinMaxFilter& min_max_filter() { return min_max_filter_; }
+  BloomFilterPB& bloom_filter() { return bloom_filter_; }
+  string& bloom_filter_directory() { return bloom_filter_directory_; }
+  MinMaxFilterPB& min_max_filter() { return min_max_filter_; }
   std::vector<FilterTarget>* targets() { return &targets_; }
   const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
@@ -81,16 +82,17 @@ class Coordinator::FilterState {
   void set_num_producers(int num_producers) { num_producers_ = num_producers; }
   bool disabled() const {
     if (is_bloom_filter()) {
-      return bloom_filter_.always_true;
+      return bloom_filter_.always_true();
     } else {
       DCHECK(is_min_max_filter());
-      return min_max_filter_.always_true;
+      return min_max_filter_.always_true();
     }
   }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
-  void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
+  void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
+      kudu::rpc::RpcContext* context);
 
   /// Disables a filter. A disabled filter consumes no memory.
   void Disable(MemTracker* tracker);
@@ -110,13 +112,16 @@ class Coordinator::FilterState {
   int num_producers_ = 0;
 
   /// Filters aggregated from all source plan nodes, to be broadcast to all
-  /// destination plan fragment instances. Only set for partitioned joins (broadcast joins
-  /// need no aggregation).
+  /// destination plan fragment instances. Only set for partitioned joins (broadcast
+  /// joins need no aggregation).
   /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
   /// output structure in the case of a broadcast join. Similarly, for partitioned joins,
   /// the filter is moved from the following member to the output structure.
-  TBloomFilter bloom_filter_;
-  TMinMaxFilter min_max_filter_;
+  BloomFilterPB bloom_filter_;
+  /// When the filter is a Bloom filter, we use this string to store the contents of the
+  /// aggregated Bloom filter.
+  string bloom_filter_directory_;
+  MinMaxFilterPB min_max_filter_;
 
   /// Time at which first local filter arrived.
   int64_t first_arrival_time_ = 0L;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 9ac5ee6..48d81f9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -31,8 +31,9 @@
 #include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -55,6 +56,9 @@
 
 #include "common/names.h"
 
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
 using namespace apache::thrift;
 using namespace rapidjson;
 using boost::algorithm::iequals;
@@ -1107,7 +1111,7 @@ vector<TNetworkAddress> Coordinator::GetActiveBackends(
   return result;
 }
 
-void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* context) {
   shared_lock<shared_mutex> lock(filter_routing_table_->lock);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
@@ -1118,8 +1122,9 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK(filter_routing_table_->is_complete)
       << "Filter received before routing table complete";
 
-  TPublishFilterParams rpc_params;
+  PublishFilterParamsPB rpc_params;
   unordered_set<int> target_fragment_idxs;
+  string bloom_filter_directory;
   {
     lock_guard<SpinLock> l(filter_routing_table_->update_lock);
     if (!IsExecuting()) {
@@ -1127,17 +1132,17 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
                 << query_id();
       return;
     }
-    auto it = filter_routing_table_->id_to_filter.find(params.filter_id);
+    auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
     if (it == filter_routing_table_->id_to_filter.end()) {
-      LOG(INFO) << "Could not find filter with id: " << params.filter_id;
+      LOG(INFO) << "Could not find filter with id: " << params.filter_id();
       return;
     }
     FilterState* state = &it->second;
 
     DCHECK(state->desc().has_remote_targets)
-          << "Coordinator received filter that has only local targets";
+        << "Coordinator received filter that has only local targets";
 
-    // Check if the filter has already been sent, which could happen in four cases:
+    // Check if the filter has already been sent, which could happen in five cases:
     //   * if one local filter had always_true set - no point waiting for other local
     //     filters that can't affect the aggregated global filter
     //   * if this is a broadcast join, and another local filter was already received
@@ -1145,6 +1150,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     //     immediately.
     //   * query execution finished and resources were released: filters do not need
     //     to be processed.
+    //   * if the inbound sidecar for Bloom filter cannot be successfully retrieved.
     if (state->disabled()) return;
 
     if (filter_updates_received_->value() == 0) {
@@ -1152,7 +1158,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     }
     filter_updates_received_->Add(1);
 
-    state->ApplyUpdate(params, this);
+    state->ApplyUpdate(params, this, context);
 
     if (state->pending_count() > 0 && !state->disabled()) return;
     // At this point, we either disabled this filter or aggregation is complete.
@@ -1167,33 +1173,36 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     }
 
     if (state->is_bloom_filter()) {
-      // Assign outgoing bloom filter.
-      TBloomFilter& aggregated_filter = state->bloom_filter();
-
-      swap(rpc_params.bloom_filter, aggregated_filter);
-      DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
-          || !rpc_params.bloom_filter.directory.empty());
-      DCHECK(aggregated_filter.directory.empty());
-      rpc_params.__isset.bloom_filter = true;
+      // Assign an outgoing bloom filter.
+      *rpc_params.mutable_bloom_filter() = state->bloom_filter();
+      bloom_filter_directory.swap(state->bloom_filter_directory());
+      DCHECK(rpc_params.bloom_filter().always_false()
+          || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
     } else {
       DCHECK(state->is_min_max_filter());
-      MinMaxFilter::Copy(state->min_max_filter(), &rpc_params.min_max_filter);
-      rpc_params.__isset.min_max_filter = true;
+      MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
     }
 
     // Filter is complete, and can be released.
     state->Disable(filter_mem_tracker_);
   }
 
-  rpc_params.__set_dst_query_id(query_id());
-  rpc_params.__set_filter_id(params.filter_id);
+  TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
+  rpc_params.set_filter_id(params.filter_id());
 
   // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       if (!IsExecuting()) goto cleanup;
-      rpc_params.__set_dst_fragment_idx(fragment_idx);
-      bs->PublishFilter(rpc_params);
+      rpc_params.set_dst_fragment_idx(fragment_idx);
+      RpcController controller;
+      if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
+          && !rpc_params.bloom_filter().always_true()) {
+        BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
+            bloom_filter_directory);
+      }
+      // TODO: make this asynchronous.
+      bs->PublishFilter(rpc_params, controller);
     }
   }
 
@@ -1201,13 +1210,13 @@ cleanup:
   // For bloom filters, the memory used in the filter_routing_table_ is transfered to
   // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
   // here to ensure that the MemTracker is updated after the memory is actually freed.
-  if (rpc_params.__isset.bloom_filter) {
-    filter_mem_tracker_->Release(rpc_params.bloom_filter.directory.size());
+  if (rpc_params.has_bloom_filter()) {
+    filter_mem_tracker_->Release(bloom_filter_directory.size());
   }
 }
 
-void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
-    Coordinator* coord) {
+void Coordinator::FilterState::ApplyUpdate(
+    const UpdateFilterParamsPB& params, Coordinator* coord, RpcContext* context) {
   DCHECK(!disabled());
   DCHECK_GT(pending_count_, 0);
   DCHECK_EQ(completion_time_, 0L);
@@ -1217,38 +1226,52 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
 
   --pending_count_;
   if (is_bloom_filter()) {
-    DCHECK(params.__isset.bloom_filter);
-    if (params.bloom_filter.always_true) {
+    DCHECK(params.has_bloom_filter());
+    if (params.bloom_filter().always_true()) {
       Disable(coord->filter_mem_tracker_);
-    } else if (bloom_filter_.always_false) {
-      int64_t heap_space = params.bloom_filter.directory.size();
-      if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
-        VLOG_QUERY << "Not enough memory to allocate filter: "
-                   << PrettyPrinter::Print(heap_space, TUnit::BYTES)
-                   << " (query_id=" << PrintId(coord->query_id()) << ")";
-        // Disable, as one missing update means a correct filter cannot be produced.
+    } else if (params.bloom_filter().always_false()) {
+      if (!bloom_filter_.has_log_bufferpool_space()) {
+        bloom_filter_ = BloomFilterPB(params.bloom_filter());
+      }
+    } else {
+      // If the incoming Bloom filter is neither an always true filter nor an
+      // always false filter, then it must be the case that a non-empty sidecar slice
+      // has been received. Refer to BloomFilter::ToProtobuf() for further details.
+      DCHECK(params.bloom_filter().has_directory_sidecar_idx());
+      kudu::Slice sidecar_slice;
+      kudu::Status status = context->GetInboundSidecar(
+          params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+      if (!status.ok()) {
+        LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
         Disable(coord->filter_mem_tracker_);
+      } else if (bloom_filter_.always_false()) {
+        int64_t heap_space = sidecar_slice.size();
+        if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
+          VLOG_QUERY << "Not enough memory to allocate filter: "
+                     << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+                     << " (query_id=" << PrintId(coord->query_id()) << ")";
+          // Disable, as one missing update means a correct filter cannot be produced.
+          Disable(coord->filter_mem_tracker_);
+        } else {
+          bloom_filter_ = params.bloom_filter();
+          bloom_filter_directory_ = sidecar_slice.ToString();
+        }
       } else {
-        // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
-        // move the payload from the request rather than copy it and take double the
-        // memory cost. After this point, params.bloom_filter is an empty filter and
-        // should not be read.
-        TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
-        swap(bloom_filter_, *non_const_filter);
-        DCHECK_EQ(non_const_filter->directory.size(), 0);
+        DCHECK_EQ(bloom_filter_directory_.size(), sidecar_slice.size());
+        BloomFilter::Or(params.bloom_filter(), sidecar_slice.data(), &bloom_filter_,
+            reinterpret_cast<uint8_t*>(const_cast<char*>(bloom_filter_directory_.data())),
+            sidecar_slice.size());
       }
-    } else {
-      BloomFilter::Or(params.bloom_filter, &bloom_filter_);
     }
   } else {
     DCHECK(is_min_max_filter());
-    DCHECK(params.__isset.min_max_filter);
-    if (params.min_max_filter.always_true) {
+    DCHECK(params.has_min_max_filter());
+    if (params.min_max_filter().always_true()) {
       Disable(coord->filter_mem_tracker_);
-    } else if (min_max_filter_.always_false) {
-      MinMaxFilter::Copy(params.min_max_filter, &min_max_filter_);
+    } else if (min_max_filter_.always_false()) {
+      MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
     } else {
-      MinMaxFilter::Or(params.min_max_filter, &min_max_filter_,
+      MinMaxFilter::Or(params.min_max_filter(), &min_max_filter_,
           ColumnType::FromThrift(desc_.src_expr.nodes[0].type));
     }
   }
@@ -1260,15 +1283,15 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
 
 void Coordinator::FilterState::Disable(MemTracker* tracker) {
   if (is_bloom_filter()) {
-    bloom_filter_.always_true = true;
-    bloom_filter_.always_false = false;
-    tracker->Release(bloom_filter_.directory.size());
-    bloom_filter_.directory.clear();
-    bloom_filter_.directory.shrink_to_fit();
+    bloom_filter_.set_always_true(true);
+    bloom_filter_.set_always_false(false);
+    tracker->Release(bloom_filter_directory_.size());
+    bloom_filter_directory_.clear();
+    bloom_filter_directory_.shrink_to_fit();
   } else {
     DCHECK(is_min_max_filter());
-    min_max_filter_.always_true = true;
-    min_max_filter_.always_false = false;
+    min_max_filter_.set_always_true(true);
+    min_max_filter_.set_always_false(false);
   }
 }
 
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9ed3324..7b05fde 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -31,12 +31,19 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/dml-exec-state.h"
 #include "util/counting-barrier.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class AuxErrorInfoPB;
@@ -172,7 +179,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// with others for the same filter ID into a global filter. If all updates for that
   /// filter ID have been received (may be 1 or more per filter), broadcast the global
   /// filter to fragment instances.
-  void UpdateFilter(const TUpdateFilterParams& params);
+  void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Adds to 'document' a serialized array of all backends in a member named
   /// 'backend_states'.
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 90bbf48..4ffdb1d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -122,6 +122,12 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
     stream_mgr_->CloseSender(request, response, rpc_context);
   }
 
+  virtual void UpdateFilter(
+      const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {}
+
+  virtual void PublishFilter(const PublishFilterParamsPB* req,
+      PublishFilterResultPB* resp, RpcContext* context) {}
+
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
  private:
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index b34cde0..ea2d126 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -22,6 +22,7 @@
 #include <ostream>
 
 #include "gen-cpp/Data_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/multi-precision.h"
 #include "runtime/types.h"
 
@@ -59,8 +60,8 @@ class DecimalValue {
     return FromDouble(t.precision, t.scale, d, round, overflow);
   }
 
-  /// Returns a new DecimalValue created from the value in 'tvalue'.
-  static inline DecimalValue FromTColumnValue(const TColumnValue& tvalue);
+  /// Returns a new DecimalValue created from the value in 'value_pb'.
+  static inline DecimalValue FromColumnValuePB(const ColumnValuePB& value_pb);
 
   static inline DecimalValue FromDouble(int precision, int scale, double d,
       bool round, bool* overflow);
@@ -196,11 +197,10 @@ class DecimalValue {
 
   inline DecimalValue<T> Abs() const;
 
-  /// Store the binary representation of this DecimalValue in 'tvalue'.
-  void ToTColumnValue(TColumnValue* tvalue) const {
+  /// Store the binary representation of this DecimalValue in 'value_pb'.
+  void ToColumnValuePB(ColumnValuePB* value_pb) const {
     const uint8_t* data = reinterpret_cast<const uint8_t*>(&value_);
-    tvalue->decimal_val.assign(data, data + sizeof(T));
-    tvalue->__isset.decimal_val = true;
+    value_pb->mutable_decimal_val()->assign(data, data + sizeof(T));
   }
 
  private:
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index 6480099..b2d5fc0 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -61,9 +61,9 @@ inline DecimalValue<T> DecimalValue<T>::FromDouble(int precision, int scale, dou
 }
 
 template <typename T>
-inline DecimalValue<T> DecimalValue<T>::FromTColumnValue(const TColumnValue& tvalue) {
+inline DecimalValue<T> DecimalValue<T>::FromColumnValuePB(const ColumnValuePB& value_pb) {
   T value = 0;
-  memcpy(&value, tvalue.decimal_val.c_str(), tvalue.decimal_val.length());
+  memcpy(&value, value_pb.decimal_val().c_str(), value_pb.decimal_val().length());
   return DecimalValue<T>(value);
 }
 
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 7d7fea4..d68c469 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -27,7 +27,6 @@
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "exec/kudu-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index b74bbe3..9236852 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -32,6 +32,7 @@
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/exchange-node.h"
 #include "exec/scan-node.h"
+#include "kudu/rpc/rpc_context.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
@@ -49,6 +50,7 @@
 #include "util/periodic-counter-updater.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
+using kudu::rpc::RpcContext;
 using namespace impala;
 using namespace apache::thrift;
 
@@ -528,10 +530,11 @@ Status FragmentInstanceState::WaitForOpen() {
   return opened_promise_.Get();
 }
 
-void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
+void FragmentInstanceState::PublishFilter(
+    const PublishFilterParamsPB& params, RpcContext* context) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
-            << " filter_id=" << params.filter_id;
-  runtime_state_->filter_bank()->PublishGlobalFilter(params);
+            << " filter_id=" << params.filter_id();
+  runtime_state_->filter_bank()->PublishGlobalFilter(params, context);
 }
 
 const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB state) {
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 69e081b..c151457 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -29,6 +29,7 @@
 #include "util/promise.h"
 
 #include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/row-batch.h"
@@ -36,6 +37,12 @@
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class TPlanFragmentCtx;
@@ -90,7 +97,7 @@ class FragmentInstanceState {
   Status WaitForOpen();
 
   /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
-  void PublishFilter(const TPublishFilterParams& params);
+  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Called periodically by query state thread to get the current status of this fragment
   /// instance. The fragment instance's status is stored in 'instance_status' and its
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index e5a804c..678aecd 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -23,6 +23,7 @@
 #include "common/thread-debug-info.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
+#include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
@@ -40,6 +41,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
 #include "service/control-service.h"
+#include "service/data-stream-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
@@ -50,7 +52,6 @@
 #include "gen-cpp/control_service.proxy.h"
 
 using kudu::MonoDelta;
-using kudu::rpc::RpcController;
 using kudu::rpc::RpcSidecar;
 
 #include "common/names.h"
@@ -675,11 +676,11 @@ void QueryState::Cancel() {
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
-void QueryState::PublishFilter(const TPublishFilterParams& params) {
+void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext* context) {
   if (!WaitForPrepare().ok()) return;
-  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
-  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
-    fis->PublishFilter(params);
+  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx()), 1);
+  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx()]) {
+    fis->PublishFilter(params, context);
   }
 }
 
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 86156f4..f359f1a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -27,12 +27,19 @@
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/tmp-file-mgr.h"
 #include "util/container-util.h"
 #include "util/counting-barrier.h"
 #include "util/uid-util.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class ControlServiceProxy;
@@ -192,7 +199,7 @@ class QueryState {
       const TUniqueId& instance_id, FragmentInstanceState** fi_state);
 
   /// Blocks until all fragment instances have finished their Prepare phase.
-  void PublishFilter(const TPublishFilterParams& params);
+  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Cancels all actively executing fragment instances. Blocks until all fragment
   /// instances have finished their Prepare phase. Idempotent.
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 56f63aa..21a0bfd 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -17,19 +17,26 @@
 
 #include "runtime/runtime-filter-bank.h"
 
+#include <chrono>
+
 #include <boost/algorithm/string/join.hpp>
 
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/data_stream_service.proxy.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/client-cache.h"
-#include "runtime/exec-env.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
+#include "service/data-stream-service.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"
@@ -38,6 +45,9 @@
 
 #include "common/names.h"
 
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
 using namespace impala;
 using namespace boost;
 using namespace strings;
@@ -102,32 +112,35 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte
   return ret;
 }
 
-namespace {
+void RuntimeFilterBank::UpdateFilterCompleteCb(
+    const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
+  const kudu::Status controller_status = rpc_controller->status();
 
-/// Sends a filter to the coordinator. Executed asynchronously in the context of
-/// ExecEnv::rpc_pool().
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
-    ImpalaBackendClientCache* client_cache) {
-  Status status;
-  ImpalaBackendConnection coord(client_cache, address, &status);
-  if (!status.ok()) {
-    // Failing to send a filter is not a query-wide error - the remote fragment will
-    // continue regardless.
-    // TODO: Retry.
-    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
-    return;
+  // In the case of an unsuccessful KRPC call, e.g., request dropped due to
+  // backpressure, we only log this event w/o retrying. Failing to send a
+  // filter is not a query-wide error - the remote fragment will continue
+  // regardless.
+  if (!controller_status.ok()) {
+    LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
   }
-  TUpdateFilterResult res;
-  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
-}
+  // DataStreamService::UpdateFilter() should never set an error status
+  DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
 
+  {
+    std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
+    DCHECK_GT(num_inflight_rpcs_, 0);
+    --num_inflight_rpcs_;
+  }
+  krpcs_done_cv_.notify_one();
 }
 
 void RuntimeFilterBank::UpdateFilterFromLocal(
     int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
-  TUpdateFilterParams params;
+  // This function is only called from ExecNode::Open() or more specifically
+  // PartitionedHashJoinNode::Open().
+  DCHECK(!closed_);
   // A runtime filter may have both local and remote targets.
   bool has_local_target = false;
   bool has_remote_target = false;
@@ -159,64 +172,108 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
 
   if (has_remote_target
       && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
-    params.__set_filter_id(filter_id);
-    params.__set_query_id(state_->query_id());
+    UpdateFilterParamsPB params;
+    // The memory associated with the following 2 objects needs to live until
+    // the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
+    // Hence, we allocate these 2 objects in 'obj_pool_'.
+    UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
+    RpcController* controller = obj_pool_.Add(new RpcController);
+
+    TUniqueIdToUniqueIdPB(state_->query_id(), params.mutable_query_id());
+    params.set_filter_id(filter_id);
     if (type == TRuntimeFilterType::BLOOM) {
-      BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
-      params.__isset.bloom_filter = true;
+      BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
     } else {
-      DCHECK(type == TRuntimeFilterType::MIN_MAX);
-      min_max_filter->ToThrift(&params.min_max_filter);
-      params.__isset.min_max_filter = true;
+      DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
+      min_max_filter->ToProtobuf(params.mutable_min_max_filter());
+    }
+    const TNetworkAddress& krpc_address = state_->query_ctx().coord_krpc_address;
+    const TNetworkAddress& host_address = state_->query_ctx().coord_address;
+
+    // Use 'proxy' to send the filter to the coordinator.
+    unique_ptr<DataStreamServiceProxy> proxy;
+    Status get_proxy_status =
+        DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
+    if (!get_proxy_status.ok()) {
+      // Failing to send a filter is not a query-wide error - the remote fragment will
+      // continue regardless.
+      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
+          host_address.hostname, get_proxy_status.msg().msg());
+      return;
     }
 
-    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
-        SendFilterToCoordinator, state_->query_ctx().coord_address, params,
-        ExecEnv::GetInstance()->impalad_client_cache()));
+    // Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
+    // in Close() until all in-flight RPCs complete.
+    {
+      unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
+      DCHECK_GE(num_inflight_rpcs_, 0);
+      ++num_inflight_rpcs_;
+    }
+
+    proxy->UpdateFilterAsync(params, res, controller,
+        boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
   }
 }
 
-void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
+void RuntimeFilterBank::PublishGlobalFilter(
+    const PublishFilterParamsPB& params, RpcContext* context) {
   lock_guard<mutex> l(runtime_filter_lock_);
   if (closed_) return;
-  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
+  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id());
   DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
-                                        << params.filter_id;
+                                        << params.filter_id();
 
   BloomFilter* bloom_filter = nullptr;
   MinMaxFilter* min_max_filter = nullptr;
   if (it->second->is_bloom_filter()) {
-    DCHECK(params.__isset.bloom_filter);
-    if (params.bloom_filter.always_true) {
+    DCHECK(params.has_bloom_filter());
+    if (params.bloom_filter().always_true()) {
       bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
     } else {
-      int64_t required_space =
-          BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
+      int64_t required_space = BloomFilter::GetExpectedMemoryUsed(
+          params.bloom_filter().log_bufferpool_space());
       DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
           << "BufferPool Client should have enough reservation to fulfill bloom filter "
              "allocation";
       bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
-      Status status = bloom_filter->Init(params.bloom_filter);
-      if (!status.ok()) {
-        LOG(ERROR) << "Unable to allocate memory for bloom filter: "
-                   << status.GetDetail();
-        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+
+      kudu::Slice sidecar_slice;
+      if (params.bloom_filter().has_directory_sidecar_idx()) {
+        kudu::Status status = context->GetInboundSidecar(
+            params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to get Bloom filter sidecar: "
+                     << status.message().ToString();
+          bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+        }
       } else {
-        bloom_filters_.push_back(bloom_filter);
-        DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
-        bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
+        DCHECK(params.bloom_filter().always_false());
+      }
+
+      if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
+        Status status = bloom_filter->Init(
+            params.bloom_filter(), sidecar_slice.data(), sidecar_slice.size());
+        if (!status.ok()) {
+          LOG(ERROR) << "Unable to allocate memory for bloom filter: "
+                     << status.GetDetail();
+          bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+        } else {
+          bloom_filters_.push_back(bloom_filter);
+          DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+          bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
+        }
       }
     }
   } else {
     DCHECK(it->second->is_min_max_filter());
-    DCHECK(params.__isset.min_max_filter);
-    min_max_filter = MinMaxFilter::Create(
-        params.min_max_filter, it->second->type(), &obj_pool_, filter_mem_tracker_.get());
+    DCHECK(params.has_min_max_filter());
+    min_max_filter = MinMaxFilter::Create(params.min_max_filter(), it->second->type(),
+        &obj_pool_, filter_mem_tracker_.get());
     min_max_filters_.push_back(min_max_filter);
   }
   it->second->SetFilter(bloom_filter, min_max_filter);
   state_->runtime_profile()->AddInfoString(
-      Substitute("Filter $0 arrival", params.filter_id),
+      Substitute("Filter $0 arrival", params.filter_id()),
       PrettyPrinter::Print(it->second->arrival_delay_ms(), TUnit::TIME_MS));
 }
 
@@ -278,8 +335,21 @@ void RuntimeFilterBank::CancelLocked() {
 }
 
 void RuntimeFilterBank::Close() {
-  lock_guard<mutex> l(runtime_filter_lock_);
+  // Wait for all in-flight RPCs to complete before closing the filters.
+  {
+    unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
+    while (num_inflight_rpcs_ > 0) {
+      krpcs_done_cv_.wait(l1);
+    }
+  }
+
+  lock_guard<mutex> l2(runtime_filter_lock_);
   CancelLocked();
+  // We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
+  // drain because the async build thread in
+  // BlockingJoinNode::ProcessBuildInputAndOpenProbe() should have exited by the time
+  // Close() is called so there shouldn't be any new RPCs being issued when this function
+  // is called.
   closed_ = true;
   for (BloomFilter* filter : bloom_filters_) filter->Close();
   for (MinMaxFilter* filter : min_max_filters_) filter->Close();
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 1208f03..78a95cf 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/mem-pool.h"
 #include "runtime/types.h"
@@ -29,6 +30,15 @@
 #include <boost/thread/lock_guard.hpp>
 #include <boost/unordered_map.hpp>
 
+#include <condition_variable>
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+class RpcController;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class BloomFilter;
@@ -36,7 +46,6 @@ class MemTracker;
 class MinMaxFilter;
 class RuntimeFilter;
 class RuntimeState;
-class TBloomFilter;
 class TRuntimeFilterDesc;
 class TQueryCtx;
 
@@ -94,7 +103,8 @@ class RuntimeFilterBank {
 
   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
   /// consumption by operators that wish to use it for filtering.
-  void PublishGlobalFilter(const TPublishFilterParams& params);
+  void PublishGlobalFilter(
+      const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
   /// 'filter_size' would have an expected false-positive rate which would exceed
@@ -150,6 +160,16 @@ class RuntimeFilterBank {
   /// All filters expected to be consumed by the local plan fragment instance.
   RuntimeFilterMap consumed_filters_;
 
+  /// Lock protecting 'num_inflight_rpcs_' and it should not be taken at the same
+  /// time as runtime_filter_lock_.
+  SpinLock num_inflight_rpcs_lock_;
+  /// Use 'num_inflight_rpcs_' to keep track of the number of current in-flight
+  /// KRPC calls to prevent the memory pointed to by a BloomFilter* being
+  /// deallocated in RuntimeFilterBank::Close() before all KRPC calls have
+  /// been completed.
+  int32_t num_inflight_rpcs_ = 0;
+  std::condition_variable_any krpcs_done_cv_;
+
   /// Fragment instance's runtime state.
   RuntimeState* state_;
 
@@ -184,6 +204,11 @@ class RuntimeFilterBank {
   /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
   /// pool in Close().
   BufferPool::ClientHandle buffer_pool_client_;
+
+  /// This is the callback for the asynchronous rpc UpdateFilterAsync() in
+  /// UpdateFilterFromLocal().
+  void UpdateFilterCompleteCb(
+      const kudu::rpc::RpcController* rpc_controller, const UpdateFilterResultPB* res);
 };
 
 }
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5b65f7a..618788c 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -67,7 +67,8 @@ class RuntimeFilter {
 
   MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
 
-  /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called
+  /// Sets the internal filter bloom_filter to 'bloom_filter' or 'min_max_filter'
+  /// depending on the type of this RuntimeFilter. Can only legally be called
   /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
   void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
 
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 2b2ce79..4eb5c37 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -27,6 +27,7 @@
 
 #include "common/global-types.h"
 #include "gen-cpp/Data_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "udf/udf.h"
 #include "util/hash-util.h"
 
@@ -166,17 +167,16 @@ class TimestampValue {
     *ptp = boost::posix_time::ptime(date_, time_);
   }
 
-  // Store the binary representation of this TimestampValue in 'tvalue'.
-  void ToTColumnValue(TColumnValue* tvalue) const {
+  // Store the binary representation of this TimestampValue in 'pvalue'.
+  void ToColumnValuePB(ColumnValuePB* pvalue) const {
     const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
-    tvalue->timestamp_val.assign(data, data + Size());
-    tvalue->__isset.timestamp_val = true;
+    pvalue->mutable_timestamp_val()->assign(data, data + Size());
   }
 
-  // Returns a new TimestampValue created from the value in 'tvalue'.
-  static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
+  // Returns a new TimestampValue created from the value in 'value_pb'.
+  static TimestampValue FromColumnValuePB(const ColumnValuePB& value_pb) {
     TimestampValue value;
-    memcpy(&value, tvalue.timestamp_val.c_str(), Size());
+    memcpy(&value, value_pb.timestamp_val().c_str(), Size());
     value.Validate();
     return value;
   }
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index 02642c4..ad38900 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -20,7 +20,6 @@
 
 #include <jni.h>
 
-#include "gen-cpp/ImpalaInternalService.h"
 #include "common/status.h"
 #include "util/metrics-fwd.h"
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 1bdcfcf..6197d1c 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1409,9 +1409,10 @@ Status ClientRequestState::UpdateBackendExecStatus(
   return coord_->UpdateBackendExecStatus(request, thrift_profiles);
 }
 
-void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {
+void ClientRequestState::UpdateFilter(
+    const UpdateFilterParamsPB& params, RpcContext* context) {
   DCHECK(coord_.get());
-  coord_->UpdateFilter(params);
+  coord_->UpdateFilter(params, context);
 }
 
 bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) {
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index ac901a4..1ef3b1d 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -190,7 +190,7 @@ class ClientRequestState {
   /// object.
   Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
       const TRuntimeProfileForest& thrift_profiles) WARN_UNUSED_RESULT;
-  void UpdateFilter(const TUpdateFilterParams& params);
+  void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Populate DML stats in 'dml_result' if this request succeeded.
   /// Sets 'query_status' to the overall query status.
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 42b14f9..71ec8cc 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -29,7 +29,9 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
+#include "service/impala-server.h"
 #include "util/memory-metrics.h"
 #include "util/parse-util.h"
 
@@ -106,6 +108,38 @@ void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
   ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
 }
 
+void DataStreamService::UpdateFilter(
+    const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
+  DCHECK(req->has_filter_id());
+  DCHECK(req->has_query_id());
+  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+  ExecEnv::GetInstance()->impala_server()->UpdateFilter(resp, *req, context);
+  RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+}
+
+void DataStreamService::PublishFilter(
+    const PublishFilterParamsPB* req, PublishFilterResultPB* resp, RpcContext* context) {
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
+  DCHECK(req->has_filter_id());
+  DCHECK(req->has_dst_query_id());
+  DCHECK(req->has_dst_fragment_idx());
+  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+  QueryState::ScopedRef qs(ProtoToQueryId(req->dst_query_id()));
+
+  if (qs.get() != nullptr) {
+    qs->PublishFilter(*req, context);
+    RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+  } else {
+    string err_msg = Substitute("Query State not found for query_id=$0",
+        PrintId(ProtoToQueryId(req->dst_query_id())));
+    LOG(INFO) << err_msg;
+    RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
+  }
+}
+
 template<typename ResponsePBType>
 void DataStreamService::RespondRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 539974c..24a2249 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -63,6 +63,16 @@ class DataStreamService : public DataStreamServiceIf {
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
 
+  /// Called by fragment instances that produce local runtime filters to deliver them to
+  /// the coordinator for aggregation and broadcast.
+  virtual void UpdateFilter(const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp,
+      kudu::rpc::RpcContext* context);
+
+  /// Called by the coordinator to deliver global runtime filters to fragments for
+  /// application at plan nodes.
+  virtual void PublishFilter(const PublishFilterParamsPB* req,
+      PublishFilterResultPB* resp, kudu::rpc::RpcContext* context);
+
   /// Respond to a RPC passed in 'response'/'ctx' with 'status' and release
   /// the payload memory from 'mem_tracker'. Takes ownership of 'ctx'.
   template<typename ResponsePBType>
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index d703406..69a5736 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -22,7 +22,6 @@
 
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/LineageGraph_types.h"
 #include "common/status.h"
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 22a8243..3b7af8a 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -43,24 +43,3 @@ template <typename T> void SetUnknownIdError(
       Substitute("Unknown $0 id: $1", id_type, PrintId(id))));
   status.SetTStatus(status_container);
 }
-
-void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
-    const TUpdateFilterParams& params) {
-  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
-  DCHECK(params.__isset.filter_id);
-  DCHECK(params.__isset.query_id);
-  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
-  impala_server_->UpdateFilter(return_val, params);
-}
-
-void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
-    const TPublishFilterParams& params) {
-  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
-  DCHECK(params.__isset.filter_id);
-  DCHECK(params.__isset.dst_query_id);
-  DCHECK(params.__isset.dst_fragment_idx);
-  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
-  QueryState::ScopedRef qs(params.dst_query_id);
-  if (qs.get() == nullptr) return;
-  qs->PublishFilter(params);
-}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index c75122b..425678b 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -30,10 +30,6 @@ class ImpalaServer;
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
   ImpalaInternalService();
-  virtual void UpdateFilter(TUpdateFilterResult& return_val,
-      const TUpdateFilterParams& params);
-  virtual void PublishFilter(TPublishFilterResult& return_val,
-      const TPublishFilterParams& params);
 
  private:
   ImpalaServer* impala_server_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 11ea058..76184f8 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -53,6 +53,7 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
+#include "kudu/rpc/rpc_context.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-trace.h"
@@ -95,7 +96,6 @@
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/DataSinks_types.h"
 #include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/LineageGraph_types.h"
 #include "gen-cpp/Frontend_types.h"
 
@@ -113,6 +113,7 @@ using boost::system_time;
 using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using kudu::GetRandomSeed32;
+using kudu::rpc::RpcContext;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -2537,17 +2538,18 @@ Status ImpalaServer::CheckClientRequestSession(
   return Status::OK();
 }
 
-void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
-    const TUpdateFilterParams& params) {
-  DCHECK(params.__isset.query_id);
-  DCHECK(params.__isset.filter_id);
+void ImpalaServer::UpdateFilter(UpdateFilterResultPB* result,
+    const UpdateFilterParamsPB& params, RpcContext* context) {
+  DCHECK(params.has_query_id());
+  DCHECK(params.has_filter_id());
   shared_ptr<ClientRequestState> client_request_state =
-      GetClientRequestState(params.query_id);
+      GetClientRequestState(ProtoToQueryId(params.query_id()));
   if (client_request_state.get() == nullptr) {
-    LOG(INFO) << "Could not find client request state: " << PrintId(params.query_id);
+    LOG(INFO) << "Could not find client request state: "
+              << PrintId(ProtoToQueryId(params.query_id()));
     return;
   }
-  client_request_state->UpdateFilter(params);
+  client_request_state->UpdateFilter(params, context);
 }
 
 Status ImpalaServer::CheckNotShuttingDown() const {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7e78532..0746784 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -49,6 +49,12 @@
 #include "util/thread-pool.h"
 #include "util/time.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 using kudu::ThreadSafeRandom;
 
@@ -57,12 +63,7 @@ class DataSink;
 class CancellationWork;
 class ImpalaHttpHandler;
 class RowDescriptor;
-class TCatalogUpdate;
-class TPlanExecRequest;
-class TPlanExecParams;
 class TDmlResult;
-class TReportExecStatusArgs;
-class TReportExecStatusResult;
 class TNetworkAddress;
 class TClientRequest;
 class TExecRequest;
@@ -345,8 +346,8 @@ class ImpalaServer : public ImpalaServiceIf,
   virtual void CloseImpalaOperation(
       TCloseImpalaOperationResp& return_val, const TCloseImpalaOperationReq& request);
 
-  /// ImpalaInternalService rpcs
-  void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);
+  void UpdateFilter(UpdateFilterResultPB* return_val, const UpdateFilterParamsPB& params,
+      kudu::rpc::RpcContext* context);
 
   /// Generates a unique id for this query and sets it in the given query context.
   /// Prepares the given query context by populating fields required for evaluating
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index e8e7e2e..7a7d27f 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -21,6 +21,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include "kudu/rpc/rpc_controller.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/mem-tracker.h"
@@ -28,11 +29,14 @@
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
-using namespace std;
+#include "gen-cpp/data_stream_service.pb.h"
 
-namespace {
+using namespace std;
 
 using namespace impala;
+using kudu::rpc::RpcController;
+
+namespace bloom_filter_test_util {
 
 // Make a random uint64_t, avoiding the absent high bit and the low-entropy low bits
 // produced by rand().
@@ -68,24 +72,47 @@ bool BfFind(BloomFilter& bf, uint32_t h) {
 // Computes union of 'x' and 'y'. Computes twice with AVX enabled and disabled and
 // verifies both produce the same result. 'success' is set to true if both union
 // computations returned the same result and set to false otherwise.
-TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success) {
-  TBloomFilter thrift_x, thrift_y;
-  BloomFilter::ToThrift(&x, &thrift_x);
-  BloomFilter::ToThrift(&y, &thrift_y);
-  BloomFilter::Or(thrift_x, &thrift_y);
+void BfUnion(const BloomFilter& x, const BloomFilter& y, int64_t directory_size,
+    bool* success, BloomFilterPB* protobuf, std::string* directory) {
+  BloomFilterPB protobuf_x, protobuf_y;
+  RpcController controller_x;
+  RpcController controller_y;
+  BloomFilter::ToProtobuf(&x, &controller_x, &protobuf_x);
+  BloomFilter::ToProtobuf(&y, &controller_y, &protobuf_y);
+
+  string directory_x(reinterpret_cast<const char*>(x.directory_), directory_size);
+  string directory_y(reinterpret_cast<const char*>(y.directory_), directory_size);
+
+  BloomFilter::Or(protobuf_x, reinterpret_cast<const uint8_t*>(directory_x.data()),
+      &protobuf_y, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y.data())),
+      directory_size);
+
   {
     CpuInfo::TempDisable t1(CpuInfo::AVX);
     CpuInfo::TempDisable t2(CpuInfo::AVX2);
-    TBloomFilter thrift_x2, thrift_y2;
-    BloomFilter::ToThrift(&x, &thrift_x2);
-    BloomFilter::ToThrift(&y, &thrift_y2);
-    BloomFilter::Or(thrift_x2, &thrift_y2);
-    *success = thrift_y.directory == thrift_y2.directory;
+    BloomFilterPB protobuf_x2, protobuf_y2;
+    RpcController controller_x2;
+    RpcController controller_y2;
+    BloomFilter::ToProtobuf(&x, &controller_x2, &protobuf_x2);
+    BloomFilter::ToProtobuf(&y, &controller_y2, &protobuf_y2);
+
+    string directory_x2(reinterpret_cast<const char*>(x.directory_), directory_size);
+    string directory_y2(reinterpret_cast<const char*>(y.directory_), directory_size);
+
+    BloomFilter::Or(protobuf_x2, reinterpret_cast<const uint8_t*>(directory_x2.data()),
+        &protobuf_y2, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y2.data())),
+        directory_size);
+
+    *success = directory_y.compare(directory_y2) == 0;
   }
-  return thrift_y;
+
+  *protobuf = protobuf_y;
+  *directory = directory_y;
 }
 
-}  // namespace
+} // namespace bloom_filter_test_util
+
+using namespace bloom_filter_test_util;
 
 namespace impala {
 
@@ -204,12 +231,15 @@ class BloomFilterTest : public testing::Test {
     return bloom_filter;
   }
 
-  BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
+  BloomFilter* CreateBloomFilter(BloomFilterPB filter_pb, const std::string& directory) {
     int64_t filter_size =
-        BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
+        BloomFilter::GetExpectedMemoryUsed(filter_pb.log_bufferpool_space());
     EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
     BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
-    EXPECT_OK(bloom_filter->Init(t_filter));
+
+    EXPECT_OK(bloom_filter->Init(
+        filter_pb, reinterpret_cast<const uint8_t*>(directory.data()), directory.size()));
+
     bloom_filters_.push_back(bloom_filter);
     EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
     return bloom_filter;
@@ -311,7 +341,7 @@ TEST_F(BloomFilterTest, FindInvalid) {
   }
 }
 
-TEST_F(BloomFilterTest, Thrift) {
+TEST_F(BloomFilterTest, Protobuf) {
   BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
   for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
   // Check no unexpected new false positives.
@@ -320,19 +350,27 @@ TEST_F(BloomFilterTest, Thrift) {
     if (!BfFind(*bf, i)) missing_ints.insert(i);
   }
 
-  TBloomFilter to_thrift;
-  BloomFilter::ToThrift(bf, &to_thrift);
-  EXPECT_EQ(to_thrift.always_true, false);
+  BloomFilterPB to_protobuf;
+
+  RpcController controller;
+  BloomFilter::ToProtobuf(bf, &controller, &to_protobuf);
+
+  EXPECT_EQ(to_protobuf.always_true(), false);
+
+  std::string directory(reinterpret_cast<const char*>(bf->directory_),
+      BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01)));
 
-  BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
-  for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
+  BloomFilter* from_protobuf = CreateBloomFilter(to_protobuf, directory);
 
-  BloomFilter::ToThrift(NULL, &to_thrift);
-  EXPECT_EQ(to_thrift.always_true, true);
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_protobuf, i));
+  for (int missing : missing_ints) ASSERT_FALSE(BfFind(*from_protobuf, missing));
+
+  RpcController controller_2;
+  BloomFilter::ToProtobuf(nullptr, &controller_2, &to_protobuf);
+  EXPECT_EQ(to_protobuf.always_true(), true);
 }
 
-TEST_F(BloomFilterTest, ThriftOr) {
+TEST_F(BloomFilterTest, ProtobufOr) {
   BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
   BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
 
@@ -340,7 +378,15 @@ TEST_F(BloomFilterTest, ThriftOr) {
   for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
 
   bool success;
-  BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
+  BloomFilterPB protobuf;
+  std::string directory;
+  int64_t directory_size =
+      BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01));
+
+  BfUnion(*bf1, *bf2, directory_size, &success, &protobuf, &directory);
+
+  BloomFilter* bf3 = CreateBloomFilter(protobuf, directory);
+
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
   for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
   for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
@@ -348,8 +394,10 @@ TEST_F(BloomFilterTest, ThriftOr) {
   // Insert another value to aggregated BloomFilter.
   for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
 
-  // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
-  BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
+  // Apply BloomFilterPB back to BloomFilter and verify if aggregation was correct.
+  BfUnion(*bf1, *bf3, directory_size, &success, &protobuf, &directory);
+  BloomFilter* bf4 = CreateBloomFilter(protobuf, directory);
+
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
   for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
   for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index 0a2f8fc..c41c81b 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,6 +17,8 @@
 
 #include "util/bloom-filter.h"
 
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 
@@ -55,12 +57,13 @@ Status BloomFilter::Init(const int log_bufferpool_space) {
   return Status::OK();
 }
 
-Status BloomFilter::Init(const TBloomFilter& thrift) {
-  RETURN_IF_ERROR(Init(thrift.log_bufferpool_space));
-  if (directory_ != nullptr && !thrift.always_false) {
+Status BloomFilter::Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
+    size_t directory_in_size) {
+  RETURN_IF_ERROR(Init(protobuf.log_bufferpool_space()));
+  if (directory_ != nullptr && !protobuf.always_false()) {
     always_false_ = false;
-    DCHECK_EQ(thrift.directory.size(), directory_size());
-    memcpy(directory_, &thrift.directory[0], thrift.directory.size());
+    DCHECK_EQ(directory_in_size, directory_size());
+    memcpy(directory_, directory_in, directory_in_size);
   }
   return Status::OK();
 }
@@ -73,32 +76,66 @@ void BloomFilter::Close() {
   }
 }
 
-void BloomFilter::ToThrift(TBloomFilter* thrift) const {
-  thrift->log_bufferpool_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
+void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
+    kudu::rpc::RpcController* controller, const char* directory,
+    unsigned long directory_size) {
+  DCHECK(rpc_params != nullptr);
+  DCHECK(!rpc_params->always_false());
+  DCHECK(!rpc_params->always_true());
+  kudu::Slice dir_slice(directory, directory_size);
+  unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+      kudu::rpc::RpcSidecar::FromSlice(dir_slice);
+
+  int sidecar_idx = -1;
+  kudu::Status sidecar_status =
+      controller->AddOutboundSidecar(std::move(rpc_sidecar), &sidecar_idx);
+  if (!sidecar_status.ok()) {
+    LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString();
+    // If AddOutboundSidecar() fails, we 'disable' the BloomFilterPB by setting it to
+    // an always true filter.
+    rpc_params->set_always_false(false);
+    rpc_params->set_always_true(true);
+    return;
+  }
+  rpc_params->set_directory_sidecar_idx(sidecar_idx);
+  rpc_params->set_always_false(false);
+  rpc_params->set_always_true(false);
+}
+
+void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
+    kudu::rpc::RpcController* controller, const string& directory) {
+  // The pointer-based overload does not copy; 'directory' must outlive the RPC.
+  AddDirectorySidecar(rpc_params, controller, directory.data(),
+      static_cast<unsigned long>(directory.size()));
+}
+
+void BloomFilter::ToProtobuf(
+    BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const {
+  protobuf->set_log_bufferpool_space(log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
   if (always_false_) {
-    thrift->always_false = true;
-    thrift->always_true = false;
+    protobuf->set_always_false(true);
+    protobuf->set_always_true(false);
     return;
   }
-  thrift->directory.assign(reinterpret_cast<const char*>(directory_),
+  BloomFilter::AddDirectorySidecar(protobuf, controller,
+      reinterpret_cast<const char*>(directory_),
       static_cast<unsigned long>(directory_size()));
-  thrift->always_false = false;
-  thrift->always_true = false;
 }
 
-void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
-  DCHECK(thrift != nullptr);
+void BloomFilter::ToProtobuf(const BloomFilter* filter,
+    kudu::rpc::RpcController* controller, BloomFilterPB* protobuf) {
+  DCHECK(protobuf != nullptr);
+  // If filter == nullptr, then this BloomFilter is an always true filter.
   if (filter == nullptr) {
-    thrift->always_true = true;
-    DCHECK_EQ(thrift->always_false, false);
+    protobuf->set_always_true(true);
+    DCHECK(!protobuf->always_false());
     return;
   }
-  filter->ToThrift(thrift);
+  filter->ToProtobuf(protobuf, controller);
 }
 
 // The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
 // compile with -fno-strict-aliasing.
-
 void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
   // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
   // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
@@ -184,20 +221,17 @@ OrEqualArrayAvx(size_t n, const char* __restrict__ in, char* __restrict__ out) {
 }
 } //namespace
 
-void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
+void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
+    BloomFilterPB* out, uint8_t* directory_out, size_t directory_size) {
   DCHECK(out != nullptr);
   DCHECK(&in != out);
   // These cases are impossible in current code. If they become possible in the future,
   // memory usage should be tracked accordingly.
-  DCHECK(!out->always_false);
-  DCHECK(!out->always_true);
-  DCHECK(!in.always_true);
-  if (in.always_false) return;
-  DCHECK_EQ(in.log_bufferpool_space, out->log_bufferpool_space);
-  DCHECK_EQ(in.directory.size(), out->directory.size())
-      << "Equal log heap space " << in.log_bufferpool_space
-      << ", but different directory sizes: " << in.directory.size() << ", "
-      << out->directory.size();
+  DCHECK(!out->always_false());
+  DCHECK(!out->always_true());
+  DCHECK(!in.always_true());
+  if (in.always_false()) return;
+  DCHECK_EQ(in.log_bufferpool_space(), out->log_bufferpool_space());
   // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
   // written in a way that is very friendly to auto-vectorization. Instead, we manually
   // vectorize, increasing the speed by up to 56x.
@@ -205,13 +239,14 @@ void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
   // TODO: Tune gcc flags to auto-vectorize the trivial loop instead of hand-vectorizing
   // it. This might not be possible.
   if (CpuInfo::IsSupported(CpuInfo::AVX)) {
-    OrEqualArrayAvx(in.directory.size(), &in.directory[0], &out->directory[0]);
+    OrEqualArrayAvx(directory_size, reinterpret_cast<const char*>(directory_in),
+        reinterpret_cast<char*>(directory_out));
   } else {
-    const __m128i* simd_in = reinterpret_cast<const __m128i*>(&in.directory[0]);
+    const __m128i* simd_in = reinterpret_cast<const __m128i*>(directory_in);
     const __m128i* const simd_in_end =
-        reinterpret_cast<const __m128i*>(&in.directory[0] + in.directory.size());
-    __m128i* simd_out = reinterpret_cast<__m128i*>(&out->directory[0]);
-    // in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
+        reinterpret_cast<const __m128i*>(directory_in + directory_size);
+    __m128i* simd_out = reinterpret_cast<__m128i*>(directory_out);
+    // directory_in has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
     // == 16, we can do two _mm_or_si128's in each iteration without checking array
     // bounds.
     while (simd_in != simd_in_end) {
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 73cb01e..9c22120 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -27,11 +27,35 @@
 
 #include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gutil/macros.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 
+namespace kudu {
+namespace rpc {
+class RpcController;
+} // namespace rpc
+} // namespace kudu
+
+namespace impala {
+class BloomFilter;
+} // namespace impala
+
+// Need this forward declaration since we make bloom_filter_test_util::BfUnion() a friend
+// function.
+namespace bloom_filter_test_util {
+void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y,
+    int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf,
+    std::string* directory);
+} // namespace bloom_filter_test_util
+
+// Need this forward declaration since we make either::TestData a friend struct.
+namespace either {
+struct TestData;
+} // namespace either
+
 namespace impala {
 
 /// A BloomFilter stores sets of items and offers a query operation indicating whether or
@@ -66,17 +90,21 @@ class BloomFilter {
 
   /// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
   /// calls to Insert() and Find() should only be done between the calls to Init() and
-  /// Close().Init and Close are safe to call multiple times.
+  /// Close(). Init and Close are safe to call multiple times.
   Status Init(const int log_bufferpool_space);
-  Status Init(const TBloomFilter& thrift);
+  Status Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
+      size_t directory_in_size);
   void Close();
 
   /// Representation of a filter which allows all elements to pass.
   static constexpr BloomFilter* const ALWAYS_TRUE_FILTER = NULL;
 
-  /// Converts 'filter' to its corresponding Thrift representation. If the first argument
-  /// is NULL, it is interpreted as a complete filter which contains all elements.
-  static void ToThrift(const BloomFilter* filter, TBloomFilter* thrift);
+  /// Converts 'filter' to its corresponding Protobuf representation. A NULL 'filter' is
+  /// interpreted as a complete filter which contains all elements. Unless 'filter' is
+  /// NULL or always false, also adds a sidecar to 'controller' containing the filter's
+  /// directory, which must therefore stay alive until the RPC completes.
+  static void ToProtobuf(const BloomFilter* filter, kudu::rpc::RpcController* controller,
+      BloomFilterPB* protobuf);
 
   bool AlwaysFalse() const { return always_false_; }
 
@@ -91,8 +119,12 @@ class BloomFilter {
   /// high probabilty) if it is not.
   bool Find(const uint32_t hash) const noexcept;
 
-  /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
-  static void Or(const TBloomFilter& in, TBloomFilter* out);
+  /// Computes the logical OR of 'directory_in' with 'directory_out' and stores the
+  /// result in 'directory_out'. Both buffers must be 'directory_size' bytes long, a
+  /// multiple of 32. If 'in' is always false, 'directory_out' is left unchanged. The
+  /// fields of 'in' and 'out' are DCHECKed to be consistent with each other.
+  static void Or(const BloomFilterPB& in, const uint8_t* directory_in, BloomFilterPB* out,
+      uint8_t* directory_out, size_t directory_size);
 
   /// As more distinct items are inserted into a BloomFilter, the false positive rate
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
@@ -119,6 +151,20 @@ class BloomFilter {
     return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
   }
 
+  /// The following two functions add a sidecar to 'controller' containing the Bloom
+  /// filter's directory and record its index in 'rpc_params'. Two overloads are
+  /// provided because callers may or may not have access to an instantiated
+  /// BloomFilter. 'rpc_params' must be neither an always-false nor an always-true
+  /// Bloom filter when either function is called. The sidecar refers to 'directory'
+  /// directly instead of copying it, so 'directory' must stay alive until the RPC is
+  /// done. If the sidecar cannot be added, 'rpc_params' is turned into an always-true
+  /// filter.
+  static void AddDirectorySidecar(BloomFilterPB* rpc_params,
+      kudu::rpc::RpcController* controller, const char* directory,
+      unsigned long directory_size);
+  static void AddDirectorySidecar(BloomFilterPB* rpc_params,
+      kudu::rpc::RpcController* controller, const string& directory);
+
  private:
   // always_false_ is true when the bloom filter hasn't had any elements inserted.
   bool always_false_ = true;
@@ -182,8 +228,8 @@ class BloomFilter {
     return 1uLL << (log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
   }
 
-  /// Serializes this filter as Thrift.
-  void ToThrift(TBloomFilter* thrift) const;
+  /// Serializes this filter as Protobuf.
+  void ToProtobuf(BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const;
 
 /// Some constants used in hashing. #defined for efficiency reasons.
 #define IMPALA_BLOOM_HASH_CONSTANTS                                             \
@@ -196,6 +242,23 @@ class BloomFilter {
       __attribute__((aligned(32))) = {IMPALA_BLOOM_HASH_CONSTANTS};
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilter);
+
+  /// List 'BloomFilterTest_Protobuf_Test' as a friend class to run the backend
+  /// test in 'bloom-filter-test.cc' since it has to access the private field of
+  /// 'directory_' in BloomFilter.
+  friend class BloomFilterTest_Protobuf_Test;
+
+  /// List 'bloom_filter_test_util::BfUnion()' as a friend function to run the backend
+  /// test in 'bloom-filter-test.cc' since it has to access the private field of
+  /// 'directory_' in BloomFilter.
+  friend void bloom_filter_test_util::BfUnion(const impala::BloomFilter& x,
+      const impala::BloomFilter& y, int64_t directory_size, bool* success,
+      impala::BloomFilterPB* protobuf, std::string* directory);
+
+  /// List 'either::TestData' as a friend struct to run the benchmark in
+  /// 'bloom-filter-benchmark.cc' since it has to access the private field of
+  /// 'directory_' in BloomFilter.
+  friend struct either::TestData;
 };
 
 // To set 8 bits in an 32-byte Bloom filter, we set one bit in each 32-bit uint32_t. This
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index a75ca1a..a5c0667 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -18,6 +18,7 @@
 #include "testutil/gtest-util.h"
 #include "util/min-max-filter.h"
 
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/decimal-value.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/string-value.inline.h"
@@ -50,15 +51,15 @@ TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
   EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMax()), b1);
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  tFilter1.min.__set_bool_val(false);
-  tFilter1.max.__set_bool_val(true);
-  TMinMaxFilter tFilter2;
-  tFilter2.min.__set_bool_val(false);
-  tFilter2.max.__set_bool_val(false);
-  MinMaxFilter::Or(tFilter1, &tFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
-  EXPECT_FALSE(tFilter2.min.bool_val);
-  EXPECT_TRUE(tFilter2.max.bool_val);
+  MinMaxFilterPB pFilter1;
+  pFilter1.mutable_min()->set_bool_val(false);
+  pFilter1.mutable_max()->set_bool_val(true);
+  MinMaxFilterPB pFilter2;
+  pFilter2.mutable_min()->set_bool_val(false);
+  pFilter2.mutable_max()->set_bool_val(false);
+  MinMaxFilter::Or(pFilter1, &pFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
+  EXPECT_FALSE(pFilter2.min().bool_val());
+  EXPECT_TRUE(pFilter2.max().bool_val());
 
   filter->Close();
 }
@@ -84,14 +85,14 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   // Test the behavior of an empty filter.
   EXPECT_TRUE(int_filter->AlwaysFalse());
   EXPECT_FALSE(int_filter->AlwaysTrue());
-  TMinMaxFilter tFilter;
-  int_filter->ToThrift(&tFilter);
-  EXPECT_TRUE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_FALSE(tFilter.min.__isset.int_val);
-  EXPECT_FALSE(tFilter.max.__isset.int_val);
+  MinMaxFilterPB pFilter;
+  int_filter->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_FALSE(pFilter.min().has_int_val());
+  EXPECT_FALSE(pFilter.max().has_int_val());
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -109,25 +110,25 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   int_filter->Insert(&i4);
   CheckIntVals(int_filter, i4, i2);
 
-  int_filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(tFilter.min.int_val, i4);
-  EXPECT_EQ(tFilter.max.int_val, i2);
+  int_filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(pFilter.min().int_val(), i4);
+  EXPECT_EQ(pFilter.max().int_val(), i2);
   MinMaxFilter* int_filter2 =
-      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
   CheckIntVals(int_filter2, i4, i2);
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  tFilter1.min.__set_int_val(4);
-  tFilter1.max.__set_int_val(8);
-  TMinMaxFilter tFilter2;
-  tFilter2.min.__set_int_val(2);
-  tFilter2.max.__set_int_val(7);
-  MinMaxFilter::Or(tFilter1, &tFilter2, int_type);
-  EXPECT_EQ(tFilter2.min.int_val, 2);
-  EXPECT_EQ(tFilter2.max.int_val, 8);
+  MinMaxFilterPB pFilter1;
+  pFilter1.mutable_min()->set_int_val(4);
+  pFilter1.mutable_max()->set_int_val(8);
+  MinMaxFilterPB pFilter2;
+  pFilter2.mutable_min()->set_int_val(2);
+  pFilter2.mutable_max()->set_int_val(7);
+  MinMaxFilter::Or(pFilter1, &pFilter2, int_type);
+  EXPECT_EQ(pFilter2.min().int_val(), 2);
+  EXPECT_EQ(pFilter2.max().int_val(), 8);
 
   int_filter->Close();
   empty_filter->Close();
@@ -162,13 +163,13 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   filter->MaterializeValues();
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  TMinMaxFilter tFilter;
-  filter->ToThrift(&tFilter);
-  EXPECT_TRUE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
+  MinMaxFilterPB pFilter;
+  filter->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
 
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -191,11 +192,11 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   filter->MaterializeValues();
   CheckStringVals(filter, c, d);
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(tFilter.min.string_val, c);
-  EXPECT_EQ(tFilter.max.string_val, d);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(pFilter.min().string_val(), c);
+  EXPECT_EQ(pFilter.max().string_val(), d);
 
   // Test that strings longer than 1024 are truncated.
   string b1030(1030, 'b');
@@ -227,14 +228,14 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   for (int i = trailIndex; i < 1024; ++i) truncTrailMaxChar[i] = 0;
   CheckStringVals(filter, b1024, truncTrailMaxChar);
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(tFilter.min.string_val, b1024);
-  EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(pFilter.min().string_val(), b1024);
+  EXPECT_EQ(pFilter.max().string_val(), truncTrailMaxChar);
 
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
   CheckStringVals(filter2, b1024, truncTrailMaxChar);
 
   // Check that if the entire string is the max char and therefore after truncating for
@@ -249,12 +250,12 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   filter->Insert(&cVal);
   EXPECT_TRUE(filter->AlwaysTrue());
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_TRUE(tFilter.always_true);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_TRUE(pFilter.always_true());
 
   MinMaxFilter* always_true_filter =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_FALSE(always_true_filter->AlwaysFalse());
   EXPECT_TRUE(always_true_filter->AlwaysTrue());
 
@@ -276,20 +277,20 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   limit_filter->MaterializeValues();
   EXPECT_TRUE(limit_filter->AlwaysTrue());
 
-  limit_filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_TRUE(tFilter.always_true);
+  limit_filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_TRUE(pFilter.always_true());
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  tFilter1.min.__set_string_val("a");
-  tFilter1.max.__set_string_val("d");
-  TMinMaxFilter tFilter2;
-  tFilter2.min.__set_string_val("b");
-  tFilter2.max.__set_string_val("e");
-  MinMaxFilter::Or(tFilter1, &tFilter2, string_type);
-  EXPECT_EQ(tFilter2.min.string_val, "a");
-  EXPECT_EQ(tFilter2.max.string_val, "e");
+  MinMaxFilterPB pFilter1;
+  pFilter1.mutable_min()->set_string_val("a");
+  pFilter1.mutable_max()->set_string_val("d");
+  MinMaxFilterPB pFilter2;
+  pFilter2.mutable_min()->set_string_val("b");
+  pFilter2.mutable_max()->set_string_val("e");
+  MinMaxFilter::Or(pFilter1, &pFilter2, string_type);
+  EXPECT_EQ(pFilter2.min().string_val(), "a");
+  EXPECT_EQ(pFilter2.max().string_val(), "e");
 
   filter->Close();
   empty_filter->Close();
@@ -317,14 +318,14 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   // Test the behavior of an empty filter.
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  TMinMaxFilter tFilter;
-  filter->ToThrift(&tFilter);
-  EXPECT_TRUE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
-  EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
+  MinMaxFilterPB pFilter;
+  filter->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_FALSE(pFilter.min().has_timestamp_val());
+  EXPECT_FALSE(pFilter.max().has_timestamp_val());
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -342,25 +343,25 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   filter->Insert(&t4);
   CheckTimestampVals(filter, t2, t3);
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.min()), t2);
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.max()), t3);
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
   CheckTimestampVals(filter2, t2, t3);
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  t2.ToTColumnValue(&tFilter1.min);
-  t4.ToTColumnValue(&tFilter1.max);
-  TMinMaxFilter tFilter2;
-  t1.ToTColumnValue(&tFilter2.min);
-  t3.ToTColumnValue(&tFilter2.max);
-  MinMaxFilter::Or(tFilter1, &tFilter2, timestamp_type);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
+  MinMaxFilterPB pFilter1;
+  t2.ToColumnValuePB(pFilter1.mutable_min());
+  t4.ToColumnValuePB(pFilter1.mutable_max());
+  MinMaxFilterPB pFilter2;
+  t1.ToColumnValuePB(pFilter2.mutable_min());
+  t3.ToColumnValuePB(pFilter2.mutable_max());
+  MinMaxFilter::Or(pFilter1, &pFilter2, timestamp_type);
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.min()), t2);
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.max()), t3);
 
   filter->Close();
   empty_filter->Close();
@@ -391,16 +392,16 @@ void CheckDecimalVals(
 }
 
 void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type,
-    TMinMaxFilter* tFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
+    MinMaxFilterPB* pFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  filter->ToThrift(tFilter);
-  EXPECT_TRUE(tFilter->always_false);
-  EXPECT_FALSE(tFilter->always_true);
-  EXPECT_FALSE(tFilter->min.__isset.decimal_val);
-  EXPECT_FALSE(tFilter->max.__isset.decimal_val);
+  filter->ToProtobuf(pFilter);
+  EXPECT_TRUE(pFilter->always_false());
+  EXPECT_FALSE(pFilter->always_true());
+  EXPECT_FALSE(pFilter->min().has_decimal_val());
+  EXPECT_FALSE(pFilter->max().has_decimal_val());
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(*tFilter, column_type, obj_pool, mem_tracker);
+      MinMaxFilter::Create(*pFilter, column_type, obj_pool, mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
   empty_filter->Close();
@@ -427,30 +428,30 @@ void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type
     CheckDecimalVals(filter##SIZE, d3##SIZE, d2##SIZE);                              \
   } while (false)
 
-#define DECIMAL_CHECK_THRIFT(SIZE)                                                  \
-  do {                                                                              \
-    filter##SIZE->ToThrift(&tFilter##SIZE);                                         \
-    EXPECT_FALSE(tFilter##SIZE.always_false);                                       \
-    EXPECT_FALSE(tFilter##SIZE.always_true);                                        \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.min), d3##SIZE); \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.max), d2##SIZE); \
-    MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create(                           \
-        tFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker);              \
-    CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE);                          \
-    filter##SIZE##2->Close();                                                       \
+#define DECIMAL_CHECK_PROTOBUF(SIZE)                                                   \
+  do {                                                                                 \
+    filter##SIZE->ToProtobuf(&pFilter##SIZE);                                          \
+    EXPECT_FALSE(pFilter##SIZE.always_false());                                        \
+    EXPECT_FALSE(pFilter##SIZE.always_true());                                         \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.min()), d3##SIZE); \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.max()), d2##SIZE); \
+    MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create(                              \
+        pFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker);                 \
+    CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE);                             \
+    filter##SIZE##2->Close();                                                          \
   } while (false)
 
-#define DECIMAL_CHECK_OR(SIZE)                                                       \
-  do {                                                                               \
-    TMinMaxFilter tFilter1##SIZE;                                                    \
-    d3##SIZE.ToTColumnValue(&tFilter1##SIZE.min);                                    \
-    d2##SIZE.ToTColumnValue(&tFilter1##SIZE.max);                                    \
-    TMinMaxFilter tFilter2##SIZE;                                                    \
-    d1##SIZE.ToTColumnValue(&tFilter2##SIZE.min);                                    \
-    d1##SIZE.ToTColumnValue(&tFilter2##SIZE.max);                                    \
-    MinMaxFilter::Or(tFilter1##SIZE, &tFilter2##SIZE, decimal##SIZE##_type);         \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.min), d3##SIZE); \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.max), d2##SIZE); \
+#define DECIMAL_CHECK_OR(SIZE)                                                          \
+  do {                                                                                  \
+    MinMaxFilterPB pFilter1##SIZE;                                                      \
+    d3##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_min());                             \
+    d2##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_max());                             \
+    MinMaxFilterPB pFilter2##SIZE;                                                      \
+    d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_min());                             \
+    d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_max());                             \
+    MinMaxFilter::Or(pFilter1##SIZE, &pFilter2##SIZE, decimal##SIZE##_type);            \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.min()), d3##SIZE); \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.max()), d2##SIZE); \
   } while (false)
 
 // Tests that a DecimalMinMaxFilter returns the expected min/max after having values
@@ -475,13 +476,13 @@ TEST(MinMaxFilterTest, TestDecimalMinMaxFilter) {
   MinMaxFilter* filter8 = MinMaxFilter::Create(decimal8_type, &obj_pool, &mem_tracker);
   MinMaxFilter* filter16 = MinMaxFilter::Create(decimal16_type, &obj_pool, &mem_tracker);
 
-  // Create thrift minmax filters
-  TMinMaxFilter tFilter4, tFilter8, tFilter16;
+  // Create protobuf minmax filters
+  MinMaxFilterPB pFilter4, pFilter8, pFilter16;
 
   // Test the behavior of an empty filter.
-  CheckDecimalEmptyFilter(filter4, decimal4_type, &tFilter4, &obj_pool, &mem_tracker);
-  CheckDecimalEmptyFilter(filter8, decimal8_type, &tFilter8, &obj_pool, &mem_tracker);
-  CheckDecimalEmptyFilter(filter16, decimal16_type, &tFilter16, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter4, decimal4_type, &pFilter4, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter8, decimal8_type, &pFilter8, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter16, decimal16_type, &pFilter16, &obj_pool, &mem_tracker);
 
   // Insert and check
   DECIMAL_INSERT_AND_CHECK(4, 9, 5, 2345.67891, 3456.78912, 1234.56789);
@@ -490,10 +491,10 @@ TEST(MinMaxFilterTest, TestDecimalMinMaxFilter) {
   DECIMAL_INSERT_AND_CHECK(16, 38, 19, 2345678912345678912.2345678912345678912,
       3456789123456789123.3456789123456789123, 1234567891234567891.1234567891234567891);
 
-  // Thrift check
-  DECIMAL_CHECK_THRIFT(4);
-  DECIMAL_CHECK_THRIFT(8);
-  DECIMAL_CHECK_THRIFT(16);
+  // Protobuf check
+  DECIMAL_CHECK_PROTOBUF(4);
+  DECIMAL_CHECK_PROTOBUF(8);
+  DECIMAL_CHECK_PROTOBUF(16);
 
   // Check the behavior of Or.
   DECIMAL_CHECK_OR(4);
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index c6da060..dd9b351 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -82,61 +82,55 @@ IRFunction::Type MinMaxFilter::GetInsertIRFunctionType(ColumnType column_type) {
   }
 }
 
-#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, THRIFT_TYPE, PRIMITIVE_TYPE)  \
-  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                            \
-      "class.impala::" #NAME "MinMaxFilter";                                   \
-  NAME##MinMaxFilter::NAME##MinMaxFilter(const TMinMaxFilter& thrift) {        \
-    DCHECK(!thrift.always_true);                                               \
-    if (thrift.always_false) {                                                 \
-      min_ = numeric_limits<TYPE>::max();                                      \
-      max_ = numeric_limits<TYPE>::lowest();                                   \
-    } else {                                                                   \
-      DCHECK(thrift.__isset.min);                                              \
-      DCHECK(thrift.__isset.max);                                              \
-      DCHECK(thrift.min.__isset.THRIFT_TYPE##_val);                            \
-      DCHECK(thrift.max.__isset.THRIFT_TYPE##_val);                            \
-      min_ = thrift.min.THRIFT_TYPE##_val;                                     \
-      max_ = thrift.max.THRIFT_TYPE##_val;                                     \
-    }                                                                          \
-  }                                                                            \
-  PrimitiveType NAME##MinMaxFilter::type() {                                   \
-    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                               \
-  }                                                                            \
-  void NAME##MinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {             \
-    if (!AlwaysFalse()) {                                                      \
-      thrift->min.__set_##THRIFT_TYPE##_val(min_);                             \
-      thrift->__isset.min = true;                                              \
-      thrift->max.__set_##THRIFT_TYPE##_val(max_);                             \
-      thrift->__isset.max = true;                                              \
-    }                                                                          \
-    thrift->__set_always_false(AlwaysFalse());                                 \
-    thrift->__set_always_true(false);                                          \
-  }                                                                            \
-  string NAME##MinMaxFilter::DebugString() const {                             \
-    stringstream out;                                                          \
-    out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_            \
-        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";     \
-    return out.str();                                                          \
-  }                                                                            \
-  void NAME##MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {   \
-    if (out->always_false) {                                                   \
-      out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);            \
-      out->__isset.min = true;                                                 \
-      out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);            \
-      out->__isset.max = true;                                                 \
-      out->__set_always_false(false);                                          \
-    } else {                                                                   \
-      out->min.__set_##THRIFT_TYPE##_val(                                      \
-          std::min(in.min.THRIFT_TYPE##_val, out->min.THRIFT_TYPE##_val));     \
-      out->max.__set_##THRIFT_TYPE##_val(                                      \
-          std::max(in.max.THRIFT_TYPE##_val, out->max.THRIFT_TYPE##_val));     \
-    }                                                                          \
-  }                                                                            \
-  void NAME##MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) { \
-    out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);              \
-    out->__isset.min = true;                                                   \
-    out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);              \
-    out->__isset.max = true;                                                   \
+#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, PROTOBUF_TYPE, PRIMITIVE_TYPE)        \
+  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                                    \
+      "class.impala::" #NAME "MinMaxFilter";                                           \
+  NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) {             \
+    DCHECK(!protobuf.always_true());                                                   \
+    if (protobuf.always_false()) {                                                     \
+      min_ = numeric_limits<TYPE>::max();                                              \
+      max_ = numeric_limits<TYPE>::lowest();                                           \
+    } else {                                                                           \
+      DCHECK(protobuf.has_min());                                                      \
+      DCHECK(protobuf.has_max());                                                      \
+      DCHECK(protobuf.min().has_##PROTOBUF_TYPE##_val());                              \
+      DCHECK(protobuf.max().has_##PROTOBUF_TYPE##_val());                              \
+      min_ = protobuf.min().PROTOBUF_TYPE##_val();                                     \
+      max_ = protobuf.max().PROTOBUF_TYPE##_val();                                     \
+    }                                                                                  \
+  }                                                                                    \
+  PrimitiveType NAME##MinMaxFilter::type() {                                           \
+    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                                       \
+  }                                                                                    \
+  void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {                \
+    if (!AlwaysFalse()) {                                                              \
+      protobuf->mutable_min()->set_##PROTOBUF_TYPE##_val(min_);                        \
+      protobuf->mutable_max()->set_##PROTOBUF_TYPE##_val(max_);                        \
+    }                                                                                  \
+    protobuf->set_always_false(AlwaysFalse());                                         \
+    protobuf->set_always_true(false);                                                  \
+  }                                                                                    \
+  string NAME##MinMaxFilter::DebugString() const {                                     \
+    stringstream out;                                                                  \
+    out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_                    \
+        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";             \
+    return out.str();                                                                  \
+  }                                                                                    \
+  void NAME##MinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {         \
+    if (out->always_false()) {                                                         \
+      out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());   \
+      out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());   \
+      out->set_always_false(false);                                                    \
+    } else {                                                                           \
+      out->mutable_min()->set_##PROTOBUF_TYPE##_val(                                   \
+          std::min(in.min().PROTOBUF_TYPE##_val(), out->min().PROTOBUF_TYPE##_val())); \
+      out->mutable_max()->set_##PROTOBUF_TYPE##_val(                                   \
+          std::max(in.max().PROTOBUF_TYPE##_val(), out->max().PROTOBUF_TYPE##_val())); \
+    }                                                                                  \
+  }                                                                                    \
+  void NAME##MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {       \
+    out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());     \
+    out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());     \
   }
 
 NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
@@ -222,19 +216,17 @@ const char* StringMinMaxFilter::LLVM_CLASS_NAME = "class.impala::StringMinMaxFil
 const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
 
 StringMinMaxFilter::StringMinMaxFilter(
-    const TMinMaxFilter& thrift, MemTracker* mem_tracker)
-  : mem_pool_(mem_tracker),
-    min_buffer_(&mem_pool_),
-    max_buffer_(&mem_pool_) {
-  always_false_ = thrift.always_false;
-  always_true_ = thrift.always_true;
+    const MinMaxFilterPB& protobuf, MemTracker* mem_tracker)
+  : mem_pool_(mem_tracker), min_buffer_(&mem_pool_), max_buffer_(&mem_pool_) {
+  always_false_ = protobuf.always_false();
+  always_true_ = protobuf.always_true();
   if (!always_true_ && !always_false_) {
-    DCHECK(thrift.__isset.min);
-    DCHECK(thrift.__isset.max);
-    DCHECK(thrift.min.__isset.string_val);
-    DCHECK(thrift.max.__isset.string_val);
-    min_ = StringValue(thrift.min.string_val);
-    max_ = StringValue(thrift.max.string_val);
+    DCHECK(protobuf.has_min());
+    DCHECK(protobuf.has_max());
+    DCHECK(protobuf.min().has_string_val());
+    DCHECK(protobuf.max().has_string_val());
+    min_ = StringValue(protobuf.min().string_val());
+    max_ = StringValue(protobuf.max().string_val());
     CopyToBuffer(&min_buffer_, &min_, min_.len);
     CopyToBuffer(&max_buffer_, &max_, max_.len);
   }
@@ -277,17 +269,13 @@ void StringMinMaxFilter::MaterializeValues() {
   }
 }
 
-void StringMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void StringMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
   if (!always_true_ && !always_false_) {
-    thrift->min.string_val.assign(static_cast<char*>(min_.ptr), min_.len);
-    thrift->min.__isset.string_val = true;
-    thrift->__isset.min = true;
-    thrift->max.string_val.assign(static_cast<char*>(max_.ptr), max_.len);
-    thrift->max.__isset.string_val = true;
-    thrift->__isset.max = true;
+    protobuf->mutable_min()->set_string_val(static_cast<char*>(min_.ptr), min_.len);
+    protobuf->mutable_max()->set_string_val(static_cast<char*>(max_.ptr), max_.len);
   }
-  thrift->__set_always_false(always_false_);
-  thrift->__set_always_true(always_true_);
+  protobuf->set_always_false(always_false_);
+  protobuf->set_always_true(always_true_);
 }
 
 string StringMinMaxFilter::DebugString() const {
@@ -298,28 +286,26 @@ string StringMinMaxFilter::DebugString() const {
   return out.str();
 }
 
-void StringMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  if (out->always_false) {
-    out->min.__set_string_val(in.min.string_val);
-    out->__isset.min = true;
-    out->max.__set_string_val(in.max.string_val);
-    out->__isset.max = true;
-    out->__set_always_false(false);
+void StringMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  if (out->always_false()) {
+    out->mutable_min()->set_string_val(in.min().string_val());
+    out->mutable_max()->set_string_val(in.max().string_val());
+    out->set_always_false(false);
   } else {
-    StringValue in_min_val = StringValue(in.min.string_val);
-    StringValue out_min_val = StringValue(out->min.string_val);
-    if (in_min_val < out_min_val) out->min.__set_string_val(in.min.string_val);
-    StringValue in_max_val = StringValue(in.max.string_val);
-    StringValue out_max_val = StringValue(out->max.string_val);
-    if (in_max_val > out_max_val) out->max.__set_string_val(in.max.string_val);
+    StringValue in_min_val = StringValue(in.min().string_val());
+    StringValue out_min_val = StringValue(out->min().string_val());
+    if (in_min_val < out_min_val)
+      out->mutable_min()->set_string_val(in.min().string_val());
+    StringValue in_max_val = StringValue(in.max().string_val());
+    StringValue out_max_val = StringValue(out->max().string_val());
+    if (in_max_val > out_max_val)
+      out->mutable_max()->set_string_val(in.max().string_val());
   }
 }
 
-void StringMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->min.__set_string_val(in.min.string_val);
-  out->__isset.min = true;
-  out->max.__set_string_val(in.max.string_val);
-  out->__isset.max = true;
+void StringMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->mutable_min()->set_string_val(in.min().string_val());
+  out->mutable_max()->set_string_val(in.max().string_val());
 }
 
 void StringMinMaxFilter::CopyToBuffer(
@@ -349,13 +335,13 @@ void StringMinMaxFilter::SetAlwaysTrue() {
 const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
     "class.impala::TimestampMinMaxFilter";
 
-TimestampMinMaxFilter::TimestampMinMaxFilter(const TMinMaxFilter& thrift) {
-  always_false_ = thrift.always_false;
+TimestampMinMaxFilter::TimestampMinMaxFilter(const MinMaxFilterPB& protobuf) {
+  always_false_ = protobuf.always_false();
   if (!always_false_) {
-    DCHECK(thrift.min.__isset.timestamp_val);
-    DCHECK(thrift.max.__isset.timestamp_val);
-    min_ = TimestampValue::FromTColumnValue(thrift.min);
-    max_ = TimestampValue::FromTColumnValue(thrift.max);
+    DCHECK(protobuf.min().has_timestamp_val());
+    DCHECK(protobuf.max().has_timestamp_val());
+    min_ = TimestampValue::FromColumnValuePB(protobuf.min());
+    max_ = TimestampValue::FromColumnValuePB(protobuf.max());
   }
 }
 
@@ -363,15 +349,13 @@ PrimitiveType TimestampMinMaxFilter::type() {
   return PrimitiveType::TYPE_TIMESTAMP;
 }
 
-void TimestampMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void TimestampMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
   if (!always_false_) {
-    min_.ToTColumnValue(&thrift->min);
-    thrift->__isset.min = true;
-    max_.ToTColumnValue(&thrift->max);
-    thrift->__isset.max = true;
+    min_.ToColumnValuePB(protobuf->mutable_min());
+    max_.ToColumnValuePB(protobuf->mutable_max());
   }
-  thrift->__set_always_false(always_false_);
-  thrift->__set_always_true(false);
+  protobuf->set_always_false(always_false_);
+  protobuf->set_always_true(false);
 }
 
 string TimestampMinMaxFilter::DebugString() const {
@@ -381,45 +365,46 @@ string TimestampMinMaxFilter::DebugString() const {
   return out.str();
 }
 
-void TimestampMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  if (out->always_false) {
-    out->min.__set_timestamp_val(in.min.timestamp_val);
-    out->__isset.min = true;
-    out->max.__set_timestamp_val(in.max.timestamp_val);
-    out->__isset.max = true;
-    out->__set_always_false(false);
+void TimestampMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  if (out->always_false()) {
+    out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+    out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+    out->set_always_false(false);
   } else {
-    TimestampValue in_min_val = TimestampValue::FromTColumnValue(in.min);
-    TimestampValue out_min_val = TimestampValue::FromTColumnValue(out->min);
-    if (in_min_val < out_min_val) out->min.__set_timestamp_val(in.min.timestamp_val);
-    TimestampValue in_max_val = TimestampValue::FromTColumnValue(in.max);
-    TimestampValue out_max_val = TimestampValue::FromTColumnValue(out->max);
-    if (in_max_val > out_max_val) out->max.__set_timestamp_val(in.max.timestamp_val);
+    TimestampValue in_min_val = TimestampValue::FromColumnValuePB(in.min());
+    TimestampValue out_min_val = TimestampValue::FromColumnValuePB(out->min());
+    if (in_min_val < out_min_val) {
+      out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+    }
+    TimestampValue in_max_val = TimestampValue::FromColumnValuePB(in.max());
+    TimestampValue out_max_val = TimestampValue::FromColumnValuePB(out->max());
+    if (in_max_val > out_max_val) {
+      out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+    }
   }
 }
 
-void TimestampMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->min.__set_timestamp_val(in.min.timestamp_val);
-  out->__isset.min = true;
-  out->max.__set_timestamp_val(in.max.timestamp_val);
-  out->__isset.max = true;
+void TimestampMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+  out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
 }
 
 // DECIMAL
 const char* DecimalMinMaxFilter::LLVM_CLASS_NAME = "class.impala::DecimalMinMaxFilter";
-#define DECIMAL_SET_MINMAX(SIZE)                                       \
-  do {                                                                 \
-    DCHECK(thrift.min.__isset.decimal_val);                            \
-    DCHECK(thrift.max.__isset.decimal_val);                            \
-    min##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.min); \
-    max##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.max); \
+#define DECIMAL_SET_MINMAX(SIZE)                                            \
+  do {                                                                      \
+    DCHECK(protobuf.min().has_decimal_val());                               \
+    DCHECK(protobuf.max().has_decimal_val());                               \
+    min##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.min()); \
+    max##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.max()); \
   } while (false)
 
 // Construct the Decimal min-max filter when the min-max filter information
-// comes in through thrift.  This can get called in coordinator, after the filter
+// comes in through protobuf.  This can get called in coordinator, after the filter
 // is sent by executor
-DecimalMinMaxFilter::DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision)
-  : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(thrift.always_false) {
+DecimalMinMaxFilter::DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision)
+  : size_(ColumnType::GetDecimalByteSize(precision)),
+    always_false_(protobuf.always_false()) {
   if (!always_false_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
@@ -441,34 +426,32 @@ PrimitiveType DecimalMinMaxFilter::type() {
   return PrimitiveType::TYPE_DECIMAL;
 }
 
-#define DECIMAL_TO_THRIFT(SIZE)                \
-  do {                                         \
-    min##SIZE##_.ToTColumnValue(&thrift->min); \
-    max##SIZE##_.ToTColumnValue(&thrift->max); \
+#define DECIMAL_TO_PROTOBUF(SIZE)                          \
+  do {                                                     \
+    min##SIZE##_.ToColumnValuePB(protobuf->mutable_min()); \
+    max##SIZE##_.ToColumnValuePB(protobuf->mutable_max()); \
   } while (false)
 
-// Construct a thrift min-max filter.  Will be called by the executor
+// Construct a protobuf min-max filter.  Will be called by the executor
 // to be sent to the coordinator
-void DecimalMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void DecimalMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
   if (!always_false_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
-        DECIMAL_TO_THRIFT(4);
+        DECIMAL_TO_PROTOBUF(4);
         break;
       case DECIMAL_SIZE_8BYTE:
-        DECIMAL_TO_THRIFT(8);
+        DECIMAL_TO_PROTOBUF(8);
         break;
       case DECIMAL_SIZE_16BYTE:
-        DECIMAL_TO_THRIFT(16);
+        DECIMAL_TO_PROTOBUF(16);
         break;
       default:
         DCHECK(false) << "DecimalMinMaxFilter: Unknown decimal byte size: " << size_;
     }
-    thrift->__isset.min = true;
-    thrift->__isset.max = true;
   }
-  thrift->__set_always_false(always_false_);
-  thrift->__set_always_true(false);
+  protobuf->set_always_false(always_false_);
+  protobuf->set_always_true(false);
 }
 
 void DecimalMinMaxFilter::Insert(void* val) {
@@ -514,25 +497,24 @@ string DecimalMinMaxFilter::DebugString() const {
   return out.str();
 }
 
-#define DECIMAL_OR(SIZE)                                    \
-  do {                                                      \
-    if (Decimal##SIZE##Value::FromTColumnValue(in.min)      \
-        < Decimal##SIZE##Value::FromTColumnValue(out->min)) \
-      out->min.__set_decimal_val(in.min.decimal_val);       \
-    if (Decimal##SIZE##Value::FromTColumnValue(in.max)      \
-        > Decimal##SIZE##Value::FromTColumnValue(out->max)) \
-      out->max.__set_decimal_val(in.max.decimal_val);       \
+#define DECIMAL_OR(SIZE)                                           \
+  do {                                                             \
+    if (Decimal##SIZE##Value::FromColumnValuePB(in.min())          \
+        < Decimal##SIZE##Value::FromColumnValuePB(out->min()))     \
+      out->mutable_min()->set_decimal_val(in.min().decimal_val()); \
+    if (Decimal##SIZE##Value::FromColumnValuePB(in.max())          \
+        > Decimal##SIZE##Value::FromColumnValuePB(out->max()))     \
+      out->mutable_max()->set_decimal_val(in.max().decimal_val()); \
   } while (false)
 
-void DecimalMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision) {
-  if (in.always_false) {
+void DecimalMinMaxFilter::Or(
+    const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision) {
+  if (in.always_false()) {
     return;
-  } else if (out->always_false) {
-    out->min.__set_decimal_val(in.min.decimal_val);
-    out->__isset.min = true;
-    out->max.__set_decimal_val(in.max.decimal_val);
-    out->__isset.max = true;
-    out->__set_always_false(false);
+  } else if (out->always_false()) {
+    out->mutable_min()->set_decimal_val(in.min().decimal_val());
+    out->mutable_max()->set_decimal_val(in.max().decimal_val());
+    out->set_always_false(false);
   } else {
     int size = ColumnType::GetDecimalByteSize(precision);
     switch (size) {
@@ -551,11 +533,9 @@ void DecimalMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out, int pr
   }
 }
 
-void DecimalMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->min.__set_decimal_val(in.min.decimal_val);
-  out->__isset.min = true;
-  out->max.__set_decimal_val(in.max.decimal_val);
-  out->__isset.max = true;
+void DecimalMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->mutable_min()->set_decimal_val(in.min().decimal_val());
+  out->mutable_max()->set_decimal_val(in.max().decimal_val());
 }
 
 // MinMaxFilter
@@ -595,29 +575,29 @@ MinMaxFilter* MinMaxFilter::Create(
   return nullptr;
 }
 
-MinMaxFilter* MinMaxFilter::Create(const TMinMaxFilter& thrift, ColumnType type,
+MinMaxFilter* MinMaxFilter::Create(const MinMaxFilterPB& protobuf, ColumnType type,
     ObjectPool* pool, MemTracker* mem_tracker) {
   switch (type.type) {
     case PrimitiveType::TYPE_BOOLEAN:
-      return pool->Add(new BoolMinMaxFilter(thrift));
+      return pool->Add(new BoolMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_TINYINT:
-      return pool->Add(new TinyIntMinMaxFilter(thrift));
+      return pool->Add(new TinyIntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_SMALLINT:
-      return pool->Add(new SmallIntMinMaxFilter(thrift));
+      return pool->Add(new SmallIntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_INT:
-      return pool->Add(new IntMinMaxFilter(thrift));
+      return pool->Add(new IntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_BIGINT:
-      return pool->Add(new BigIntMinMaxFilter(thrift));
+      return pool->Add(new BigIntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_FLOAT:
-      return pool->Add(new FloatMinMaxFilter(thrift));
+      return pool->Add(new FloatMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_DOUBLE:
-      return pool->Add(new DoubleMinMaxFilter(thrift));
+      return pool->Add(new DoubleMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_STRING:
-      return pool->Add(new StringMinMaxFilter(thrift, mem_tracker));
+      return pool->Add(new StringMinMaxFilter(protobuf, mem_tracker));
     case PrimitiveType::TYPE_TIMESTAMP:
-      return pool->Add(new TimestampMinMaxFilter(thrift));
+      return pool->Add(new TimestampMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_DECIMAL:
-      return pool->Add(new DecimalMinMaxFilter(thrift, type.precision));
+      return pool->Add(new DecimalMinMaxFilter(protobuf, type.precision));
     default:
       DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
   }
@@ -625,93 +605,93 @@ MinMaxFilter* MinMaxFilter::Create(const TMinMaxFilter& thrift, ColumnType type,
 }
 
 void MinMaxFilter::Or(
-    const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType) {
-  if (in.always_false || out->always_true) return;
-  if (in.always_true) {
-    out->__set_always_true(true);
+    const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType) {
+  if (in.always_false() || out->always_true()) return;
+  if (in.always_true()) {
+    out->set_always_true(true);
     return;
   }
-  if (in.min.__isset.bool_val) {
-    DCHECK(out->min.__isset.bool_val);
+  if (in.min().has_bool_val()) {
+    DCHECK(out->min().has_bool_val());
     BoolMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.byte_val) {
-    DCHECK(out->min.__isset.byte_val);
+  } else if (in.min().has_byte_val()) {
+    DCHECK(out->min().has_byte_val());
     TinyIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.short_val) {
-    DCHECK(out->min.__isset.short_val);
+  } else if (in.min().has_short_val()) {
+    DCHECK(out->min().has_short_val());
     SmallIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.int_val) {
-    DCHECK(out->min.__isset.int_val);
+  } else if (in.min().has_int_val()) {
+    DCHECK(out->min().has_int_val());
     IntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.long_val) {
-    DCHECK(out->min.__isset.long_val);
+  } else if (in.min().has_long_val()) {
+    DCHECK(out->min().has_long_val());
     BigIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.double_val) {
-    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
-    DCHECK(out->min.__isset.double_val);
+  } else if (in.min().has_double_val()) {
+    // Handles FloatMinMaxFilter also as ColumnValuePB doesn't have a float type.
+    DCHECK(out->min().has_double_val());
     DoubleMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.string_val) {
-    DCHECK(out->min.__isset.string_val);
+  } else if (in.min().has_string_val()) {
+    DCHECK(out->min().has_string_val());
     StringMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.timestamp_val) {
-    DCHECK(out->min.__isset.timestamp_val);
+  } else if (in.min().has_timestamp_val()) {
+    DCHECK(out->min().has_timestamp_val());
     TimestampMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.decimal_val) {
-    DCHECK(out->min.__isset.decimal_val);
+  } else if (in.min().has_decimal_val()) {
+    DCHECK(out->min().has_decimal_val());
     DecimalMinMaxFilter::Or(in, out, columnType.precision);
     return;
   }
   DCHECK(false) << "Unsupported MinMaxFilter type.";
 }
 
-void MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->__set_always_false(in.always_false);
-  out->__set_always_true(in.always_true);
-  if (in.always_false || in.always_true) return;
-  if (in.min.__isset.bool_val) {
-    DCHECK(!out->min.__isset.bool_val);
+void MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->set_always_false(in.always_false());
+  out->set_always_true(in.always_true());
+  if (in.always_false() || in.always_true()) return;
+  if (in.min().has_bool_val()) {
+    DCHECK(!out->min().has_bool_val());
     BoolMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.byte_val) {
-    DCHECK(!out->min.__isset.byte_val);
+  } else if (in.min().has_byte_val()) {
+    DCHECK(!out->min().has_byte_val());
     TinyIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.short_val) {
-    DCHECK(!out->min.__isset.short_val);
+  } else if (in.min().has_short_val()) {
+    DCHECK(!out->min().has_short_val());
     SmallIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.int_val) {
-    DCHECK(!out->min.__isset.int_val);
+  } else if (in.min().has_int_val()) {
+    DCHECK(!out->min().has_int_val());
     IntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.long_val) {
-    // Handles TimestampMinMaxFilter also as TColumnValue doesn't have a timestamp type.
-    DCHECK(!out->min.__isset.long_val);
+  } else if (in.min().has_long_val()) {
+    // BIGINT only: TimestampMinMaxFilter is handled by the timestamp_val branch below.
+    DCHECK(!out->min().has_long_val());
     BigIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.double_val) {
-    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
-    DCHECK(!out->min.__isset.double_val);
+  } else if (in.min().has_double_val()) {
+    // Handles FloatMinMaxFilter also as ColumnValuePB doesn't have a float type.
+    DCHECK(!out->min().has_double_val());
     DoubleMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.string_val) {
-    DCHECK(!out->min.__isset.string_val);
+  } else if (in.min().has_string_val()) {
+    DCHECK(!out->min().has_string_val());
     StringMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.timestamp_val) {
-    DCHECK(!out->min.__isset.timestamp_val);
+  } else if (in.min().has_timestamp_val()) {
+    DCHECK(!out->min().has_timestamp_val());
     TimestampMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.decimal_val) {
-    DCHECK(!out->min.__isset.decimal_val);
+  } else if (in.min().has_decimal_val()) {
+    DCHECK(!out->min().has_decimal_val());
     DecimalMinMaxFilter::Copy(in, out);
     return;
   }
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index 82cf54c..ee08894 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -74,25 +74,25 @@ class MinMaxFilter {
   /// until this is called.
   virtual void MaterializeValues() {}
 
-  /// Convert this filter to a thrift representation.
-  virtual void ToThrift(TMinMaxFilter* thrift) const = 0;
+  /// Convert this filter to a protobuf representation.
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const = 0;
 
   virtual std::string DebugString() const = 0;
 
   /// Returns a new MinMaxFilter with the given type, allocated from 'mem_tracker'.
   static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemTracker* mem_tracker);
 
-  /// Returns a new MinMaxFilter created from the thrift representation, allocated from
+  /// Returns a new MinMaxFilter created from the protobuf representation, allocated from
   /// 'mem_tracker'.
-  static MinMaxFilter* Create(const TMinMaxFilter& thrift, ColumnType type,
+  static MinMaxFilter* Create(const MinMaxFilterPB& protobuf, ColumnType type,
       ObjectPool* pool, MemTracker* mem_tracker);
 
   /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
   static void Or(
-      const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType);
+      const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType);
 
   /// Copies the contents of 'in' into 'out'.
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   /// Returns the LLVM_CLASS_NAME for the given type.
   static std::string GetLlvmClassName(PrimitiveType type);
@@ -108,7 +108,7 @@ class MinMaxFilter {
       min_ = std::numeric_limits<TYPE>::max();                                \
       max_ = std::numeric_limits<TYPE>::lowest();                             \
     }                                                                         \
-    NAME##MinMaxFilter(const TMinMaxFilter& thrift);                          \
+    NAME##MinMaxFilter(const MinMaxFilterPB& protobuf);                       \
     virtual ~NAME##MinMaxFilter() {}                                          \
     virtual void* GetMin() override { return &min_; }                         \
     virtual void* GetMax() override { return &max_; }                         \
@@ -121,10 +121,10 @@ class MinMaxFilter {
       return min_ == std::numeric_limits<TYPE>::max()                         \
           && max_ == std::numeric_limits<TYPE>::lowest();                     \
     }                                                                         \
-    virtual void ToThrift(TMinMaxFilter* thrift) const override;              \
+    virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;         \
     virtual std::string DebugString() const override;                         \
-    static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);              \
-    static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);            \
+    static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);            \
+    static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);          \
     static const char* LLVM_CLASS_NAME;                                       \
                                                                               \
    private:                                                                   \
@@ -148,7 +148,7 @@ class StringMinMaxFilter : public MinMaxFilter {
       max_buffer_(&mem_pool_),
       always_false_(true),
       always_true_(false) {}
-  StringMinMaxFilter(const TMinMaxFilter& thrift, MemTracker* mem_tracker);
+  StringMinMaxFilter(const MinMaxFilterPB& protobuf, MemTracker* mem_tracker);
   virtual ~StringMinMaxFilter() {}
   virtual void Close() override { mem_pool_.FreeAll(); }
 
@@ -164,11 +164,11 @@ class StringMinMaxFilter : public MinMaxFilter {
   /// truncating them if necessary.
   virtual void MaterializeValues() override;
 
-  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
@@ -208,7 +208,7 @@ class StringMinMaxFilter : public MinMaxFilter {
 class TimestampMinMaxFilter : public MinMaxFilter {
  public:
   TimestampMinMaxFilter() { always_false_ = true; }
-  TimestampMinMaxFilter(const TMinMaxFilter& thrift);
+  TimestampMinMaxFilter(const MinMaxFilterPB& protobuf);
   virtual ~TimestampMinMaxFilter() {}
 
   virtual void* GetMin() override { return &min_; }
@@ -218,11 +218,11 @@ class TimestampMinMaxFilter : public MinMaxFilter {
   virtual void Insert(void* val) override;
   virtual bool AlwaysTrue() const override { return false; }
   virtual bool AlwaysFalse() const override { return always_false_; }
-  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
@@ -269,7 +269,7 @@ class DecimalMinMaxFilter : public MinMaxFilter {
   DecimalMinMaxFilter(int precision)
     : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(true) {}
 
-  DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision);
+  DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision);
   virtual ~DecimalMinMaxFilter() {}
 
   virtual void* GetMin() override {
@@ -286,11 +286,11 @@ class DecimalMinMaxFilter : public MinMaxFilter {
   virtual PrimitiveType type() override;
   virtual bool AlwaysTrue() const override { return false; }
   virtual bool AlwaysFalse() const override { return always_false_; }
-  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision);
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   void Insert4(void* val);
   void Insert8(void* val);
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 0bf2a97..6ec84b4 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -45,3 +45,20 @@ enum CompressionType {
   NONE = 0; // No compression.
   LZ4 = 1;
 }
+
+// This is a union over all possible return types.
+// TODO: since only one of the fields below is set at a time, a 'oneof' could be used
+// to save some memory. Note that 'oneof' does not require proto3; it is supported in
+// proto2 syntax since protobuf 2.6.
+message ColumnValuePB {
+  optional bool bool_val = 1;
+  optional int32 byte_val = 6;
+  optional int32 short_val = 7;
+  optional int32 int_val = 2;
+  optional int64 long_val = 3;
+  optional double double_val = 4;
+  optional string string_val = 5;
+  optional string binary_val = 8;
+  optional string timestamp_val = 9;
+  optional bytes decimal_val = 10;
+}
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index b0e2b5d..708b814 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -78,6 +78,77 @@ message EndDataStreamResponsePB {
   optional int64 receiver_latency_ns = 2;
 }
 
+message BloomFilterPB {
+  // Log_2 of the bufferpool space required for this filter.
+  // See BloomFilter::BloomFilter() for details.
+  optional int32 log_bufferpool_space = 1;
+
+  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
+  // not meaningful.
+  optional bool always_true = 2;
+  optional bool always_false = 3;
+
+  // The sidecar index associated with the directory of a Bloom filter.
+  // A directory is a list of buckets representing the Bloom Filter contents,
+  // laid out contiguously in one string for efficiency of (de)serialisation.
+  // See BloomFilter::Bucket and BloomFilter::directory_.
+  optional int32 directory_sidecar_idx = 4;
+}
+
+message MinMaxFilterPB {
+  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
+  optional bool always_true = 1;
+
+  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
+  optional bool always_false = 2;
+
+  optional ColumnValuePB min = 3;
+  optional ColumnValuePB max = 4;
+}
+
+message UpdateFilterParamsPB {
+  // Filter ID, unique within a query.
+  optional int32 filter_id = 1;
+
+  // Query that this filter is for.
+  optional UniqueIdPB query_id = 2;
+
+  optional BloomFilterPB bloom_filter = 3;
+
+  optional MinMaxFilterPB min_max_filter = 4;
+}
+
+message UpdateFilterResultPB {
+  optional StatusPB status = 1;
+
+  // Latency for response in the receiving daemon in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
+}
+
+message PublishFilterParamsPB {
+  // Filter ID, unique within a query.
+  optional int32 filter_id = 1;
+
+  // Query that this filter is for.
+  optional UniqueIdPB dst_query_id = 2;
+
+  // Index of fragment to receive this filter
+  optional int32 dst_fragment_idx = 3;
+
+  // Actual bloom_filter payload
+  optional BloomFilterPB bloom_filter = 4;
+
+  // Actual min_max_filter payload
+  optional MinMaxFilterPB min_max_filter = 5;
+}
+
+message PublishFilterResultPB {
+  optional StatusPB status = 1;
+
+  // Latency for response in the receiving daemon in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
+}
+
 // Handles data transmission between fragment instances.
 service DataStreamService {
   // Override the default authorization method.
@@ -90,4 +161,12 @@ service DataStreamService {
 
   // Called by a sender to close the channel between fragment instances.
   rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);
+
+  // Called by fragment instances that produce local runtime filters to deliver them to
+  // the coordinator for aggregation and broadcast.
+  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
+
+  // Called by the coordinator to deliver global runtime filters to fragments for
+  // application at plan nodes.
+  rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
 }
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 55eb0cf..cbe3f50 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -766,82 +766,6 @@ struct TPoolConfig {
   11: required i64 max_memory_multiple = 0;
 }
 
-struct TBloomFilter {
-  // Log_2 of the bufferpool space required for this filter.
-  // See BloomFilter::BloomFilter() for details.
-  1: required i32 log_bufferpool_space
-
-  // List of buckets representing the Bloom Filter contents, laid out contiguously in one
-  // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
-  // BloomFilter::directory_.
-  2: binary directory
-
-  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
-  // not meaningful.
-  3: required bool always_true
-  4: required bool always_false
-}
-
-struct TMinMaxFilter {
-  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
-  1: required bool always_true
-
-  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
-  2: required bool always_false
-
-  3: optional Data.TColumnValue min
-  4: optional Data.TColumnValue max
-}
-
-// UpdateFilter
-
-struct TUpdateFilterParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // Filter ID, unique within a query.
-  // required in V1
-  2: optional i32 filter_id
-
-  // Query that this filter is for.
-  // required in V1
-  3: optional Types.TUniqueId query_id
-
-  // required in V1
-  4: optional TBloomFilter bloom_filter
-
-  5: optional TMinMaxFilter min_max_filter
-}
-
-struct TUpdateFilterResult {
-}
-
-
-// PublishFilter
-
-struct TPublishFilterParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // Filter ID to update
-  // required in V1
-  2: optional i32 filter_id
-
-  // required in V1
-  3: optional Types.TUniqueId dst_query_id
-
-  // Index of fragment to receive this filter
-  // required in V1
-  4: optional Types.TFragmentIdx dst_fragment_idx
-
-  // Actual bloom_filter payload
-  // required in V1
-  5: optional TBloomFilter bloom_filter
-
-  6: optional TMinMaxFilter min_max_filter
-}
-
-struct TPublishFilterResult {
-}
-
 struct TParseDateStringResult {
   // True iff date string was successfully parsed
   1: required bool valid
@@ -854,11 +778,4 @@ struct TParseDateStringResult {
 
 service ImpalaInternalService {
 
-  // Called by fragment instances that produce local runtime filters to deliver them to
-  // the coordinator for aggregation and broadcast.
-  TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
-
-  // Called by the coordinator to deliver global runtime filters to fragments for
-  // application at plan nodes.
-  TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
 }


[impala] 03/03: IMPALA-9154: Make runtime filter propagation asynchronous

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 79aae231443a305ce8503dbc7b4335e8ae3f3946
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Wed Nov 27 14:06:34 2019 -0800

    IMPALA-9154: Make runtime filter propagation asynchronous
    
    This patch fixes a bug introduced by IMPALA-7984, which ported the
    functions implementing the aggregation and propagation of runtime
    filters from Thrift RPC to KRPC.
    
    Specifically, in IMPALA-7984, the propagation of an aggregated
    runtime filter was implemented using synchronous KRPC. Hence, when
    the number of KRPC service threads for Impala's data stream service
    is very limited, e.g., 1, a deadlock occurs when the Coordinator
    propagates the aggregated filter to its own node: the only service
    thread is blocked in the synchronous call, so no thread is available
    to receive the aggregated filter.
    
    This patch makes the propagation of an aggregated runtime filter
    asynchronous to address the issue described above. To prevent the
    memory consumed by the aggregated filter from being reclaimed while
    it is still referenced by in-flight KRPCs, we add a field to the
    class Coordinator::FilterState that tracks the number of in-flight
    KRPCs propagating this aggregated filter, so that the memory is
    reclaimed only after all the associated KRPCs have completed.
    Moreover, when ReleaseExecResources() is invoked by the Coordinator
    to release all the resources associated with query execution,
    including the memory consumed by the aggregated runtime filters, we
    make sure the memory consumed by each aggregated filter is released
    only after the in-flight KRPCs associated with that filter have
    finished.
    
    Testing:
    - Passed primitive_many_fragments.test with the database tpch30 in an
      Impala minicluster started with the parameter
      --impalad_args=--datastream_service_num_svc_threads=1.
    - Passed the exhaustive tests in the DEBUG build.
    - Passed the core tests in the ASAN build.
    
    Change-Id: Ifb6726d349be701f3a0602b2ad5a934082f188a0
    Reviewed-on: http://gerrit.cloudera.org:8080/14975
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc |  48 +++++++---
 be/src/runtime/coordinator-backend-state.h  |   7 +-
 be/src/runtime/coordinator-filter-state.h   |  37 ++++++--
 be/src/runtime/coordinator.cc               | 131 ++++++++++++++++------------
 be/src/runtime/coordinator.h                |   4 +-
 5 files changed, 151 insertions(+), 76 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 72e245e..20d7b6f 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -551,8 +551,9 @@ bool Coordinator::BackendState::Cancel() {
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(
-    const PublishFilterParamsPB& rpc_params, RpcController& controller) {
+void Coordinator::BackendState::PublishFilter(FilterState* state,
+    MemTracker* mem_tracker, const PublishFilterParamsPB& rpc_params,
+    RpcController& controller, PublishFilterResultPB& res) {
   DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
@@ -571,15 +572,42 @@ void Coordinator::BackendState::PublishFilter(
     return;
   }
 
-  PublishFilterResultPB res;
-  kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
-  if (!rpc_status.ok()) {
-    LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
-    return;
+  state->IncrementNumInflightRpcs(1);
+
+  proxy->PublishFilterAsync(rpc_params, &res, &controller,
+      boost::bind(&Coordinator::BackendState::PublishFilterCompleteCb, this, &controller,
+                                state, mem_tracker));
+}
+
+void Coordinator::BackendState::PublishFilterCompleteCb(
+    const kudu::rpc::RpcController* rpc_controller, FilterState* state,
+    MemTracker* mem_tracker) {
+  const kudu::Status controller_status = rpc_controller->status();
+
+  // In the case of an unsuccessful KRPC call, we only log this event w/o retrying.
+  // Failing to send a filter is not a query-wide error - the remote fragment will
+  // continue regardless.
+  if (!controller_status.ok()) {
+    LOG(ERROR) << "PublishFilter() failed: " << controller_status.message().ToString();
   }
-  if (res.status().status_code() != TErrorCode::OK) {
-    LOG(ERROR) << "PublishFilter() operation failed: "
-               << Status(res.status()).GetDetail();
+
+  {
+    lock_guard<SpinLock> l(state->lock());
+
+    state->IncrementNumInflightRpcs(-1);
+
+    if (state->num_inflight_rpcs() == 0) {
+      // Since we disabled the filter once complete and held FilterState::lock_ while
+      // issuing all PublishFilter() rpcs, at this point there can't be any more
+      // PublishFilter() rpcs issued.
+      DCHECK(state->disabled());
+      if (state->is_bloom_filter() && state->bloom_filter_directory().size() > 0) {
+        mem_tracker->Release(state->bloom_filter_directory().size());
+        state->bloom_filter_directory().clear();
+        state->bloom_filter_directory().shrink_to_fit();
+      }
+      state->get_publish_filter_done_cv().notify_one();
+    }
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index fd80b86..432857a 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -106,8 +106,11 @@ class Coordinator::BackendState {
 
   /// Make a PublishFilter rpc with given params if this backend has instances of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  void PublishFilter(
-      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
+  void PublishFilter(FilterState* state, MemTracker* mem_tracker,
+      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller,
+      PublishFilterResultPB& res);
+  void PublishFilterCompleteCb(const kudu::rpc::RpcController* rpc_controller,
+      FilterState* state, MemTracker* mem_tracker);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 55c6b3d..7e92883 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
+#include <condition_variable>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -56,6 +56,9 @@ struct Coordinator::FilterTarget {
 /// A filter is disabled if an always_true filter update is received, an OOM is hit,
 /// filter aggregation is complete or if the query is complete.
 /// Once a filter is disabled, subsequent updates for that filter are ignored.
+///
+/// This class is not thread safe. Callers must always take 'lock()' themselves when
+/// calling any FilterState functions if thread safety is needed.
 class Coordinator::FilterState {
  public:
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
@@ -88,14 +91,30 @@ class Coordinator::FilterState {
       return min_max_filter_.always_true();
     }
   }
+  int num_inflight_rpcs() const { return num_inflight_publish_filter_rpcs_; }
+  SpinLock& lock() { return lock_; }
+  std::condition_variable_any& get_publish_filter_done_cv() {
+    return publish_filter_done_cv_;
+  }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
   void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
       kudu::rpc::RpcContext* context);
 
-  /// Disables a filter. A disabled filter consumes no memory.
-  void Disable(MemTracker* tracker);
+  /// Disables the filter and releases the consumed memory if the filter is a Bloom
+  /// filter.
+  void DisableAndRelease(MemTracker* tracker);
+  /// Disables the filter but does not release the consumed memory.
+  void Disable();
+
+  void IncrementNumInflightRpcs(int i) {
+    num_inflight_publish_filter_rpcs_ += i;
+    DCHECK_GE(num_inflight_publish_filter_rpcs_, 0);
+  }
+
+  /// Waits until any inflight PublishFilter rpcs have completed.
+  void WaitForPublishFilter();
 
  private:
   /// Contains the specification of the runtime filter.
@@ -129,8 +148,15 @@ class Coordinator::FilterState {
   /// Time at which all local filters arrived.
   int64_t completion_time_ = 0L;
 
-  /// TODO: Add a per-object lock so that we can avoid holding the global routing table
+  /// Per-object lock so that we can avoid holding the global routing table
   /// lock for every filter update.
+  SpinLock lock_;
+
+  /// Keeps track of the number of inflight PublishFilter rpcs.
+  int num_inflight_publish_filter_rpcs_ = 0;
+
+  /// Signaled when 'num_inflight_publish_filter_rpcs_' reaches 0.
+  std::condition_variable_any publish_filter_done_cv_;
 };
 
 /// Struct to contain all of the data structures for filter routing. Coordinator
@@ -146,9 +172,6 @@ struct Coordinator::FilterRoutingTable {
   // The value is source plan node id and the filter ID.
   boost::unordered_map<int, std::vector<TRuntimeFilterSource>> finstance_filters_produced;
 
-  /// Synchronizes updates to the state of this routing table.
-  SpinLock update_lock;
-
   /// Protects this routing table.
   /// Usage pattern:
   /// 1. To update the routing table: Acquire shared access on 'lock' and
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 48d81f9..9458578 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -348,10 +348,13 @@ void Coordinator::InitFilterRoutingTable() {
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
         DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
-        auto i = filter_routing_table_->id_to_filter.emplace(
-            filter.filter_id, FilterState(filter, plan_node.node_id)).first;
-        FilterState* f = &(i->second);
+        auto i = filter_routing_table_->id_to_filter
+                     .emplace(std::piecewise_construct,
+                         std::forward_as_tuple(filter.filter_id),
+                         std::forward_as_tuple(filter, plan_node.node_id))
+                     .first;
 
+        FilterState* f = &(i->second);
         // source plan node of filter
         if (plan_node.__isset.hash_join_node) {
           // Set the 'pending_count_' to zero to indicate that for a filter with
@@ -1045,9 +1048,11 @@ void Coordinator::ReleaseExecResources() {
   }
 
   for (auto& filter : filter_routing_table_->id_to_filter) {
-    FilterState* state = &filter.second;
-    state->Disable(filter_mem_tracker_);
+    unique_lock<SpinLock> l(filter.second.lock());
+    filter.second.WaitForPublishFilter();
+    filter.second.DisableAndRelease(filter_mem_tracker_);
   }
+
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -1124,21 +1129,22 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
 
   PublishFilterParamsPB rpc_params;
   unordered_set<int> target_fragment_idxs;
-  string bloom_filter_directory;
+  if (!IsExecuting()) {
+    LOG(INFO) << "Filter update received for non-executing query with id: "
+        << query_id();
+    return;
+  }
+  auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
+  if (it == filter_routing_table_.get()->id_to_filter.end()) {
+    // This should not be possible since 'id_to_filter' is never changed after
+    // InitFilterRoutingTable().
+    DCHECK(false);
+    LOG(INFO) << "Could not find filter with id: " << rpc_params.filter_id();
+    return;
+  }
+  FilterState* state = &it->second;
   {
-    lock_guard<SpinLock> l(filter_routing_table_->update_lock);
-    if (!IsExecuting()) {
-      LOG(INFO) << "Filter update received for non-executing query with id: "
-                << query_id();
-      return;
-    }
-    auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
-    if (it == filter_routing_table_->id_to_filter.end()) {
-      LOG(INFO) << "Could not find filter with id: " << params.filter_id();
-      return;
-    }
-    FilterState* state = &it->second;
-
+    lock_guard<SpinLock> l(state->lock());
     DCHECK(state->desc().has_remote_targets)
         << "Coordinator received filter that has only local targets";
 
@@ -1165,7 +1171,7 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
 
     // No more updates are pending on this filter ID. Create a distribution payload and
     // offer it to the queue.
-    for (const FilterTarget& target: *state->targets()) {
+    for (const FilterTarget& target : *state->targets()) {
       // Don't publish the filter to targets that are in the same fragment as the join
       // that produced it.
       if (target.is_local) continue;
@@ -1175,44 +1181,47 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
     if (state->is_bloom_filter()) {
       // Assign an outgoing bloom filter.
       *rpc_params.mutable_bloom_filter() = state->bloom_filter();
-      bloom_filter_directory.swap(state->bloom_filter_directory());
+
       DCHECK(rpc_params.bloom_filter().always_false()
-          || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
+          || rpc_params.bloom_filter().always_true()
+          || !state->bloom_filter_directory().empty());
+
     } else {
       DCHECK(state->is_min_max_filter());
       MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
     }
 
-    // Filter is complete, and can be released.
-    state->Disable(filter_mem_tracker_);
-  }
+    // Filter is complete. We disable it so future UpdateFilter rpcs will be ignored,
+    // e.g., if it was a broadcast join.
+    state->Disable();
+
+    TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
+    rpc_params.set_filter_id(params.filter_id());
+
+    // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
+    for (BackendState* bs : backend_states_) {
+      for (int fragment_idx : target_fragment_idxs) {
+        if (!IsExecuting()) {
+          if (rpc_params.has_bloom_filter()) {
+            filter_mem_tracker_->Release(state->bloom_filter_directory().size());
+            state->bloom_filter_directory().clear();
+            state->bloom_filter_directory().shrink_to_fit();
+          }
+          return;
+        }
 
-  TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
-  rpc_params.set_filter_id(params.filter_id());
-
-  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
-  for (BackendState* bs: backend_states_) {
-    for (int fragment_idx: target_fragment_idxs) {
-      if (!IsExecuting()) goto cleanup;
-      rpc_params.set_dst_fragment_idx(fragment_idx);
-      RpcController controller;
-      if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
-          && !rpc_params.bloom_filter().always_true()) {
-        BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
-            bloom_filter_directory);
+        rpc_params.set_dst_fragment_idx(fragment_idx);
+        RpcController* controller = obj_pool()->Add(new RpcController);
+        PublishFilterResultPB* res = obj_pool()->Add(new PublishFilterResultPB);
+        if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
+            && !rpc_params.bloom_filter().always_true()) {
+          BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), controller,
+              state->bloom_filter_directory());
+        }
+        bs->PublishFilter(state, filter_mem_tracker_, rpc_params, *controller, *res);
       }
-      // TODO: make this asynchronous.
-      bs->PublishFilter(rpc_params, controller);
     }
   }
-
-cleanup:
-  // For bloom filters, the memory used in the filter_routing_table_ is transfered to
-  // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
-  // here to ensure that the MemTracker is updated after the memory is actually freed.
-  if (rpc_params.has_bloom_filter()) {
-    filter_mem_tracker_->Release(bloom_filter_directory.size());
-  }
 }
 
 void Coordinator::FilterState::ApplyUpdate(
@@ -1228,7 +1237,7 @@ void Coordinator::FilterState::ApplyUpdate(
   if (is_bloom_filter()) {
     DCHECK(params.has_bloom_filter());
     if (params.bloom_filter().always_true()) {
-      Disable(coord->filter_mem_tracker_);
+      DisableAndRelease(coord->filter_mem_tracker_);
     } else if (params.bloom_filter().always_false()) {
       if (!bloom_filter_.has_log_bufferpool_space()) {
         bloom_filter_ = BloomFilterPB(params.bloom_filter());
@@ -1243,7 +1252,7 @@ void Coordinator::FilterState::ApplyUpdate(
           params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
       if (!status.ok()) {
         LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
-        Disable(coord->filter_mem_tracker_);
+        DisableAndRelease(coord->filter_mem_tracker_);
       } else if (bloom_filter_.always_false()) {
         int64_t heap_space = sidecar_slice.size();
         if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
@@ -1251,7 +1260,7 @@ void Coordinator::FilterState::ApplyUpdate(
                      << PrettyPrinter::Print(heap_space, TUnit::BYTES)
                      << " (query_id=" << PrintId(coord->query_id()) << ")";
           // Disable, as one missing update means a correct filter cannot be produced.
-          Disable(coord->filter_mem_tracker_);
+          DisableAndRelease(coord->filter_mem_tracker_);
         } else {
           bloom_filter_ = params.bloom_filter();
           bloom_filter_directory_ = sidecar_slice.ToString();
@@ -1267,7 +1276,7 @@ void Coordinator::FilterState::ApplyUpdate(
     DCHECK(is_min_max_filter());
     DCHECK(params.has_min_max_filter());
     if (params.min_max_filter().always_true()) {
-      Disable(coord->filter_mem_tracker_);
+      DisableAndRelease(coord->filter_mem_tracker_);
     } else if (min_max_filter_.always_false()) {
       MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
     } else {
@@ -1281,13 +1290,19 @@ void Coordinator::FilterState::ApplyUpdate(
   }
 }
 
-void Coordinator::FilterState::Disable(MemTracker* tracker) {
+void Coordinator::FilterState::DisableAndRelease(MemTracker* tracker) {
+  Disable();
   if (is_bloom_filter()) {
-    bloom_filter_.set_always_true(true);
-    bloom_filter_.set_always_false(false);
     tracker->Release(bloom_filter_directory_.size());
     bloom_filter_directory_.clear();
     bloom_filter_directory_.shrink_to_fit();
+  }
+}
+
+void Coordinator::FilterState::Disable() {
+  if (is_bloom_filter()) {
+    bloom_filter_.set_always_true(true);
+    bloom_filter_.set_always_false(false);
   } else {
     DCHECK(is_min_max_filter());
     min_max_filter_.set_always_true(true);
@@ -1295,6 +1310,12 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
+void Coordinator::FilterState::WaitForPublishFilter() {
+  while (num_inflight_publish_filter_rpcs_ > 0) {
+    publish_filter_done_cv_.wait(lock_);
+  }
+}
+
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 7b05fde..aa03f30 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -100,8 +100,8 @@ class TUpdateCatalogRequest;
 ///
 /// Lock ordering: (lower-numbered acquired before higher-numbered)
 /// 1. wait_lock_
-/// 2. filter_lock_
-/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_, ExecSummary::lock
+/// 2. Coordinator::FilterRoutingTable::lock
+/// 3. exec_state_lock_, backend_states_init_lock_, FilterState::lock_, ExecSummary::lock
 /// 4. Coordinator::BackendState::lock_ (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files