You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/11/23 01:40:38 UTC

[impala] branch master updated (65198fa -> e716e76)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 65198fa  IMPALA-9110: Add table loading time break-down metrics for HdfsTable
     new a862282  IMPALA-8709: Add Damerau-Levenshtein edit distance built-in function
     new e716e76  IMPALA-9154: Revert "IMPALA-7984: Port runtime filter from Thrift RPC to KRPC"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/benchmarks/bloom-filter-benchmark.cc  |  37 +--
 be/src/exprs/expr-test.cc                    |  78 +++--
 be/src/exprs/string-functions-ir.cc          | 108 ++++++-
 be/src/exprs/string-functions.h              |   3 +
 be/src/runtime/backend-client.h              |  23 ++
 be/src/runtime/client-cache.cc               |   1 +
 be/src/runtime/coordinator-backend-state.cc  |  36 +--
 be/src/runtime/coordinator-backend-state.h   |   3 +-
 be/src/runtime/coordinator-filter-state.h    |  27 +-
 be/src/runtime/coordinator.cc                | 135 ++++-----
 be/src/runtime/coordinator.h                 |   9 +-
 be/src/runtime/data-stream-test.cc           |   6 -
 be/src/runtime/decimal-value.h               |  12 +-
 be/src/runtime/decimal-value.inline.h        |   4 +-
 be/src/runtime/exec-env.cc                   |   1 +
 be/src/runtime/fragment-instance-state.cc    |   9 +-
 be/src/runtime/fragment-instance-state.h     |   9 +-
 be/src/runtime/query-state.cc                |  11 +-
 be/src/runtime/query-state.h                 |   9 +-
 be/src/runtime/runtime-filter-bank.cc        | 168 +++--------
 be/src/runtime/runtime-filter-bank.h         |  29 +-
 be/src/runtime/runtime-filter.h              |   3 +-
 be/src/runtime/timestamp-value.h             |  14 +-
 be/src/scheduling/request-pool-service.h     |   1 +
 be/src/service/client-request-state.cc       |   5 +-
 be/src/service/client-request-state.h        |   2 +-
 be/src/service/data-stream-service.cc        |  34 ---
 be/src/service/data-stream-service.h         |  10 -
 be/src/service/frontend.h                    |   1 +
 be/src/service/impala-internal-service.cc    |  21 ++
 be/src/service/impala-internal-service.h     |   4 +
 be/src/service/impala-server.cc              |  18 +-
 be/src/service/impala-server.h               |  15 +-
 be/src/util/bloom-filter-test.cc             | 108 ++-----
 be/src/util/bloom-filter.cc                  | 101 +++----
 be/src/util/bloom-filter.h                   |  81 +----
 be/src/util/min-max-filter-test.cc           | 247 ++++++++--------
 be/src/util/min-max-filter.cc                | 426 ++++++++++++++-------------
 be/src/util/min-max-filter.h                 |  44 +--
 common/function-registry/impala_functions.py |   2 +
 common/protobuf/common.proto                 |  17 --
 common/protobuf/data_stream_service.proto    |  79 -----
 common/thrift/ImpalaInternalService.thrift   |  83 ++++++
 43 files changed, 906 insertions(+), 1128 deletions(-)


[impala] 02/02: IMPALA-9154: Revert "IMPALA-7984: Port runtime filter from Thrift RPC to KRPC"

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e716e76cccf59c2780571429b1b945d6bbc61b8d
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Thu Nov 21 15:14:51 2019 -0800

    IMPALA-9154: Revert "IMPALA-7984: Port runtime filter from Thrift RPC to KRPC"
    
    The previous patch porting runtime filter from Thrift RPC to KRPC
    introduces a deadlock if there are a very limited number of threads on
    the Impala cluster.
    
    Specifically, in that patch a Coordinator used a synchronous KRPC to
    propagate an aggregated filter to other hosts. A deadlock would happen
    if there were no thread available on the receiving side to answer that
    KRPC, especially when the calling and receiving threads come from the
    same thread pool. One possible way to address this issue is to make
    the call that propagates a runtime filter asynchronous, freeing the
    calling thread. Until that issue is resolved, we revert this patch.
    
    This reverts commit ec11c18884988e838a8838e1e8ecc37461e1a138.
    
    Change-Id: I32371a515fb607da396914502da8c7fb071406bc
    Reviewed-on: http://gerrit.cloudera.org:8080/14780
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/benchmarks/bloom-filter-benchmark.cc |  37 +--
 be/src/runtime/backend-client.h             |  23 ++
 be/src/runtime/client-cache.cc              |   1 +
 be/src/runtime/coordinator-backend-state.cc |  36 +--
 be/src/runtime/coordinator-backend-state.h  |   3 +-
 be/src/runtime/coordinator-filter-state.h   |  27 +-
 be/src/runtime/coordinator.cc               | 135 ++++-----
 be/src/runtime/coordinator.h                |   9 +-
 be/src/runtime/data-stream-test.cc          |   6 -
 be/src/runtime/decimal-value.h              |  12 +-
 be/src/runtime/decimal-value.inline.h       |   4 +-
 be/src/runtime/exec-env.cc                  |   1 +
 be/src/runtime/fragment-instance-state.cc   |   9 +-
 be/src/runtime/fragment-instance-state.h    |   9 +-
 be/src/runtime/query-state.cc               |  11 +-
 be/src/runtime/query-state.h                |   9 +-
 be/src/runtime/runtime-filter-bank.cc       | 168 ++++-------
 be/src/runtime/runtime-filter-bank.h        |  29 +-
 be/src/runtime/runtime-filter.h             |   3 +-
 be/src/runtime/timestamp-value.h            |  14 +-
 be/src/scheduling/request-pool-service.h    |   1 +
 be/src/service/client-request-state.cc      |   5 +-
 be/src/service/client-request-state.h       |   2 +-
 be/src/service/data-stream-service.cc       |  34 ---
 be/src/service/data-stream-service.h        |  10 -
 be/src/service/frontend.h                   |   1 +
 be/src/service/impala-internal-service.cc   |  21 ++
 be/src/service/impala-internal-service.h    |   4 +
 be/src/service/impala-server.cc             |  18 +-
 be/src/service/impala-server.h              |  15 +-
 be/src/util/bloom-filter-test.cc            | 108 ++-----
 be/src/util/bloom-filter.cc                 | 101 +++----
 be/src/util/bloom-filter.h                  |  81 +-----
 be/src/util/min-max-filter-test.cc          | 247 ++++++++--------
 be/src/util/min-max-filter.cc               | 426 +++++++++++++++-------------
 be/src/util/min-max-filter.h                |  44 +--
 common/protobuf/common.proto                |  17 --
 common/protobuf/data_stream_service.proto   |  79 ------
 common/thrift/ImpalaInternalService.thrift  |  83 ++++++
 39 files changed, 753 insertions(+), 1090 deletions(-)

diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index 1e4938d..f216911 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -21,7 +21,6 @@
 #include <iostream>
 #include <vector>
 
-#include "kudu/rpc/rpc_controller.h"
 #include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/buffer-pool.h"
@@ -37,8 +36,6 @@
 using namespace std;
 using namespace impala;
 
-using kudu::rpc::RpcController;
-
 // Tests Bloom filter performance on:
 //
 // 1. Construct/destruct pairs
@@ -286,44 +283,18 @@ struct TestData {
   explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
     BloomFilter bf(client);
     CHECK(bf.Init(log_bufferpool_size).ok());
-
-    RpcController controller1;
-    RpcController controller2;
-    BloomFilter::ToProtobuf(&bf, &controller1, &pbf1);
-    BloomFilter::ToProtobuf(&bf, &controller2, &pbf2);
-
-    // Need to set 'always_false_' of pbf2 to false because
-    // (i) 'always_false_' of a BloomFilter is set to true when the Bloom filter
-    // hasn't had any elements inserted (since nothing is inserted to the
-    /// BloomFilter bf),
-    // (ii) ToProtobuf() will set 'always_false_' of a BloomFilterPB
-    // to true, and
-    // (iii) Or() will check 'always_false_' of the output BloomFilterPB is not true
-    /// before performing the corresponding bit operations.
-    /// The field 'always_false_' was added by IMPALA-5789, which aims to allow
-    /// an HdfsScanner to early terminate the scan at file and split granularities.
-    pbf2.set_always_false(false);
-
-    int64_t directory_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_size);
-    string d1(reinterpret_cast<const char*>(bf.directory_), directory_size);
-    string d2(reinterpret_cast<const char*>(bf.directory_), directory_size);
-
-    directory1 = d1;
-    directory2 = d2;
-
+    BloomFilter::ToThrift(&bf, &tbf1);
+    BloomFilter::ToThrift(&bf, &tbf2);
     bf.Close();
   }
 
-  BloomFilterPB pbf1, pbf2;
-  string directory1, directory2;
+  TBloomFilter tbf1, tbf2;
 };
 
 void Benchmark(int batch_size, void* data) {
   TestData* d = reinterpret_cast<TestData*>(data);
   for (int i = 0; i < batch_size; ++i) {
-    BloomFilter::Or(d->pbf1, reinterpret_cast<const uint8_t*>((d->directory1).data()),
-        &(d->pbf2), reinterpret_cast<uint8_t*>(const_cast<char*>((d->directory2).data())),
-        d->directory1.size());
+    BloomFilter::Or(d->tbf1, &d->tbf2);
   }
 }
 
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index e434632..92279fc 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -40,6 +40,29 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     : ImpalaInternalServiceClient(iprot, oprot) {
   }
 
+/// We intentionally disable this clang warning as we intend to hide the
+/// same-named functions defined in the base class.
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Woverloaded-virtual"
+
+  void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& params,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_UpdateFilter(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_UpdateFilter(_return);
+  }
+
+  void PublishFilter(TPublishFilterResult& _return, const TPublishFilterParams& params,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_PublishFilter(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_PublishFilter(_return);
+  }
+
+#pragma clang diagnostic pop
+
 };
 
 }
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index fdf44c1..26d0de3 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -29,6 +29,7 @@
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "rpc/thrift-util.h"
+#include "gen-cpp/ImpalaInternalService.h"
 
 #include "common/names.h"
 using namespace apache::thrift;
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index cef9f48..5ab055b 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -23,7 +23,6 @@
 #include "exec/exec-node.h"
 #include "exec/kudu-util.h"
 #include "exec/scan-node.h"
-#include "gen-cpp/data_stream_service.proxy.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
@@ -37,7 +36,6 @@
 #include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "service/control-service.h"
-#include "service/data-stream-service.h"
 #include "util/counting-barrier.h"
 #include "util/error-util-internal.h"
 #include "util/network-util.h"
@@ -530,35 +528,21 @@ bool Coordinator::BackendState::Cancel() {
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(
-    const PublishFilterParamsPB& rpc_params, RpcController& controller) {
-  DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
+void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
+  DCHECK(rpc_params.dst_query_id == query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
   if (IsDone()) return;
 
-  if (fragments_.count(rpc_params.dst_fragment_idx()) == 0) return;
+  if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;
   Status status;
-
-  std::unique_ptr<DataStreamServiceProxy> proxy;
-  Status get_proxy_status =
-      DataStreamService::GetProxy(krpc_host_, host_.hostname, &proxy);
-  if (!get_proxy_status.ok()) {
-    // Failing to send a filter is not a query-wide error - the remote fragment will
-    // continue regardless.
-    LOG(ERROR) << "Couldn't get proxy: " << get_proxy_status.msg().msg();
-    return;
-  }
-
-  PublishFilterResultPB res;
-  kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
-  if (!rpc_status.ok()) {
-    LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
-    return;
-  }
-  if (res.status().status_code() != TErrorCode::OK) {
-    LOG(ERROR) << "PublishFilter() operation failed: "
-               << Status(res.status()).GetDetail();
+  ImpalaBackendConnection backend_client(
+      ExecEnv::GetInstance()->impalad_client_cache(), host_, &status);
+  if (!status.ok()) return;
+  TPublishFilterResult res;
+  status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, rpc_params, &res);
+  if (!status.ok()) {
+    LOG(WARNING) << "Error publishing filter, continuing..." << status.GetDetail();
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 83c280e..ceb7f77 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -99,8 +99,7 @@ class Coordinator::BackendState {
 
   /// Make a PublishFilter rpc with given params if this backend has instances of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  void PublishFilter(
-      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
+  void PublishFilter(const TPublishFilterParams& rpc_params);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 55c6b3d..e16abea 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -61,13 +61,12 @@ class Coordinator::FilterState {
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
     : desc_(desc), src_(src) {
     // bloom_filter_ is a disjunction so the unit value is always_false.
-    bloom_filter_.set_always_false(true);
-    min_max_filter_.set_always_false(true);
+    bloom_filter_.always_false = true;
+    min_max_filter_.always_false = true;
   }
 
-  BloomFilterPB& bloom_filter() { return bloom_filter_; }
-  string& bloom_filter_directory() { return bloom_filter_directory_; }
-  MinMaxFilterPB& min_max_filter() { return min_max_filter_; }
+  TBloomFilter& bloom_filter() { return bloom_filter_; }
+  TMinMaxFilter& min_max_filter() { return min_max_filter_; }
   std::vector<FilterTarget>* targets() { return &targets_; }
   const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
@@ -82,17 +81,16 @@ class Coordinator::FilterState {
   void set_num_producers(int num_producers) { num_producers_ = num_producers; }
   bool disabled() const {
     if (is_bloom_filter()) {
-      return bloom_filter_.always_true();
+      return bloom_filter_.always_true;
     } else {
       DCHECK(is_min_max_filter());
-      return min_max_filter_.always_true();
+      return min_max_filter_.always_true;
     }
   }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
-  void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
-      kudu::rpc::RpcContext* context);
+  void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
 
   /// Disables a filter. A disabled filter consumes no memory.
   void Disable(MemTracker* tracker);
@@ -112,16 +110,13 @@ class Coordinator::FilterState {
   int num_producers_ = 0;
 
   /// Filters aggregated from all source plan nodes, to be broadcast to all
-  /// destination plan fragment instances. Only set for partitioned joins (broadcast
-  /// joins need no aggregation).
+  /// destination plan fragment instances. Only set for partitioned joins (broadcast joins
+  /// need no aggregation).
   /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
   /// output structure in the case of a broadcast join. Similarly, for partitioned joins,
   /// the filter is moved from the following member to the output structure.
-  BloomFilterPB bloom_filter_;
-  /// When the filter is a Bloom filter, we use this string to store the contents of the
-  /// aggregated Bloom filter.
-  string bloom_filter_directory_;
-  MinMaxFilterPB min_max_filter_;
+  TBloomFilter bloom_filter_;
+  TMinMaxFilter min_max_filter_;
 
   /// Time at which first local filter arrived.
   int64_t first_arrival_time_ = 0L;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index c98203a..1d2e88f 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -30,9 +30,8 @@
 #include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
+#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -55,9 +54,6 @@
 
 #include "common/names.h"
 
-using kudu::rpc::RpcContext;
-using kudu::rpc::RpcController;
-using kudu::rpc::RpcSidecar;
 using namespace apache::thrift;
 using namespace rapidjson;
 using boost::algorithm::iequals;
@@ -1000,7 +996,7 @@ vector<TNetworkAddress> Coordinator::GetActiveBackends(
   return result;
 }
 
-void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* context) {
+void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   shared_lock<shared_mutex> lock(filter_routing_table_->lock);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
@@ -1011,9 +1007,8 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
   DCHECK(filter_routing_table_->is_complete)
       << "Filter received before routing table complete";
 
-  PublishFilterParamsPB rpc_params;
+  TPublishFilterParams rpc_params;
   unordered_set<int> target_fragment_idxs;
-  string bloom_filter_directory;
   {
     lock_guard<SpinLock> l(filter_routing_table_->update_lock);
     if (!IsExecuting()) {
@@ -1021,17 +1016,17 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
                 << query_id();
       return;
     }
-    auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
+    auto it = filter_routing_table_->id_to_filter.find(params.filter_id);
     if (it == filter_routing_table_->id_to_filter.end()) {
-      LOG(INFO) << "Could not find filter with id: " << params.filter_id();
+      LOG(INFO) << "Could not find filter with id: " << params.filter_id;
       return;
     }
     FilterState* state = &it->second;
 
     DCHECK(state->desc().has_remote_targets)
-        << "Coordinator received filter that has only local targets";
+          << "Coordinator received filter that has only local targets";
 
-    // Check if the filter has already been sent, which could happen in five cases:
+    // Check if the filter has already been sent, which could happen in four cases:
     //   * if one local filter had always_true set - no point waiting for other local
     //     filters that can't affect the aggregated global filter
     //   * if this is a broadcast join, and another local filter was already received
@@ -1039,7 +1034,6 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
     //     immediately.
     //   * query execution finished and resources were released: filters do not need
     //     to be processed.
-    //   * if the inbound sidecar for Bloom filter cannot be successfully retrieved.
     if (state->disabled()) return;
 
     if (filter_updates_received_->value() == 0) {
@@ -1047,7 +1041,7 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
     }
     filter_updates_received_->Add(1);
 
-    state->ApplyUpdate(params, this, context);
+    state->ApplyUpdate(params, this);
 
     if (state->pending_count() > 0 && !state->disabled()) return;
     // At this point, we either disabled this filter or aggregation is complete.
@@ -1062,36 +1056,33 @@ void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* c
     }
 
     if (state->is_bloom_filter()) {
-      // Assign an outgoing bloom filter.
-      *rpc_params.mutable_bloom_filter() = state->bloom_filter();
-      bloom_filter_directory.swap(state->bloom_filter_directory());
-      DCHECK(rpc_params.bloom_filter().always_false()
-          || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
+      // Assign outgoing bloom filter.
+      TBloomFilter& aggregated_filter = state->bloom_filter();
+
+      swap(rpc_params.bloom_filter, aggregated_filter);
+      DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
+          || !rpc_params.bloom_filter.directory.empty());
+      DCHECK(aggregated_filter.directory.empty());
+      rpc_params.__isset.bloom_filter = true;
     } else {
       DCHECK(state->is_min_max_filter());
-      MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
+      MinMaxFilter::Copy(state->min_max_filter(), &rpc_params.min_max_filter);
+      rpc_params.__isset.min_max_filter = true;
     }
 
     // Filter is complete, and can be released.
     state->Disable(filter_mem_tracker_);
   }
 
-  TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
-  rpc_params.set_filter_id(params.filter_id());
+  rpc_params.__set_dst_query_id(query_id());
+  rpc_params.__set_filter_id(params.filter_id);
 
   // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       if (!IsExecuting()) goto cleanup;
-      rpc_params.set_dst_fragment_idx(fragment_idx);
-      RpcController controller;
-      if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
-          && !rpc_params.bloom_filter().always_true()) {
-        BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
-            bloom_filter_directory);
-      }
-      // TODO: make this asynchronous.
-      bs->PublishFilter(rpc_params, controller);
+      rpc_params.__set_dst_fragment_idx(fragment_idx);
+      bs->PublishFilter(rpc_params);
     }
   }
 
@@ -1099,13 +1090,13 @@ cleanup:
   // For bloom filters, the memory used in the filter_routing_table_ is transfered to
   // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
   // here to ensure that the MemTracker is updated after the memory is actually freed.
-  if (rpc_params.has_bloom_filter()) {
-    filter_mem_tracker_->Release(bloom_filter_directory.size());
+  if (rpc_params.__isset.bloom_filter) {
+    filter_mem_tracker_->Release(rpc_params.bloom_filter.directory.size());
   }
 }
 
-void Coordinator::FilterState::ApplyUpdate(
-    const UpdateFilterParamsPB& params, Coordinator* coord, RpcContext* context) {
+void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
+    Coordinator* coord) {
   DCHECK(!disabled());
   DCHECK_GT(pending_count_, 0);
   DCHECK_EQ(completion_time_, 0L);
@@ -1115,52 +1106,38 @@ void Coordinator::FilterState::ApplyUpdate(
 
   --pending_count_;
   if (is_bloom_filter()) {
-    DCHECK(params.has_bloom_filter());
-    if (params.bloom_filter().always_true()) {
+    DCHECK(params.__isset.bloom_filter);
+    if (params.bloom_filter.always_true) {
       Disable(coord->filter_mem_tracker_);
-    } else if (params.bloom_filter().always_false()) {
-      if (!bloom_filter_.has_log_bufferpool_space()) {
-        bloom_filter_ = BloomFilterPB(params.bloom_filter());
-      }
-    } else {
-      // If the incoming Bloom filter is neither an always true filter nor an
-      // always false filter, then it must be the case that a non-empty sidecar slice
-      // has been received. Refer to BloomFilter::ToProtobuf() for further details.
-      DCHECK(params.bloom_filter().has_directory_sidecar_idx());
-      kudu::Slice sidecar_slice;
-      kudu::Status status = context->GetInboundSidecar(
-          params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
-      if (!status.ok()) {
-        LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
+    } else if (bloom_filter_.always_false) {
+      int64_t heap_space = params.bloom_filter.directory.size();
+      if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
+        VLOG_QUERY << "Not enough memory to allocate filter: "
+                   << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+                   << " (query_id=" << PrintId(coord->query_id()) << ")";
+        // Disable, as one missing update means a correct filter cannot be produced.
         Disable(coord->filter_mem_tracker_);
-      } else if (bloom_filter_.always_false()) {
-        int64_t heap_space = sidecar_slice.size();
-        if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
-          VLOG_QUERY << "Not enough memory to allocate filter: "
-                     << PrettyPrinter::Print(heap_space, TUnit::BYTES)
-                     << " (query_id=" << PrintId(coord->query_id()) << ")";
-          // Disable, as one missing update means a correct filter cannot be produced.
-          Disable(coord->filter_mem_tracker_);
-        } else {
-          bloom_filter_ = params.bloom_filter();
-          bloom_filter_directory_ = sidecar_slice.ToString();
-        }
       } else {
-        DCHECK_EQ(bloom_filter_directory_.size(), sidecar_slice.size());
-        BloomFilter::Or(params.bloom_filter(), sidecar_slice.data(), &bloom_filter_,
-            reinterpret_cast<uint8_t*>(const_cast<char*>(bloom_filter_directory_.data())),
-            sidecar_slice.size());
+        // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
+        // move the payload from the request rather than copy it and take double the
+        // memory cost. After this point, params.bloom_filter is an empty filter and
+        // should not be read.
+        TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
+        swap(bloom_filter_, *non_const_filter);
+        DCHECK_EQ(non_const_filter->directory.size(), 0);
       }
+    } else {
+      BloomFilter::Or(params.bloom_filter, &bloom_filter_);
     }
   } else {
     DCHECK(is_min_max_filter());
-    DCHECK(params.has_min_max_filter());
-    if (params.min_max_filter().always_true()) {
+    DCHECK(params.__isset.min_max_filter);
+    if (params.min_max_filter.always_true) {
       Disable(coord->filter_mem_tracker_);
-    } else if (min_max_filter_.always_false()) {
-      MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
+    } else if (min_max_filter_.always_false) {
+      MinMaxFilter::Copy(params.min_max_filter, &min_max_filter_);
     } else {
-      MinMaxFilter::Or(params.min_max_filter(), &min_max_filter_,
+      MinMaxFilter::Or(params.min_max_filter, &min_max_filter_,
           ColumnType::FromThrift(desc_.src_expr.nodes[0].type));
     }
   }
@@ -1172,15 +1149,15 @@ void Coordinator::FilterState::ApplyUpdate(
 
 void Coordinator::FilterState::Disable(MemTracker* tracker) {
   if (is_bloom_filter()) {
-    bloom_filter_.set_always_true(true);
-    bloom_filter_.set_always_false(false);
-    tracker->Release(bloom_filter_directory_.size());
-    bloom_filter_directory_.clear();
-    bloom_filter_directory_.shrink_to_fit();
+    bloom_filter_.always_true = true;
+    bloom_filter_.always_false = false;
+    tracker->Release(bloom_filter_.directory.size());
+    bloom_filter_.directory.clear();
+    bloom_filter_.directory.shrink_to_fit();
   } else {
     DCHECK(is_min_max_filter());
-    min_max_filter_.set_always_true(true);
-    min_max_filter_.set_always_false(false);
+    min_max_filter_.always_true = true;
+    min_max_filter_.always_false = false;
   }
 }
 
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9a35eb4..c8db6d7 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -31,19 +31,12 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/dml-exec-state.h"
 #include "util/counting-barrier.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
-namespace kudu {
-namespace rpc {
-class RpcContext;
-} // namespace rpc
-} // namespace kudu
-
 namespace impala {
 
 class ClientRequestState;
@@ -178,7 +171,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// with others for the same filter ID into a global filter. If all updates for that
   /// filter ID have been received (may be 1 or more per filter), broadcast the global
   /// filter to fragment instances.
-  void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
+  void UpdateFilter(const TUpdateFilterParams& params);
 
   /// Adds to 'document' a serialized array of all backends in a member named
   /// 'backend_states'.
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 743d339..5d39c56 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -122,12 +122,6 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
     stream_mgr_->CloseSender(request, response, rpc_context);
   }
 
-  virtual void UpdateFilter(
-      const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {}
-
-  virtual void PublishFilter(const PublishFilterParamsPB* req,
-      PublishFilterResultPB* resp, RpcContext* context) {}
-
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
  private:
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index ea2d126..b34cde0 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -22,7 +22,6 @@
 #include <ostream>
 
 #include "gen-cpp/Data_types.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/multi-precision.h"
 #include "runtime/types.h"
 
@@ -60,8 +59,8 @@ class DecimalValue {
     return FromDouble(t.precision, t.scale, d, round, overflow);
   }
 
-  /// Returns a new DecimalValue created from the value in 'value_pb'.
-  static inline DecimalValue FromColumnValuePB(const ColumnValuePB& value_pb);
+  /// Returns a new DecimalValue created from the value in 'tvalue'.
+  static inline DecimalValue FromTColumnValue(const TColumnValue& tvalue);
 
   static inline DecimalValue FromDouble(int precision, int scale, double d,
       bool round, bool* overflow);
@@ -197,10 +196,11 @@ class DecimalValue {
 
   inline DecimalValue<T> Abs() const;
 
-  /// Store the binary representation of this DecimalValue in 'value_pb'.
-  void ToColumnValuePB(ColumnValuePB* value_pb) const {
+  /// Store the binary representation of this DecimalValue in 'tvalue'.
+  void ToTColumnValue(TColumnValue* tvalue) const {
     const uint8_t* data = reinterpret_cast<const uint8_t*>(&value_);
-    value_pb->mutable_decimal_val()->assign(data, data + sizeof(T));
+    tvalue->decimal_val.assign(data, data + sizeof(T));
+    tvalue->__isset.decimal_val = true;
   }
 
  private:
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index b2d5fc0..6480099 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -61,9 +61,9 @@ inline DecimalValue<T> DecimalValue<T>::FromDouble(int precision, int scale, dou
 }
 
 template <typename T>
-inline DecimalValue<T> DecimalValue<T>::FromColumnValuePB(const ColumnValuePB& value_pb) {
+inline DecimalValue<T> DecimalValue<T>::FromTColumnValue(const TColumnValue& tvalue) {
   T value = 0;
-  memcpy(&value, value_pb.decimal_val().c_str(), value_pb.decimal_val().length());
+  memcpy(&value, tvalue.decimal_val.c_str(), tvalue.decimal_val.length());
   return DecimalValue<T>(value);
 }
 
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index b6d2d84..e80a2a7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -27,6 +27,7 @@
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "exec/kudu-util.h"
+#include "gen-cpp/ImpalaInternalService.h"
 #include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 320e477..1b1b343 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -32,7 +32,6 @@
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/exchange-node.h"
 #include "exec/scan-node.h"
-#include "kudu/rpc/rpc_context.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
@@ -50,7 +49,6 @@
 #include "util/periodic-counter-updater.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
-using kudu::rpc::RpcContext;
 using namespace impala;
 using namespace apache::thrift;
 
@@ -516,11 +514,10 @@ Status FragmentInstanceState::WaitForOpen() {
   return opened_promise_.Get();
 }
 
-void FragmentInstanceState::PublishFilter(
-    const PublishFilterParamsPB& params, RpcContext* context) {
+void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
-            << " filter_id=" << params.filter_id();
-  runtime_state_->filter_bank()->PublishGlobalFilter(params, context);
+            << " filter_id=" << params.filter_id;
+  runtime_state_->filter_bank()->PublishGlobalFilter(params);
 }
 
 const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB state) {
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index bedf31f..f3aa895 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -29,7 +29,6 @@
 #include "util/promise.h"
 
 #include "gen-cpp/control_service.pb.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/row-batch.h"
@@ -37,12 +36,6 @@
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 
-namespace kudu {
-namespace rpc {
-class RpcContext;
-} // namespace rpc
-} // namespace kudu
-
 namespace impala {
 
 class TPlanFragmentCtx;
@@ -95,7 +88,7 @@ class FragmentInstanceState {
   Status WaitForOpen();
 
   /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
-  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
+  void PublishFilter(const TPublishFilterParams& params);
 
   /// Called periodically by query state thread to get the current status of this fragment
   /// instance. The fragment instance's status is stored in 'instance_status' and its
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5b60be0..196a74e 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -23,7 +23,6 @@
 #include "common/thread-debug-info.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
-#include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
@@ -41,7 +40,6 @@
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
 #include "service/control-service.h"
-#include "service/data-stream-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
@@ -52,6 +50,7 @@
 #include "gen-cpp/control_service.proxy.h"
 
 using kudu::MonoDelta;
+using kudu::rpc::RpcController;
 using kudu::rpc::RpcSidecar;
 
 #include "common/names.h"
@@ -675,11 +674,11 @@ void QueryState::Cancel() {
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
-void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext* context) {
+void QueryState::PublishFilter(const TPublishFilterParams& params) {
   if (!WaitForPrepare().ok()) return;
-  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx()), 1);
-  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx()]) {
-    fis->PublishFilter(params, context);
+  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
+  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
+    fis->PublishFilter(params);
   }
 }
 
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 39101e1..6e95395 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -27,19 +27,12 @@
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/tmp-file-mgr.h"
 #include "util/container-util.h"
 #include "util/counting-barrier.h"
 #include "util/uid-util.h"
 
-namespace kudu {
-namespace rpc {
-class RpcContext;
-} // namespace rpc
-} // namespace kudu
-
 namespace impala {
 
 class ControlServiceProxy;
@@ -203,7 +196,7 @@ class QueryState {
       const TUniqueId& instance_id, FragmentInstanceState** fi_state);
 
   /// Blocks until all fragment instances have finished their Prepare phase.
-  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
+  void PublishFilter(const TPublishFilterParams& params);
 
   /// Cancels all actively executing fragment instances. Blocks until all fragment
   /// instances have finished their Prepare phase. Idempotent.
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 21a0bfd..56f63aa 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -17,26 +17,19 @@
 
 #include "runtime/runtime-filter-bank.h"
 
-#include <chrono>
-
 #include <boost/algorithm/string/join.hpp>
 
 #include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/data_stream_service.proxy.h"
 #include "gutil/strings/substitute.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_sidecar.h"
-#include "runtime/backend-client.h"
-#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
-#include "service/data-stream-service.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"
@@ -45,9 +38,6 @@
 
 #include "common/names.h"
 
-using kudu::rpc::RpcContext;
-using kudu::rpc::RpcController;
-using kudu::rpc::RpcSidecar;
 using namespace impala;
 using namespace boost;
 using namespace strings;
@@ -112,35 +102,32 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte
   return ret;
 }
 
-void RuntimeFilterBank::UpdateFilterCompleteCb(
-    const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
-  const kudu::Status controller_status = rpc_controller->status();
+namespace {
 
-  // In the case of an unsuccessful KRPC call, e.g., request dropped due to
-  // backpressure, we only log this event w/o retrying. Failing to send a
-  // filter is not a query-wide error - the remote fragment will continue
-  // regardless.
-  if (!controller_status.ok()) {
-    LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
+/// Sends a filter to the coordinator. Executed asynchronously in the context of
+/// ExecEnv::rpc_pool().
+void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
+    ImpalaBackendClientCache* client_cache) {
+  Status status;
+  ImpalaBackendConnection coord(client_cache, address, &status);
+  if (!status.ok()) {
+    // Failing to send a filter is not a query-wide error - the remote fragment will
+    // continue regardless.
+    // TODO: Retry.
+    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
+    return;
   }
-  // DataStreamService::UpdateFilter() should never set an error status
-  DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
+  TUpdateFilterResult res;
+  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
+}
 
-  {
-    std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
-    DCHECK_GT(num_inflight_rpcs_, 0);
-    --num_inflight_rpcs_;
-  }
-  krpcs_done_cv_.notify_one();
 }
 
 void RuntimeFilterBank::UpdateFilterFromLocal(
     int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
-  // This function is only called from ExecNode::Open() or more specifically
-  // PartitionedHashJoinNode::Open().
-  DCHECK(!closed_);
+  TUpdateFilterParams params;
   // A runtime filter may have both local and remote targets.
   bool has_local_target = false;
   bool has_remote_target = false;
@@ -172,108 +159,64 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
 
   if (has_remote_target
       && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
-    UpdateFilterParamsPB params;
-    // The memory associated with the following 2 objects needs to live until
-    // the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
-    // Hence, we allocate these 2 objects in 'obj_pool_'.
-    UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
-    RpcController* controller = obj_pool_.Add(new RpcController);
-
-    TUniqueIdToUniqueIdPB(state_->query_id(), params.mutable_query_id());
-    params.set_filter_id(filter_id);
+    params.__set_filter_id(filter_id);
+    params.__set_query_id(state_->query_id());
     if (type == TRuntimeFilterType::BLOOM) {
-      BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
+      BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
+      params.__isset.bloom_filter = true;
     } else {
-      DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
-      min_max_filter->ToProtobuf(params.mutable_min_max_filter());
-    }
-    const TNetworkAddress& krpc_address = state_->query_ctx().coord_krpc_address;
-    const TNetworkAddress& host_address = state_->query_ctx().coord_address;
-
-    // Use 'proxy' to send the filter to the coordinator.
-    unique_ptr<DataStreamServiceProxy> proxy;
-    Status get_proxy_status =
-        DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
-    if (!get_proxy_status.ok()) {
-      // Failing to send a filter is not a query-wide error - the remote fragment will
-      // continue regardless.
-      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
-          host_address.hostname, get_proxy_status.msg().msg());
-      return;
+      DCHECK(type == TRuntimeFilterType::MIN_MAX);
+      min_max_filter->ToThrift(&params.min_max_filter);
+      params.__isset.min_max_filter = true;
     }
 
-    // Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
-    // in Close() until all in-flight RPCs complete.
-    {
-      unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
-      DCHECK_GE(num_inflight_rpcs_, 0);
-      ++num_inflight_rpcs_;
-    }
-
-    proxy->UpdateFilterAsync(params, res, controller,
-        boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
+    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
+        SendFilterToCoordinator, state_->query_ctx().coord_address, params,
+        ExecEnv::GetInstance()->impalad_client_cache()));
   }
 }
 
-void RuntimeFilterBank::PublishGlobalFilter(
-    const PublishFilterParamsPB& params, RpcContext* context) {
+void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
   lock_guard<mutex> l(runtime_filter_lock_);
   if (closed_) return;
-  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id());
+  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
   DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
-                                        << params.filter_id();
+                                        << params.filter_id;
 
   BloomFilter* bloom_filter = nullptr;
   MinMaxFilter* min_max_filter = nullptr;
   if (it->second->is_bloom_filter()) {
-    DCHECK(params.has_bloom_filter());
-    if (params.bloom_filter().always_true()) {
+    DCHECK(params.__isset.bloom_filter);
+    if (params.bloom_filter.always_true) {
       bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
     } else {
-      int64_t required_space = BloomFilter::GetExpectedMemoryUsed(
-          params.bloom_filter().log_bufferpool_space());
+      int64_t required_space =
+          BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
       DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
           << "BufferPool Client should have enough reservation to fulfill bloom filter "
              "allocation";
       bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
-
-      kudu::Slice sidecar_slice;
-      if (params.bloom_filter().has_directory_sidecar_idx()) {
-        kudu::Status status = context->GetInboundSidecar(
-            params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
-        if (!status.ok()) {
-          LOG(ERROR) << "Failed to get Bloom filter sidecar: "
-                     << status.message().ToString();
-          bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
-        }
+      Status status = bloom_filter->Init(params.bloom_filter);
+      if (!status.ok()) {
+        LOG(ERROR) << "Unable to allocate memory for bloom filter: "
+                   << status.GetDetail();
+        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
       } else {
-        DCHECK(params.bloom_filter().always_false());
-      }
-
-      if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
-        Status status = bloom_filter->Init(
-            params.bloom_filter(), sidecar_slice.data(), sidecar_slice.size());
-        if (!status.ok()) {
-          LOG(ERROR) << "Unable to allocate memory for bloom filter: "
-                     << status.GetDetail();
-          bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
-        } else {
-          bloom_filters_.push_back(bloom_filter);
-          DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
-          bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
-        }
+        bloom_filters_.push_back(bloom_filter);
+        DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+        bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
       }
     }
   } else {
     DCHECK(it->second->is_min_max_filter());
-    DCHECK(params.has_min_max_filter());
-    min_max_filter = MinMaxFilter::Create(params.min_max_filter(), it->second->type(),
-        &obj_pool_, filter_mem_tracker_.get());
+    DCHECK(params.__isset.min_max_filter);
+    min_max_filter = MinMaxFilter::Create(
+        params.min_max_filter, it->second->type(), &obj_pool_, filter_mem_tracker_.get());
     min_max_filters_.push_back(min_max_filter);
   }
   it->second->SetFilter(bloom_filter, min_max_filter);
   state_->runtime_profile()->AddInfoString(
-      Substitute("Filter $0 arrival", params.filter_id()),
+      Substitute("Filter $0 arrival", params.filter_id),
       PrettyPrinter::Print(it->second->arrival_delay_ms(), TUnit::TIME_MS));
 }
 
@@ -335,21 +278,8 @@ void RuntimeFilterBank::CancelLocked() {
 }
 
 void RuntimeFilterBank::Close() {
-  // Wait for all in-flight RPCs to complete before closing the filters.
-  {
-    unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
-    while (num_inflight_rpcs_ > 0) {
-      krpcs_done_cv_.wait(l1);
-    }
-  }
-
-  lock_guard<mutex> l2(runtime_filter_lock_);
+  lock_guard<mutex> l(runtime_filter_lock_);
   CancelLocked();
-  // We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
-  // drain because the async build thread in
-  // BlockingJoinNode::ProcessBuildInputAndOpenProbe() should have exited by the time
-  // Close() is called so there shouldn't be any new RPCs being issued when this function
-  // is called.
   closed_ = true;
   for (BloomFilter* filter : bloom_filters_) filter->Close();
   for (MinMaxFilter* filter : min_max_filters_) filter->Close();
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 78a95cf..1208f03 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,7 +20,6 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/mem-pool.h"
 #include "runtime/types.h"
@@ -30,15 +29,6 @@
 #include <boost/thread/lock_guard.hpp>
 #include <boost/unordered_map.hpp>
 
-#include <condition_variable>
-
-namespace kudu {
-namespace rpc {
-class RpcContext;
-class RpcController;
-} // namespace rpc
-} // namespace kudu
-
 namespace impala {
 
 class BloomFilter;
@@ -46,6 +36,7 @@ class MemTracker;
 class MinMaxFilter;
 class RuntimeFilter;
 class RuntimeState;
+class TBloomFilter;
 class TRuntimeFilterDesc;
 class TQueryCtx;
 
@@ -103,8 +94,7 @@ class RuntimeFilterBank {
 
   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
   /// consumption by operators that wish to use it for filtering.
-  void PublishGlobalFilter(
-      const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
+  void PublishGlobalFilter(const TPublishFilterParams& params);
 
   /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
   /// 'filter_size' would have an expected false-positive rate which would exceed
@@ -160,16 +150,6 @@ class RuntimeFilterBank {
   /// All filters expected to be consumed by the local plan fragment instance.
   RuntimeFilterMap consumed_filters_;
 
-  /// Lock protecting 'num_inflight_rpcs_' and it should not be taken at the same
-  /// time as runtime_filter_lock_.
-  SpinLock num_inflight_rpcs_lock_;
-  /// Use 'num_inflight_rpcs_' to keep track of the number of current in-flight
-  /// KRPC calls to prevent the memory pointed to by a BloomFilter* being
-  /// deallocated in RuntimeFilterBank::Close() before all KRPC calls have
-  /// been completed.
-  int32_t num_inflight_rpcs_ = 0;
-  std::condition_variable_any krpcs_done_cv_;
-
   /// Fragment instance's runtime state.
   RuntimeState* state_;
 
@@ -204,11 +184,6 @@ class RuntimeFilterBank {
   /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
   /// pool in Close().
   BufferPool::ClientHandle buffer_pool_client_;
-
-  /// This is the callback for the asynchronous rpc UpdateFilterAsync() in
-  /// UpdateFilterFromLocal().
-  void UpdateFilterCompleteCb(
-      const kudu::rpc::RpcController* rpc_controller, const UpdateFilterResultPB* res);
 };
 
 }
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 618788c..5b65f7a 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -67,8 +67,7 @@ class RuntimeFilter {
 
   MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
 
-  /// Sets the internal filter bloom_filter to 'bloom_filter' or 'min_max_filter'
-  /// depending on the type of this RuntimeFilter. Can only legally be called
+  /// Sets 'bloom_filter' or 'min_max_filter', per filter type. Can only legally be called
   /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
   void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
 
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 4eb5c37..2b2ce79 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -27,7 +27,6 @@
 
 #include "common/global-types.h"
 #include "gen-cpp/Data_types.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "udf/udf.h"
 #include "util/hash-util.h"
 
@@ -167,16 +166,17 @@ class TimestampValue {
     *ptp = boost::posix_time::ptime(date_, time_);
   }
 
-  // Store the binary representation of this TimestampValue in 'pvalue'.
-  void ToColumnValuePB(ColumnValuePB* pvalue) const {
+  // Store the binary representation of this TimestampValue in 'tvalue'.
+  void ToTColumnValue(TColumnValue* tvalue) const {
     const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
-    pvalue->mutable_timestamp_val()->assign(data, data + Size());
+    tvalue->timestamp_val.assign(data, data + Size());
+    tvalue->__isset.timestamp_val = true;
   }
 
-  // Returns a new TimestampValue created from the value in 'value_pb'.
-  static TimestampValue FromColumnValuePB(const ColumnValuePB& value_pb) {
+  // Returns a new TimestampValue created from the value in 'tvalue'.
+  static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
     TimestampValue value;
-    memcpy(&value, value_pb.timestamp_val().c_str(), Size());
+    memcpy(&value, tvalue.timestamp_val.c_str(), Size());
     value.Validate();
     return value;
   }
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index ad38900..02642c4 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -20,6 +20,7 @@
 
 #include <jni.h>
 
+#include "gen-cpp/ImpalaInternalService.h"
 #include "common/status.h"
 #include "util/metrics-fwd.h"
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 31b4c57..13bb433 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1366,10 +1366,9 @@ Status ClientRequestState::UpdateBackendExecStatus(
   return coord_->UpdateBackendExecStatus(request, thrift_profiles);
 }
 
-void ClientRequestState::UpdateFilter(
-    const UpdateFilterParamsPB& params, RpcContext* context) {
+void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK(coord_.get());
-  coord_->UpdateFilter(params, context);
+  coord_->UpdateFilter(params);
 }
 
 bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) {
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index cf71a13..97ea18c 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -176,7 +176,7 @@ class ClientRequestState {
   /// object.
   Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
       const TRuntimeProfileForest& thrift_profiles) WARN_UNUSED_RESULT;
-  void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
+  void UpdateFilter(const TUpdateFilterParams& params);
 
   /// Populate DML stats in 'dml_result' if this request succeeded.
   /// Sets 'query_status' to the overall query status.
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 2e57d02..890ceec 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -29,9 +29,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
-#include "service/impala-server.h"
 #include "util/memory-metrics.h"
 #include "util/parse-util.h"
 #include "testutil/fault-injection-util.h"
@@ -109,38 +107,6 @@ void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
   ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
 }
 
-void DataStreamService::UpdateFilter(
-    const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {
-  // This failpoint is to allow jitter to be injected.
-  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
-  DCHECK(req->has_filter_id());
-  DCHECK(req->has_query_id());
-  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
-  ExecEnv::GetInstance()->impala_server()->UpdateFilter(resp, *req, context);
-  RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
-}
-
-void DataStreamService::PublishFilter(
-    const PublishFilterParamsPB* req, PublishFilterResultPB* resp, RpcContext* context) {
-  // This failpoint is to allow jitter to be injected.
-  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
-  DCHECK(req->has_filter_id());
-  DCHECK(req->has_dst_query_id());
-  DCHECK(req->has_dst_fragment_idx());
-  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
-  QueryState::ScopedRef qs(ProtoToQueryId(req->dst_query_id()));
-
-  if (qs.get() != nullptr) {
-    qs->PublishFilter(*req, context);
-    RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
-  } else {
-    string err_msg = Substitute("Query State not found for query_id=$0",
-        PrintId(ProtoToQueryId(req->dst_query_id())));
-    LOG(INFO) << err_msg;
-    RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
-  }
-}
-
 template<typename ResponsePBType>
 void DataStreamService::RespondRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 24a2249..539974c 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -63,16 +63,6 @@ class DataStreamService : public DataStreamServiceIf {
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
 
-  /// Called by fragment instances that produce local runtime filters to deliver them to
-  /// the coordinator for aggregation and broadcast.
-  virtual void UpdateFilter(const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp,
-      kudu::rpc::RpcContext* context);
-
-  /// Called by the coordinator to deliver global runtime filters to fragments for
-  /// application at plan nodes.
-  virtual void PublishFilter(const PublishFilterParamsPB* req,
-      PublishFilterResultPB* resp, kudu::rpc::RpcContext* context);
-
   /// Respond to a RPC passed in 'response'/'ctx' with 'status' and release
   /// the payload memory from 'mem_tracker'. Takes ownership of 'ctx'.
   template<typename ResponsePBType>
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 69a5736..d703406 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -22,6 +22,7 @@
 
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
+#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/LineageGraph_types.h"
 #include "common/status.h"
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 106c75e..ccd00a8 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -44,3 +44,24 @@ template <typename T> void SetUnknownIdError(
       Substitute("Unknown $0 id: $1", id_type, PrintId(id))));
   status.SetTStatus(status_container);
 }
+
+void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
+    const TUpdateFilterParams& params) {
+  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
+  DCHECK(params.__isset.filter_id);
+  DCHECK(params.__isset.query_id);
+  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
+  impala_server_->UpdateFilter(return_val, params);
+}
+
+void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
+    const TPublishFilterParams& params) {
+  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
+  DCHECK(params.__isset.filter_id);
+  DCHECK(params.__isset.dst_query_id);
+  DCHECK(params.__isset.dst_fragment_idx);
+  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
+  QueryState::ScopedRef qs(params.dst_query_id);
+  if (qs.get() == nullptr) return;
+  qs->PublishFilter(params);
+}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 425678b..c75122b 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -30,6 +30,10 @@ class ImpalaServer;
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
   ImpalaInternalService();
+  virtual void UpdateFilter(TUpdateFilterResult& return_val,
+      const TUpdateFilterParams& params);
+  virtual void PublishFilter(TPublishFilterResult& return_val,
+      const TPublishFilterParams& params);
 
  private:
   ImpalaServer* impala_server_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index f8ab068..6d0a20a 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -53,7 +53,6 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
-#include "kudu/rpc/rpc_context.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-trace.h"
@@ -96,6 +95,7 @@
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/DataSinks_types.h"
 #include "gen-cpp/ImpalaService_types.h"
+#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/LineageGraph_types.h"
 #include "gen-cpp/Frontend_types.h"
 
@@ -113,7 +113,6 @@ using boost::system_time;
 using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using kudu::GetRandomSeed32;
-using kudu::rpc::RpcContext;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -2546,18 +2545,17 @@ Status ImpalaServer::CheckClientRequestSession(
   return Status::OK();
 }
 
-void ImpalaServer::UpdateFilter(UpdateFilterResultPB* result,
-    const UpdateFilterParamsPB& params, RpcContext* context) {
-  DCHECK(params.has_query_id());
-  DCHECK(params.has_filter_id());
+void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
+    const TUpdateFilterParams& params) {
+  DCHECK(params.__isset.query_id);
+  DCHECK(params.__isset.filter_id);
   shared_ptr<ClientRequestState> client_request_state =
-      GetClientRequestState(ProtoToQueryId(params.query_id()));
+      GetClientRequestState(params.query_id);
   if (client_request_state.get() == nullptr) {
-    LOG(INFO) << "Could not find client request state: "
-              << PrintId(ProtoToQueryId(params.query_id()));
+    LOG(INFO) << "Could not find client request state: " << PrintId(params.query_id);
     return;
   }
-  client_request_state->UpdateFilter(params, context);
+  client_request_state->UpdateFilter(params);
 }
 
 Status ImpalaServer::CheckNotShuttingDown() const {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7ed0f34..cd66c9a 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -49,12 +49,6 @@
 #include "util/thread-pool.h"
 #include "util/time.h"
 
-namespace kudu {
-namespace rpc {
-class RpcContext;
-} // namespace rpc
-} // namespace kudu
-
 namespace impala {
 using kudu::ThreadSafeRandom;
 
@@ -63,7 +57,12 @@ class DataSink;
 class CancellationWork;
 class ImpalaHttpHandler;
 class RowDescriptor;
+class TCatalogUpdate;
+class TPlanExecRequest;
+class TPlanExecParams;
 class TDmlResult;
+class TReportExecStatusArgs;
+class TReportExecStatusResult;
 class TNetworkAddress;
 class TClientRequest;
 class TExecRequest;
@@ -346,8 +345,8 @@ class ImpalaServer : public ImpalaServiceIf,
   virtual void CloseImpalaOperation(
       TCloseImpalaOperationResp& return_val, const TCloseImpalaOperationReq& request);
 
-  void UpdateFilter(UpdateFilterResultPB* return_val, const UpdateFilterParamsPB& params,
-      kudu::rpc::RpcContext* context);
+  /// ImpalaInternalService rpcs
+  void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);
 
   /// Generates a unique id for this query and sets it in the given query context.
   /// Prepares the given query context by populating fields required for evaluating
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index 7a7d27f..e8e7e2e 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -21,7 +21,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include "kudu/rpc/rpc_controller.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/mem-tracker.h"
@@ -29,14 +28,11 @@
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
-#include "gen-cpp/data_stream_service.pb.h"
-
 using namespace std;
 
-using namespace impala;
-using kudu::rpc::RpcController;
+namespace {
 
-namespace bloom_filter_test_util {
+using namespace impala;
 
 // Make a random uint64_t, avoiding the absent high bit and the low-entropy low bits
 // produced by rand().
@@ -72,47 +68,24 @@ bool BfFind(BloomFilter& bf, uint32_t h) {
 // Computes union of 'x' and 'y'. Computes twice with AVX enabled and disabled and
 // verifies both produce the same result. 'success' is set to true if both union
 // computations returned the same result and set to false otherwise.
-void BfUnion(const BloomFilter& x, const BloomFilter& y, int64_t directory_size,
-    bool* success, BloomFilterPB* protobuf, std::string* directory) {
-  BloomFilterPB protobuf_x, protobuf_y;
-  RpcController controller_x;
-  RpcController controller_y;
-  BloomFilter::ToProtobuf(&x, &controller_x, &protobuf_x);
-  BloomFilter::ToProtobuf(&y, &controller_y, &protobuf_y);
-
-  string directory_x(reinterpret_cast<const char*>(x.directory_), directory_size);
-  string directory_y(reinterpret_cast<const char*>(y.directory_), directory_size);
-
-  BloomFilter::Or(protobuf_x, reinterpret_cast<const uint8_t*>(directory_x.data()),
-      &protobuf_y, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y.data())),
-      directory_size);
-
+TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success) {
+  TBloomFilter thrift_x, thrift_y;
+  BloomFilter::ToThrift(&x, &thrift_x);
+  BloomFilter::ToThrift(&y, &thrift_y);
+  BloomFilter::Or(thrift_x, &thrift_y);
   {
     CpuInfo::TempDisable t1(CpuInfo::AVX);
     CpuInfo::TempDisable t2(CpuInfo::AVX2);
-    BloomFilterPB protobuf_x2, protobuf_y2;
-    RpcController controller_x2;
-    RpcController controller_y2;
-    BloomFilter::ToProtobuf(&x, &controller_x2, &protobuf_x2);
-    BloomFilter::ToProtobuf(&y, &controller_y2, &protobuf_y2);
-
-    string directory_x2(reinterpret_cast<const char*>(x.directory_), directory_size);
-    string directory_y2(reinterpret_cast<const char*>(y.directory_), directory_size);
-
-    BloomFilter::Or(protobuf_x2, reinterpret_cast<const uint8_t*>(directory_x2.data()),
-        &protobuf_y2, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y2.data())),
-        directory_size);
-
-    *success = directory_y.compare(directory_y2) == 0;
+    TBloomFilter thrift_x2, thrift_y2;
+    BloomFilter::ToThrift(&x, &thrift_x2);
+    BloomFilter::ToThrift(&y, &thrift_y2);
+    BloomFilter::Or(thrift_x2, &thrift_y2);
+    *success = thrift_y.directory == thrift_y2.directory;
   }
-
-  *protobuf = protobuf_y;
-  *directory = directory_y;
+  return thrift_y;
 }
 
-} // namespace bloom_filter_test_util
-
-using namespace bloom_filter_test_util;
+}  // namespace
 
 namespace impala {
 
@@ -231,15 +204,12 @@ class BloomFilterTest : public testing::Test {
     return bloom_filter;
   }
 
-  BloomFilter* CreateBloomFilter(BloomFilterPB filter_pb, const std::string& directory) {
+  BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
     int64_t filter_size =
-        BloomFilter::GetExpectedMemoryUsed(filter_pb.log_bufferpool_space());
+        BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
     EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
     BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
-
-    EXPECT_OK(bloom_filter->Init(
-        filter_pb, reinterpret_cast<const uint8_t*>(directory.data()), directory.size()));
-
+    EXPECT_OK(bloom_filter->Init(t_filter));
     bloom_filters_.push_back(bloom_filter);
     EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
     return bloom_filter;
@@ -341,7 +311,7 @@ TEST_F(BloomFilterTest, FindInvalid) {
   }
 }
 
-TEST_F(BloomFilterTest, Protobuf) {
+TEST_F(BloomFilterTest, Thrift) {
   BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
   for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
   // Check no unexpected new false positives.
@@ -350,27 +320,19 @@ TEST_F(BloomFilterTest, Protobuf) {
     if (!BfFind(*bf, i)) missing_ints.insert(i);
   }
 
-  BloomFilterPB to_protobuf;
-
-  RpcController controller;
-  BloomFilter::ToProtobuf(bf, &controller, &to_protobuf);
-
-  EXPECT_EQ(to_protobuf.always_true(), false);
-
-  std::string directory(reinterpret_cast<const char*>(bf->directory_),
-      BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01)));
+  TBloomFilter to_thrift;
+  BloomFilter::ToThrift(bf, &to_thrift);
+  EXPECT_EQ(to_thrift.always_true, false);
 
-  BloomFilter* from_protobuf = CreateBloomFilter(to_protobuf, directory);
+  BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
+  for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
 
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_protobuf, i));
-  for (int missing : missing_ints) ASSERT_FALSE(BfFind(*from_protobuf, missing));
-
-  RpcController controller_2;
-  BloomFilter::ToProtobuf(nullptr, &controller_2, &to_protobuf);
-  EXPECT_EQ(to_protobuf.always_true(), true);
+  BloomFilter::ToThrift(NULL, &to_thrift);
+  EXPECT_EQ(to_thrift.always_true, true);
 }
 
-TEST_F(BloomFilterTest, ProtobufOr) {
+TEST_F(BloomFilterTest, ThriftOr) {
   BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
   BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
 
@@ -378,15 +340,7 @@ TEST_F(BloomFilterTest, ProtobufOr) {
   for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
 
   bool success;
-  BloomFilterPB protobuf;
-  std::string directory;
-  int64_t directory_size =
-      BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01));
-
-  BfUnion(*bf1, *bf2, directory_size, &success, &protobuf, &directory);
-
-  BloomFilter* bf3 = CreateBloomFilter(protobuf, directory);
-
+  BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
   for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
   for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
@@ -394,10 +348,8 @@ TEST_F(BloomFilterTest, ProtobufOr) {
   // Insert another value to aggregated BloomFilter.
   for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
 
-  // Apply BloomFilterPB back to BloomFilter and verify if aggregation was correct.
-  BfUnion(*bf1, *bf3, directory_size, &success, &protobuf, &directory);
-  BloomFilter* bf4 = CreateBloomFilter(protobuf, directory);
-
+  // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
+  BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
   for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
   for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index c41c81b..0a2f8fc 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,8 +17,6 @@
 
 #include "util/bloom-filter.h"
 
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 
@@ -57,13 +55,12 @@ Status BloomFilter::Init(const int log_bufferpool_space) {
   return Status::OK();
 }
 
-Status BloomFilter::Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
-    size_t directory_in_size) {
-  RETURN_IF_ERROR(Init(protobuf.log_bufferpool_space()));
-  if (directory_ != nullptr && !protobuf.always_false()) {
+Status BloomFilter::Init(const TBloomFilter& thrift) {
+  RETURN_IF_ERROR(Init(thrift.log_bufferpool_space));
+  if (directory_ != nullptr && !thrift.always_false) {
     always_false_ = false;
-    DCHECK_EQ(directory_in_size, directory_size());
-    memcpy(directory_, directory_in, directory_in_size);
+    DCHECK_EQ(thrift.directory.size(), directory_size());
+    memcpy(directory_, &thrift.directory[0], thrift.directory.size());
   }
   return Status::OK();
 }
@@ -76,66 +73,32 @@ void BloomFilter::Close() {
   }
 }
 
-void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
-    kudu::rpc::RpcController* controller, const char* directory,
-    unsigned long directory_size) {
-  DCHECK(rpc_params != nullptr);
-  DCHECK(!rpc_params->always_false());
-  DCHECK(!rpc_params->always_true());
-  kudu::Slice dir_slice(directory, directory_size);
-  unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
-      kudu::rpc::RpcSidecar::FromSlice(dir_slice);
-
-  int sidecar_idx = -1;
-  kudu::Status sidecar_status =
-      controller->AddOutboundSidecar(std::move(rpc_sidecar), &sidecar_idx);
-  if (!sidecar_status.ok()) {
-    LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString();
-    // If AddOutboundSidecar() fails, we 'disable' the BloomFilterPB by setting it to
-    // an always true filter.
-    rpc_params->set_always_false(false);
-    rpc_params->set_always_true(true);
-    return;
-  }
-  rpc_params->set_directory_sidecar_idx(sidecar_idx);
-  rpc_params->set_always_false(false);
-  rpc_params->set_always_true(false);
-}
-
-void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
-    kudu::rpc::RpcController* controller, const string& directory) {
-      AddDirectorySidecar(rpc_params, controller,
-      reinterpret_cast<const char*>(&(directory[0])),
-      static_cast<unsigned long>(directory.size()));
-}
-
-void BloomFilter::ToProtobuf(
-    BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const {
-  protobuf->set_log_bufferpool_space(log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
+void BloomFilter::ToThrift(TBloomFilter* thrift) const {
+  thrift->log_bufferpool_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
   if (always_false_) {
-    protobuf->set_always_false(true);
-    protobuf->set_always_true(false);
+    thrift->always_false = true;
+    thrift->always_true = false;
     return;
   }
-  BloomFilter::AddDirectorySidecar(protobuf, controller,
-      reinterpret_cast<const char*>(directory_),
+  thrift->directory.assign(reinterpret_cast<const char*>(directory_),
       static_cast<unsigned long>(directory_size()));
+  thrift->always_false = false;
+  thrift->always_true = false;
 }
 
-void BloomFilter::ToProtobuf(const BloomFilter* filter,
-    kudu::rpc::RpcController* controller, BloomFilterPB* protobuf) {
-  DCHECK(protobuf != nullptr);
-  // If filter == nullptr, then this BloomFilter is an always true filter.
+void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
+  DCHECK(thrift != nullptr);
   if (filter == nullptr) {
-    protobuf->set_always_true(true);
-    DCHECK(!protobuf->always_false());
+    thrift->always_true = true;
+    DCHECK_EQ(thrift->always_false, false);
     return;
   }
-  filter->ToProtobuf(protobuf, controller);
+  filter->ToThrift(thrift);
 }
 
 // The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
 // compile with -fno-strict-aliasing.
+
 void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
   // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
   // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
@@ -221,17 +184,20 @@ OrEqualArrayAvx(size_t n, const char* __restrict__ in, char* __restrict__ out) {
 }
 } //namespace
 
-void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
-    BloomFilterPB* out, uint8_t* directory_out, size_t directory_size) {
+void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
   DCHECK(out != nullptr);
   DCHECK(&in != out);
   // These cases are impossible in current code. If they become possible in the future,
   // memory usage should be tracked accordingly.
-  DCHECK(!out->always_false());
-  DCHECK(!out->always_true());
-  DCHECK(!in.always_true());
-  if (in.always_false()) return;
-  DCHECK_EQ(in.log_bufferpool_space(), out->log_bufferpool_space());
+  DCHECK(!out->always_false);
+  DCHECK(!out->always_true);
+  DCHECK(!in.always_true);
+  if (in.always_false) return;
+  DCHECK_EQ(in.log_bufferpool_space, out->log_bufferpool_space);
+  DCHECK_EQ(in.directory.size(), out->directory.size())
+      << "Equal log heap space " << in.log_bufferpool_space
+      << ", but different directory sizes: " << in.directory.size() << ", "
+      << out->directory.size();
   // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
   // written in a way that is very friendly to auto-vectorization. Instead, we manually
   // vectorize, increasing the speed by up to 56x.
@@ -239,14 +205,13 @@ void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
   // TODO: Tune gcc flags to auto-vectorize the trivial loop instead of hand-vectorizing
   // it. This might not be possible.
   if (CpuInfo::IsSupported(CpuInfo::AVX)) {
-    OrEqualArrayAvx(directory_size, reinterpret_cast<const char*>(directory_in),
-        reinterpret_cast<char*>(directory_out));
+    OrEqualArrayAvx(in.directory.size(), &in.directory[0], &out->directory[0]);
   } else {
-    const __m128i* simd_in = reinterpret_cast<const __m128i*>(directory_in);
+    const __m128i* simd_in = reinterpret_cast<const __m128i*>(&in.directory[0]);
     const __m128i* const simd_in_end =
-        reinterpret_cast<const __m128i*>(directory_in + directory_size);
-    __m128i* simd_out = reinterpret_cast<__m128i*>(directory_out);
-    // directory_in has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
+        reinterpret_cast<const __m128i*>(&in.directory[0] + in.directory.size());
+    __m128i* simd_out = reinterpret_cast<__m128i*>(&out->directory[0]);
+    // in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
     // == 16, we can do two _mm_or_si128's in each iteration without checking array
     // bounds.
     while (simd_in != simd_in_end) {
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 9c22120..73cb01e 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -27,35 +27,11 @@
 
 #include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/data_stream_service.pb.h"
 #include "gutil/macros.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 
-namespace kudu {
-namespace rpc {
-class RpcController;
-} // namespace rpc
-} // namespace kudu
-
-namespace impala {
-class BloomFilter;
-} // namespace impala
-
-// Need this forward declaration since we make bloom_filter_test_util::BfUnion() a friend
-// function.
-namespace bloom_filter_test_util {
-void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y,
-    int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf,
-    std::string* directory);
-} // namespace bloom_filter_test_util
-
-// Need this forward declaration since we make either::TestData a friend struct.
-namespace either {
-struct TestData;
-} // namespace either
-
 namespace impala {
 
 /// A BloomFilter stores sets of items and offers a query operation indicating whether or
@@ -90,21 +66,17 @@ class BloomFilter {
 
   /// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
   /// calls to Insert() and Find() should only be done between the calls to Init() and
-  /// Close(). Init and Close are safe to call multiple times.
+  /// Close(). Init and Close are safe to call multiple times.
   Status Init(const int log_bufferpool_space);
-  Status Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
-      size_t directory_in_size);
+  Status Init(const TBloomFilter& thrift);
   void Close();
 
   /// Representation of a filter which allows all elements to pass.
   static constexpr BloomFilter* const ALWAYS_TRUE_FILTER = NULL;
 
-  /// Converts 'filter' to its corresponding Protobuf representation.
-  /// If the first argument is NULL, it is interpreted as a complete filter which
-  /// contains all elements.
-  /// Also sets a sidecar on 'controller' containing the Bloom filter's directory.
-  static void ToProtobuf(const BloomFilter* filter, kudu::rpc::RpcController* controller,
-      BloomFilterPB* protobuf);
+  /// Converts 'filter' to its corresponding Thrift representation. If the first argument
+  /// is NULL, it is interpreted as a complete filter which contains all elements.
+  static void ToThrift(const BloomFilter* filter, TBloomFilter* thrift);
 
   bool AlwaysFalse() const { return always_false_; }
 
@@ -119,12 +91,8 @@ class BloomFilter {
   /// high probabilty) if it is not.
   bool Find(const uint32_t hash) const noexcept;
 
-  /// This function computes the logical OR of 'directory_in' with 'directory_out'
-  /// and stores the result in 'directory_out'.
-  /// Additional checks are also performed to make sure the related fields of
-  /// 'in' and 'out' are well-defined.
-  static void Or(const BloomFilterPB& in, const uint8_t* directory_in, BloomFilterPB* out,
-      uint8_t* directory_out, size_t directory_size);
+  /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
+  static void Or(const TBloomFilter& in, TBloomFilter* out);
 
   /// As more distinct items are inserted into a BloomFilter, the false positive rate
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
@@ -151,20 +119,6 @@ class BloomFilter {
     return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
   }
 
-  /// The following two functions set a sidecar on 'controller' containing the Bloom
-  /// filter's directory. Two interfaces are provided because this function may be called
-  /// in different contexts depending on whether or not the caller has access to an
-  /// instantiated BloomFilter. It is also required that 'rpc_params' is neither an
-  /// always false nor an always true Bloom filter when calling this function. Moreover,
-  /// since we directly pass the reference to Bloom filter's directory when instantiating
-  /// the corresponding RpcSidecar, we have to make sure that 'directory' is alive until
-  /// the RPC is done.
-  static void AddDirectorySidecar(BloomFilterPB* rpc_params,
-      kudu::rpc::RpcController* controller, const char* directory,
-      unsigned long directory_size);
-  static void AddDirectorySidecar(BloomFilterPB* rpc_params,
-      kudu::rpc::RpcController* controller, const string& directory);
-
  private:
   // always_false_ is true when the bloom filter hasn't had any elements inserted.
   bool always_false_ = true;
@@ -228,8 +182,8 @@ class BloomFilter {
     return 1uLL << (log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
   }
 
-  /// Serializes this filter as Protobuf.
-  void ToProtobuf(BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const;
+  /// Serializes this filter as Thrift.
+  void ToThrift(TBloomFilter* thrift) const;
 
 /// Some constants used in hashing. #defined for efficiency reasons.
 #define IMPALA_BLOOM_HASH_CONSTANTS                                             \
@@ -242,23 +196,6 @@ class BloomFilter {
       __attribute__((aligned(32))) = {IMPALA_BLOOM_HASH_CONSTANTS};
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilter);
-
-  /// List 'BloomFilterTest_Protobuf_Test' as a friend class to run the backend
-  /// test in 'bloom-filter-test.cc' since it has to access the private field of
-  /// 'directory_' in BloomFilter.
-  friend class BloomFilterTest_Protobuf_Test;
-
-  /// List 'bloom_filter_test_util::BfUnion()' as a friend function to run the backend
-  /// test in 'bloom-filter-test.cc' since it has to access the private field of
-  /// 'directory_' in BloomFilter.
-  friend void bloom_filter_test_util::BfUnion(const impala::BloomFilter& x,
-      const impala::BloomFilter& y, int64_t directory_size, bool* success,
-      impala::BloomFilterPB* protobuf, std::string* directory);
-
-  /// List 'either::Test' as a friend struct to run the benchmark in
-  /// 'bloom-filter-benchmark.cc' since it has to access the private field of
-  /// 'directory_' in BloomFilter.
-  friend struct either::TestData;
 };
 
 // To set 8 bits in an 32-byte Bloom filter, we set one bit in each 32-bit uint32_t. This
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index a5c0667..a75ca1a 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -18,7 +18,6 @@
 #include "testutil/gtest-util.h"
 #include "util/min-max-filter.h"
 
-#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/decimal-value.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/string-value.inline.h"
@@ -51,15 +50,15 @@ TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
   EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMax()), b1);
 
   // Check the behavior of Or.
-  MinMaxFilterPB pFilter1;
-  pFilter1.mutable_min()->set_bool_val(false);
-  pFilter1.mutable_max()->set_bool_val(true);
-  MinMaxFilterPB pFilter2;
-  pFilter2.mutable_min()->set_bool_val(false);
-  pFilter2.mutable_max()->set_bool_val(false);
-  MinMaxFilter::Or(pFilter1, &pFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
-  EXPECT_FALSE(pFilter2.min().bool_val());
-  EXPECT_TRUE(pFilter2.max().bool_val());
+  TMinMaxFilter tFilter1;
+  tFilter1.min.__set_bool_val(false);
+  tFilter1.max.__set_bool_val(true);
+  TMinMaxFilter tFilter2;
+  tFilter2.min.__set_bool_val(false);
+  tFilter2.max.__set_bool_val(false);
+  MinMaxFilter::Or(tFilter1, &tFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
+  EXPECT_FALSE(tFilter2.min.bool_val);
+  EXPECT_TRUE(tFilter2.max.bool_val);
 
   filter->Close();
 }
@@ -85,14 +84,14 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   // Test the behavior of an empty filter.
   EXPECT_TRUE(int_filter->AlwaysFalse());
   EXPECT_FALSE(int_filter->AlwaysTrue());
-  MinMaxFilterPB pFilter;
-  int_filter->ToProtobuf(&pFilter);
-  EXPECT_TRUE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
-  EXPECT_FALSE(pFilter.min().has_int_val());
-  EXPECT_FALSE(pFilter.max().has_int_val());
+  TMinMaxFilter tFilter;
+  int_filter->ToThrift(&tFilter);
+  EXPECT_TRUE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_FALSE(tFilter.min.__isset.int_val);
+  EXPECT_FALSE(tFilter.max.__isset.int_val);
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -110,25 +109,25 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   int_filter->Insert(&i4);
   CheckIntVals(int_filter, i4, i2);
 
-  int_filter->ToProtobuf(&pFilter);
-  EXPECT_FALSE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
-  EXPECT_EQ(pFilter.min().int_val(), i4);
-  EXPECT_EQ(pFilter.max().int_val(), i2);
+  int_filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(tFilter.min.int_val, i4);
+  EXPECT_EQ(tFilter.max.int_val, i2);
   MinMaxFilter* int_filter2 =
-      MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
   CheckIntVals(int_filter2, i4, i2);
 
   // Check the behavior of Or.
-  MinMaxFilterPB pFilter1;
-  pFilter1.mutable_min()->set_int_val(4);
-  pFilter1.mutable_max()->set_int_val(8);
-  MinMaxFilterPB pFilter2;
-  pFilter2.mutable_min()->set_int_val(2);
-  pFilter2.mutable_max()->set_int_val(7);
-  MinMaxFilter::Or(pFilter1, &pFilter2, int_type);
-  EXPECT_EQ(pFilter2.min().int_val(), 2);
-  EXPECT_EQ(pFilter2.max().int_val(), 8);
+  TMinMaxFilter tFilter1;
+  tFilter1.min.__set_int_val(4);
+  tFilter1.max.__set_int_val(8);
+  TMinMaxFilter tFilter2;
+  tFilter2.min.__set_int_val(2);
+  tFilter2.max.__set_int_val(7);
+  MinMaxFilter::Or(tFilter1, &tFilter2, int_type);
+  EXPECT_EQ(tFilter2.min.int_val, 2);
+  EXPECT_EQ(tFilter2.max.int_val, 8);
 
   int_filter->Close();
   empty_filter->Close();
@@ -163,13 +162,13 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   filter->MaterializeValues();
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  MinMaxFilterPB pFilter;
-  filter->ToProtobuf(&pFilter);
-  EXPECT_TRUE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
+  TMinMaxFilter tFilter;
+  filter->ToThrift(&tFilter);
+  EXPECT_TRUE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
 
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -192,11 +191,11 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   filter->MaterializeValues();
   CheckStringVals(filter, c, d);
 
-  filter->ToProtobuf(&pFilter);
-  EXPECT_FALSE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
-  EXPECT_EQ(pFilter.min().string_val(), c);
-  EXPECT_EQ(pFilter.max().string_val(), d);
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(tFilter.min.string_val, c);
+  EXPECT_EQ(tFilter.max.string_val, d);
 
   // Test that strings longer than 1024 are truncated.
   string b1030(1030, 'b');
@@ -228,14 +227,14 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   for (int i = trailIndex; i < 1024; ++i) truncTrailMaxChar[i] = 0;
   CheckStringVals(filter, b1024, truncTrailMaxChar);
 
-  filter->ToProtobuf(&pFilter);
-  EXPECT_FALSE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
-  EXPECT_EQ(pFilter.min().string_val(), b1024);
-  EXPECT_EQ(pFilter.max().string_val(), truncTrailMaxChar);
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(tFilter.min.string_val, b1024);
+  EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
 
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
   CheckStringVals(filter2, b1024, truncTrailMaxChar);
 
   // Check that if the entire string is the max char and therefore after truncating for
@@ -250,12 +249,12 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   filter->Insert(&cVal);
   EXPECT_TRUE(filter->AlwaysTrue());
 
-  filter->ToProtobuf(&pFilter);
-  EXPECT_FALSE(pFilter.always_false());
-  EXPECT_TRUE(pFilter.always_true());
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_TRUE(tFilter.always_true);
 
   MinMaxFilter* always_true_filter =
-      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_FALSE(always_true_filter->AlwaysFalse());
   EXPECT_TRUE(always_true_filter->AlwaysTrue());
 
@@ -277,20 +276,20 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   limit_filter->MaterializeValues();
   EXPECT_TRUE(limit_filter->AlwaysTrue());
 
-  limit_filter->ToProtobuf(&pFilter);
-  EXPECT_FALSE(pFilter.always_false());
-  EXPECT_TRUE(pFilter.always_true());
+  limit_filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_TRUE(tFilter.always_true);
 
   // Check the behavior of Or.
-  MinMaxFilterPB pFilter1;
-  pFilter1.mutable_min()->set_string_val("a");
-  pFilter1.mutable_max()->set_string_val("d");
-  MinMaxFilterPB pFilter2;
-  pFilter2.mutable_min()->set_string_val("b");
-  pFilter2.mutable_max()->set_string_val("e");
-  MinMaxFilter::Or(pFilter1, &pFilter2, string_type);
-  EXPECT_EQ(pFilter2.min().string_val(), "a");
-  EXPECT_EQ(pFilter2.max().string_val(), "e");
+  TMinMaxFilter tFilter1;
+  tFilter1.min.__set_string_val("a");
+  tFilter1.max.__set_string_val("d");
+  TMinMaxFilter tFilter2;
+  tFilter2.min.__set_string_val("b");
+  tFilter2.max.__set_string_val("e");
+  MinMaxFilter::Or(tFilter1, &tFilter2, string_type);
+  EXPECT_EQ(tFilter2.min.string_val, "a");
+  EXPECT_EQ(tFilter2.max.string_val, "e");
 
   filter->Close();
   empty_filter->Close();
@@ -318,14 +317,14 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   // Test the behavior of an empty filter.
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  MinMaxFilterPB pFilter;
-  filter->ToProtobuf(&pFilter);
-  EXPECT_TRUE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
-  EXPECT_FALSE(pFilter.min().has_timestamp_val());
-  EXPECT_FALSE(pFilter.max().has_timestamp_val());
+  TMinMaxFilter tFilter;
+  filter->ToThrift(&tFilter);
+  EXPECT_TRUE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
+  EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -343,25 +342,25 @@ TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
   filter->Insert(&t4);
   CheckTimestampVals(filter, t2, t3);
 
-  filter->ToProtobuf(&pFilter);
-  EXPECT_FALSE(pFilter.always_false());
-  EXPECT_FALSE(pFilter.always_true());
-  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.min()), t2);
-  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.max()), t3);
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
   CheckTimestampVals(filter2, t2, t3);
 
   // Check the behavior of Or.
-  MinMaxFilterPB pFilter1;
-  t2.ToColumnValuePB(pFilter1.mutable_min());
-  t4.ToColumnValuePB(pFilter1.mutable_max());
-  MinMaxFilterPB pFilter2;
-  t1.ToColumnValuePB(pFilter2.mutable_min());
-  t3.ToColumnValuePB(pFilter2.mutable_max());
-  MinMaxFilter::Or(pFilter1, &pFilter2, timestamp_type);
-  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.min()), t2);
-  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.max()), t3);
+  TMinMaxFilter tFilter1;
+  t2.ToTColumnValue(&tFilter1.min);
+  t4.ToTColumnValue(&tFilter1.max);
+  TMinMaxFilter tFilter2;
+  t1.ToTColumnValue(&tFilter2.min);
+  t3.ToTColumnValue(&tFilter2.max);
+  MinMaxFilter::Or(tFilter1, &tFilter2, timestamp_type);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
 
   filter->Close();
   empty_filter->Close();
@@ -392,16 +391,16 @@ void CheckDecimalVals(
 }
 
 void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type,
-    MinMaxFilterPB* pFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
+    TMinMaxFilter* tFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  filter->ToProtobuf(pFilter);
-  EXPECT_TRUE(pFilter->always_false());
-  EXPECT_FALSE(pFilter->always_true());
-  EXPECT_FALSE(pFilter->min().has_decimal_val());
-  EXPECT_FALSE(pFilter->max().has_decimal_val());
+  filter->ToThrift(tFilter);
+  EXPECT_TRUE(tFilter->always_false);
+  EXPECT_FALSE(tFilter->always_true);
+  EXPECT_FALSE(tFilter->min.__isset.decimal_val);
+  EXPECT_FALSE(tFilter->max.__isset.decimal_val);
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(*pFilter, column_type, obj_pool, mem_tracker);
+      MinMaxFilter::Create(*tFilter, column_type, obj_pool, mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
   empty_filter->Close();
@@ -428,30 +427,30 @@ void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type
     CheckDecimalVals(filter##SIZE, d3##SIZE, d2##SIZE);                              \
   } while (false)
 
-#define DECIMAL_CHECK_PROTOBUF(SIZE)                                                   \
-  do {                                                                                 \
-    filter##SIZE->ToProtobuf(&pFilter##SIZE);                                          \
-    EXPECT_FALSE(pFilter##SIZE.always_false());                                        \
-    EXPECT_FALSE(pFilter##SIZE.always_true());                                         \
-    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.min()), d3##SIZE); \
-    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.max()), d2##SIZE); \
-    MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create(                              \
-        pFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker);                 \
-    CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE);                             \
-    filter##SIZE##2->Close();                                                          \
+#define DECIMAL_CHECK_THRIFT(SIZE)                                                  \
+  do {                                                                              \
+    filter##SIZE->ToThrift(&tFilter##SIZE);                                         \
+    EXPECT_FALSE(tFilter##SIZE.always_false);                                       \
+    EXPECT_FALSE(tFilter##SIZE.always_true);                                        \
+    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.min), d3##SIZE); \
+    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.max), d2##SIZE); \
+    MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create(                           \
+        tFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker);              \
+    CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE);                          \
+    filter##SIZE##2->Close();                                                       \
   } while (false)
 
-#define DECIMAL_CHECK_OR(SIZE)                                                          \
-  do {                                                                                  \
-    MinMaxFilterPB pFilter1##SIZE;                                                      \
-    d3##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_min());                             \
-    d2##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_max());                             \
-    MinMaxFilterPB pFilter2##SIZE;                                                      \
-    d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_min());                             \
-    d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_max());                             \
-    MinMaxFilter::Or(pFilter1##SIZE, &pFilter2##SIZE, decimal##SIZE##_type);            \
-    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.min()), d3##SIZE); \
-    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.max()), d2##SIZE); \
+#define DECIMAL_CHECK_OR(SIZE)                                                       \
+  do {                                                                               \
+    TMinMaxFilter tFilter1##SIZE;                                                    \
+    d3##SIZE.ToTColumnValue(&tFilter1##SIZE.min);                                    \
+    d2##SIZE.ToTColumnValue(&tFilter1##SIZE.max);                                    \
+    TMinMaxFilter tFilter2##SIZE;                                                    \
+    d1##SIZE.ToTColumnValue(&tFilter2##SIZE.min);                                    \
+    d1##SIZE.ToTColumnValue(&tFilter2##SIZE.max);                                    \
+    MinMaxFilter::Or(tFilter1##SIZE, &tFilter2##SIZE, decimal##SIZE##_type);         \
+    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.min), d3##SIZE); \
+    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.max), d2##SIZE); \
   } while (false)
 
 // Tests that a DecimalMinMaxFilter returns the expected min/max after having values
@@ -476,13 +475,13 @@ TEST(MinMaxFilterTest, TestDecimalMinMaxFilter) {
   MinMaxFilter* filter8 = MinMaxFilter::Create(decimal8_type, &obj_pool, &mem_tracker);
   MinMaxFilter* filter16 = MinMaxFilter::Create(decimal16_type, &obj_pool, &mem_tracker);
 
-  // Create protobuf minmax filters
-  MinMaxFilterPB pFilter4, pFilter8, pFilter16;
+  // Thrift min-max filters that the decimal filters below are serialized into.
+  TMinMaxFilter tFilter4, tFilter8, tFilter16;
 
   // Test the behavior of an empty filter.
-  CheckDecimalEmptyFilter(filter4, decimal4_type, &pFilter4, &obj_pool, &mem_tracker);
-  CheckDecimalEmptyFilter(filter8, decimal8_type, &pFilter8, &obj_pool, &mem_tracker);
-  CheckDecimalEmptyFilter(filter16, decimal16_type, &pFilter16, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter4, decimal4_type, &tFilter4, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter8, decimal8_type, &tFilter8, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter16, decimal16_type, &tFilter16, &obj_pool, &mem_tracker);
 
   // Insert and check
   DECIMAL_INSERT_AND_CHECK(4, 9, 5, 2345.67891, 3456.78912, 1234.56789);
@@ -491,10 +490,10 @@ TEST(MinMaxFilterTest, TestDecimalMinMaxFilter) {
   DECIMAL_INSERT_AND_CHECK(16, 38, 19, 2345678912345678912.2345678912345678912,
       3456789123456789123.3456789123456789123, 1234567891234567891.1234567891234567891);
 
-  // Protobuf check
-  DECIMAL_CHECK_PROTOBUF(4);
-  DECIMAL_CHECK_PROTOBUF(8);
-  DECIMAL_CHECK_PROTOBUF(16);
+  // Round-trip each filter through Thrift and check the deserialized min/max.
+  DECIMAL_CHECK_THRIFT(4);
+  DECIMAL_CHECK_THRIFT(8);
+  DECIMAL_CHECK_THRIFT(16);
 
   // Check the behavior of Or.
   DECIMAL_CHECK_OR(4);
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index dd9b351..c6da060 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -82,55 +82,61 @@ IRFunction::Type MinMaxFilter::GetInsertIRFunctionType(ColumnType column_type) {
   }
 }
 
-#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, PROTOBUF_TYPE, PRIMITIVE_TYPE)        \
-  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                                    \
-      "class.impala::" #NAME "MinMaxFilter";                                           \
-  NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) {             \
-    DCHECK(!protobuf.always_true());                                                   \
-    if (protobuf.always_false()) {                                                     \
-      min_ = numeric_limits<TYPE>::max();                                              \
-      max_ = numeric_limits<TYPE>::lowest();                                           \
-    } else {                                                                           \
-      DCHECK(protobuf.has_min());                                                      \
-      DCHECK(protobuf.has_max());                                                      \
-      DCHECK(protobuf.min().has_##PROTOBUF_TYPE##_val());                              \
-      DCHECK(protobuf.max().has_##PROTOBUF_TYPE##_val());                              \
-      min_ = protobuf.min().PROTOBUF_TYPE##_val();                                     \
-      max_ = protobuf.max().PROTOBUF_TYPE##_val();                                     \
-    }                                                                                  \
-  }                                                                                    \
-  PrimitiveType NAME##MinMaxFilter::type() {                                           \
-    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                                       \
-  }                                                                                    \
-  void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {                \
-    if (!AlwaysFalse()) {                                                              \
-      protobuf->mutable_min()->set_##PROTOBUF_TYPE##_val(min_);                        \
-      protobuf->mutable_max()->set_##PROTOBUF_TYPE##_val(max_);                        \
-    }                                                                                  \
-    protobuf->set_always_false(AlwaysFalse());                                         \
-    protobuf->set_always_true(false);                                                  \
-  }                                                                                    \
-  string NAME##MinMaxFilter::DebugString() const {                                     \
-    stringstream out;                                                                  \
-    out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_                    \
-        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";             \
-    return out.str();                                                                  \
-  }                                                                                    \
-  void NAME##MinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {         \
-    if (out->always_false()) {                                                         \
-      out->mutable_min()->set_bool_val(in.min().PROTOBUF_TYPE##_val());                \
-      out->mutable_max()->set_bool_val(in.max().PROTOBUF_TYPE##_val());                \
-      out->set_always_false(false);                                                    \
-    } else {                                                                           \
-      out->mutable_min()->set_##PROTOBUF_TYPE##_val(                                   \
-          std::min(in.min().PROTOBUF_TYPE##_val(), out->min().PROTOBUF_TYPE##_val())); \
-      out->mutable_max()->set_##PROTOBUF_TYPE##_val(                                   \
-          std::max(in.max().PROTOBUF_TYPE##_val(), out->max().PROTOBUF_TYPE##_val())); \
-    }                                                                                  \
-  }                                                                                    \
-  void NAME##MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {       \
-    out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());     \
-    out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());     \
+#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, THRIFT_TYPE, PRIMITIVE_TYPE)  \
+  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                            \
+      "class.impala::" #NAME "MinMaxFilter";                                   \
+  NAME##MinMaxFilter::NAME##MinMaxFilter(const TMinMaxFilter& thrift) {        \
+    DCHECK(!thrift.always_true);                                               \
+    if (thrift.always_false) {                                                 \
+      min_ = numeric_limits<TYPE>::max();                                      \
+      max_ = numeric_limits<TYPE>::lowest();                                   \
+    } else {                                                                   \
+      DCHECK(thrift.__isset.min);                                              \
+      DCHECK(thrift.__isset.max);                                              \
+      DCHECK(thrift.min.__isset.THRIFT_TYPE##_val);                            \
+      DCHECK(thrift.max.__isset.THRIFT_TYPE##_val);                            \
+      min_ = thrift.min.THRIFT_TYPE##_val;                                     \
+      max_ = thrift.max.THRIFT_TYPE##_val;                                     \
+    }                                                                          \
+  }                                                                            \
+  PrimitiveType NAME##MinMaxFilter::type() {                                   \
+    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                               \
+  }                                                                            \
+  void NAME##MinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {             \
+    if (!AlwaysFalse()) {                                                      \
+      thrift->min.__set_##THRIFT_TYPE##_val(min_);                             \
+      thrift->__isset.min = true;                                              \
+      thrift->max.__set_##THRIFT_TYPE##_val(max_);                             \
+      thrift->__isset.max = true;                                              \
+    }                                                                          \
+    thrift->__set_always_false(AlwaysFalse());                                 \
+    thrift->__set_always_true(false);                                          \
+  }                                                                            \
+  string NAME##MinMaxFilter::DebugString() const {                             \
+    stringstream out;                                                          \
+    out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_            \
+        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";     \
+    return out.str();                                                          \
+  }                                                                            \
+  void NAME##MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {   \
+    if (out->always_false) {                                                   \
+      out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);            \
+      out->__isset.min = true;                                                 \
+      out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);            \
+      out->__isset.max = true;                                                 \
+      out->__set_always_false(false);                                          \
+    } else {                                                                   \
+      out->min.__set_##THRIFT_TYPE##_val(                                      \
+          std::min(in.min.THRIFT_TYPE##_val, out->min.THRIFT_TYPE##_val));     \
+      out->max.__set_##THRIFT_TYPE##_val(                                      \
+          std::max(in.max.THRIFT_TYPE##_val, out->max.THRIFT_TYPE##_val));     \
+    }                                                                          \
+  }                                                                            \
+  void NAME##MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) { \
+    out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);              \
+    out->__isset.min = true;                                                   \
+    out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);              \
+    out->__isset.max = true;                                                   \
   }
 
 NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
@@ -216,17 +222,19 @@ const char* StringMinMaxFilter::LLVM_CLASS_NAME = "class.impala::StringMinMaxFil
 const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
 
 StringMinMaxFilter::StringMinMaxFilter(
-    const MinMaxFilterPB& protobuf, MemTracker* mem_tracker)
-  : mem_pool_(mem_tracker), min_buffer_(&mem_pool_), max_buffer_(&mem_pool_) {
-  always_false_ = protobuf.always_false();
-  always_true_ = protobuf.always_true();
+    const TMinMaxFilter& thrift, MemTracker* mem_tracker)
+  : mem_pool_(mem_tracker),
+    min_buffer_(&mem_pool_),
+    max_buffer_(&mem_pool_) {
+  always_false_ = thrift.always_false;
+  always_true_ = thrift.always_true;
   if (!always_true_ && !always_false_) {
-    DCHECK(protobuf.has_min());
-    DCHECK(protobuf.has_max());
-    DCHECK(protobuf.min().has_string_val());
-    DCHECK(protobuf.max().has_string_val());
-    min_ = StringValue(protobuf.min().string_val());
-    max_ = StringValue(protobuf.max().string_val());
+    DCHECK(thrift.__isset.min);
+    DCHECK(thrift.__isset.max);
+    DCHECK(thrift.min.__isset.string_val);
+    DCHECK(thrift.max.__isset.string_val);
+    min_ = StringValue(thrift.min.string_val);
+    max_ = StringValue(thrift.max.string_val);
     CopyToBuffer(&min_buffer_, &min_, min_.len);
     CopyToBuffer(&max_buffer_, &max_, max_.len);
   }
@@ -269,13 +277,17 @@ void StringMinMaxFilter::MaterializeValues() {
   }
 }
 
-void StringMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
+void StringMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
   if (!always_true_ && !always_false_) {
-    protobuf->mutable_min()->set_string_val(static_cast<char*>(min_.ptr), min_.len);
-    protobuf->mutable_max()->set_string_val(static_cast<char*>(max_.ptr), max_.len);
+    thrift->min.string_val.assign(static_cast<char*>(min_.ptr), min_.len);
+    thrift->min.__isset.string_val = true;
+    thrift->__isset.min = true;
+    thrift->max.string_val.assign(static_cast<char*>(max_.ptr), max_.len);
+    thrift->max.__isset.string_val = true;
+    thrift->__isset.max = true;
   }
-  protobuf->set_always_false(always_false_);
-  protobuf->set_always_true(always_true_);
+  thrift->__set_always_false(always_false_);
+  thrift->__set_always_true(always_true_);
 }
 
 string StringMinMaxFilter::DebugString() const {
@@ -286,26 +298,28 @@ string StringMinMaxFilter::DebugString() const {
   return out.str();
 }
 
-void StringMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
-  if (out->always_false()) {
-    out->mutable_min()->set_string_val(in.min().string_val());
-    out->mutable_max()->set_string_val(in.max().string_val());
-    out->set_always_false(false);
+void StringMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  if (out->always_false) {
+    out->min.__set_string_val(in.min.string_val);
+    out->__isset.min = true;
+    out->max.__set_string_val(in.max.string_val);
+    out->__isset.max = true;
+    out->__set_always_false(false);
   } else {
-    StringValue in_min_val = StringValue(in.min().string_val());
-    StringValue out_min_val = StringValue(out->min().string_val());
-    if (in_min_val < out_min_val)
-      out->mutable_min()->set_string_val(in.min().string_val());
-    StringValue in_max_val = StringValue(in.max().string_val());
-    StringValue out_max_val = StringValue(out->max().string_val());
-    if (in_max_val > out_max_val)
-      out->mutable_max()->set_string_val(in.max().string_val());
+    StringValue in_min_val = StringValue(in.min.string_val);
+    StringValue out_min_val = StringValue(out->min.string_val);
+    if (in_min_val < out_min_val) out->min.__set_string_val(in.min.string_val);
+    StringValue in_max_val = StringValue(in.max.string_val);
+    StringValue out_max_val = StringValue(out->max.string_val);
+    if (in_max_val > out_max_val) out->max.__set_string_val(in.max.string_val);
   }
 }
 
-void StringMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
-  out->mutable_min()->set_string_val(in.min().string_val());
-  out->mutable_max()->set_string_val(in.max().string_val());
+void StringMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->min.__set_string_val(in.min.string_val);
+  out->__isset.min = true;
+  out->max.__set_string_val(in.max.string_val);
+  out->__isset.max = true;
 }
 
 void StringMinMaxFilter::CopyToBuffer(
@@ -335,13 +349,13 @@ void StringMinMaxFilter::SetAlwaysTrue() {
 const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
     "class.impala::TimestampMinMaxFilter";
 
-TimestampMinMaxFilter::TimestampMinMaxFilter(const MinMaxFilterPB& protobuf) {
-  always_false_ = protobuf.always_false();
+TimestampMinMaxFilter::TimestampMinMaxFilter(const TMinMaxFilter& thrift) {
+  always_false_ = thrift.always_false;
   if (!always_false_) {
-    DCHECK(protobuf.min().has_timestamp_val());
-    DCHECK(protobuf.max().has_timestamp_val());
-    min_ = TimestampValue::FromColumnValuePB(protobuf.min());
-    max_ = TimestampValue::FromColumnValuePB(protobuf.max());
+    DCHECK(thrift.min.__isset.timestamp_val);
+    DCHECK(thrift.max.__isset.timestamp_val);
+    min_ = TimestampValue::FromTColumnValue(thrift.min);
+    max_ = TimestampValue::FromTColumnValue(thrift.max);
   }
 }
 
@@ -349,13 +363,15 @@ PrimitiveType TimestampMinMaxFilter::type() {
   return PrimitiveType::TYPE_TIMESTAMP;
 }
 
-void TimestampMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
+void TimestampMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
   if (!always_false_) {
-    min_.ToColumnValuePB(protobuf->mutable_min());
-    max_.ToColumnValuePB(protobuf->mutable_max());
+    min_.ToTColumnValue(&thrift->min);
+    thrift->__isset.min = true;
+    max_.ToTColumnValue(&thrift->max);
+    thrift->__isset.max = true;
   }
-  protobuf->set_always_false(always_false_);
-  protobuf->set_always_true(false);
+  thrift->__set_always_false(always_false_);
+  thrift->__set_always_true(false);
 }
 
 string TimestampMinMaxFilter::DebugString() const {
@@ -365,46 +381,45 @@ string TimestampMinMaxFilter::DebugString() const {
   return out.str();
 }
 
-void TimestampMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
-  if (out->always_false()) {
-    out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
-    out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
-    out->set_always_false(false);
+void TimestampMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  if (out->always_false) {
+    out->min.__set_timestamp_val(in.min.timestamp_val);
+    out->__isset.min = true;
+    out->max.__set_timestamp_val(in.max.timestamp_val);
+    out->__isset.max = true;
+    out->__set_always_false(false);
   } else {
-    TimestampValue in_min_val = TimestampValue::FromColumnValuePB(in.min());
-    TimestampValue out_min_val = TimestampValue::FromColumnValuePB(out->min());
-    if (in_min_val < out_min_val) {
-      out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
-    }
-    TimestampValue in_max_val = TimestampValue::FromColumnValuePB(in.max());
-    TimestampValue out_max_val = TimestampValue::FromColumnValuePB(out->max());
-    if (in_max_val > out_max_val) {
-      out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
-    }
+    TimestampValue in_min_val = TimestampValue::FromTColumnValue(in.min);
+    TimestampValue out_min_val = TimestampValue::FromTColumnValue(out->min);
+    if (in_min_val < out_min_val) out->min.__set_timestamp_val(in.min.timestamp_val);
+    TimestampValue in_max_val = TimestampValue::FromTColumnValue(in.max);
+    TimestampValue out_max_val = TimestampValue::FromTColumnValue(out->max);
+    if (in_max_val > out_max_val) out->max.__set_timestamp_val(in.max.timestamp_val);
   }
 }
 
-void TimestampMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
-  out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
-  out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+void TimestampMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->min.__set_timestamp_val(in.min.timestamp_val);
+  out->__isset.min = true;
+  out->max.__set_timestamp_val(in.max.timestamp_val);
+  out->__isset.max = true;
 }
 
 // DECIMAL
 const char* DecimalMinMaxFilter::LLVM_CLASS_NAME = "class.impala::DecimalMinMaxFilter";
-#define DECIMAL_SET_MINMAX(SIZE)                                            \
-  do {                                                                      \
-    DCHECK(protobuf.min().has_decimal_val());                               \
-    DCHECK(protobuf.max().has_decimal_val());                               \
-    min##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.min()); \
-    max##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.max()); \
+#define DECIMAL_SET_MINMAX(SIZE)                                       \
+  do {                                                                 \
+    DCHECK(thrift.min.__isset.decimal_val);                            \
+    DCHECK(thrift.max.__isset.decimal_val);                            \
+    min##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.min); \
+    max##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.max); \
   } while (false)
 
 // Construct the Decimal min-max filter when the min-max filter information
 // comes in through thrift.  This can get called in coordinator, after the filter
 // is sent by executor
-DecimalMinMaxFilter::DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision)
-  : size_(ColumnType::GetDecimalByteSize(precision)),
-    always_false_(protobuf.always_false()) {
+DecimalMinMaxFilter::DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision)
+  : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(thrift.always_false) {
   if (!always_false_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
@@ -426,32 +441,34 @@ PrimitiveType DecimalMinMaxFilter::type() {
   return PrimitiveType::TYPE_DECIMAL;
 }
 
-#define DECIMAL_TO_PROTOBUF(SIZE)                          \
-  do {                                                     \
-    min##SIZE##_.ToColumnValuePB(protobuf->mutable_min()); \
-    max##SIZE##_.ToColumnValuePB(protobuf->mutable_max()); \
+#define DECIMAL_TO_THRIFT(SIZE)                \
+  do {                                         \
+    min##SIZE##_.ToTColumnValue(&thrift->min); \
+    max##SIZE##_.ToTColumnValue(&thrift->max); \
   } while (false)
 
 // Construct a thrift min-max filter.  Will be called by the executor
 // to be sent to the coordinator
-void DecimalMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
+void DecimalMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
   if (!always_false_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
-        DECIMAL_TO_PROTOBUF(4);
+        DECIMAL_TO_THRIFT(4);
         break;
       case DECIMAL_SIZE_8BYTE:
-        DECIMAL_TO_PROTOBUF(8);
+        DECIMAL_TO_THRIFT(8);
         break;
       case DECIMAL_SIZE_16BYTE:
-        DECIMAL_TO_PROTOBUF(16);
+        DECIMAL_TO_THRIFT(16);
         break;
       default:
         DCHECK(false) << "DecimalMinMaxFilter: Unknown decimal byte size: " << size_;
     }
+    thrift->__isset.min = true;
+    thrift->__isset.max = true;
   }
-  protobuf->set_always_false(always_false_);
-  protobuf->set_always_true(false);
+  thrift->__set_always_false(always_false_);
+  thrift->__set_always_true(false);
 }
 
 void DecimalMinMaxFilter::Insert(void* val) {
@@ -497,24 +514,25 @@ string DecimalMinMaxFilter::DebugString() const {
   return out.str();
 }
 
-#define DECIMAL_OR(SIZE)                                           \
-  do {                                                             \
-    if (Decimal##SIZE##Value::FromColumnValuePB(in.min())          \
-        < Decimal##SIZE##Value::FromColumnValuePB(out->min()))     \
-      out->mutable_min()->set_decimal_val(in.min().decimal_val()); \
-    if (Decimal##SIZE##Value::FromColumnValuePB(in.max())          \
-        > Decimal##SIZE##Value::FromColumnValuePB(out->max()))     \
-      out->mutable_max()->set_decimal_val(in.max().decimal_val()); \
+#define DECIMAL_OR(SIZE)                                    \
+  do {                                                      \
+    if (Decimal##SIZE##Value::FromTColumnValue(in.min)      \
+        < Decimal##SIZE##Value::FromTColumnValue(out->min)) \
+      out->min.__set_decimal_val(in.min.decimal_val);       \
+    if (Decimal##SIZE##Value::FromTColumnValue(in.max)      \
+        > Decimal##SIZE##Value::FromTColumnValue(out->max)) \
+      out->max.__set_decimal_val(in.max.decimal_val);       \
   } while (false)
 
-void DecimalMinMaxFilter::Or(
-    const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision) {
-  if (in.always_false()) {
+void DecimalMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision) {
+  if (in.always_false) {
     return;
-  } else if (out->always_false()) {
-    out->mutable_min()->set_decimal_val(in.min().decimal_val());
-    out->mutable_max()->set_decimal_val(in.max().decimal_val());
-    out->set_always_false(false);
+  } else if (out->always_false) {
+    out->min.__set_decimal_val(in.min.decimal_val);
+    out->__isset.min = true;
+    out->max.__set_decimal_val(in.max.decimal_val);
+    out->__isset.max = true;
+    out->__set_always_false(false);
   } else {
     int size = ColumnType::GetDecimalByteSize(precision);
     switch (size) {
@@ -533,9 +551,11 @@ void DecimalMinMaxFilter::Or(
   }
 }
 
-void DecimalMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
-  out->mutable_min()->set_decimal_val(in.min().decimal_val());
-  out->mutable_max()->set_decimal_val(in.max().decimal_val());
+void DecimalMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->min.__set_decimal_val(in.min.decimal_val);
+  out->__isset.min = true;
+  out->max.__set_decimal_val(in.max.decimal_val);
+  out->__isset.max = true;
 }
 
 // MinMaxFilter
@@ -575,29 +595,29 @@ MinMaxFilter* MinMaxFilter::Create(
   return nullptr;
 }
 
-MinMaxFilter* MinMaxFilter::Create(const MinMaxFilterPB& protobuf, ColumnType type,
+MinMaxFilter* MinMaxFilter::Create(const TMinMaxFilter& thrift, ColumnType type,
     ObjectPool* pool, MemTracker* mem_tracker) {
   switch (type.type) {
     case PrimitiveType::TYPE_BOOLEAN:
-      return pool->Add(new BoolMinMaxFilter(protobuf));
+      return pool->Add(new BoolMinMaxFilter(thrift));
     case PrimitiveType::TYPE_TINYINT:
-      return pool->Add(new TinyIntMinMaxFilter(protobuf));
+      return pool->Add(new TinyIntMinMaxFilter(thrift));
     case PrimitiveType::TYPE_SMALLINT:
-      return pool->Add(new SmallIntMinMaxFilter(protobuf));
+      return pool->Add(new SmallIntMinMaxFilter(thrift));
     case PrimitiveType::TYPE_INT:
-      return pool->Add(new IntMinMaxFilter(protobuf));
+      return pool->Add(new IntMinMaxFilter(thrift));
     case PrimitiveType::TYPE_BIGINT:
-      return pool->Add(new BigIntMinMaxFilter(protobuf));
+      return pool->Add(new BigIntMinMaxFilter(thrift));
     case PrimitiveType::TYPE_FLOAT:
-      return pool->Add(new FloatMinMaxFilter(protobuf));
+      return pool->Add(new FloatMinMaxFilter(thrift));
     case PrimitiveType::TYPE_DOUBLE:
-      return pool->Add(new DoubleMinMaxFilter(protobuf));
+      return pool->Add(new DoubleMinMaxFilter(thrift));
     case PrimitiveType::TYPE_STRING:
-      return pool->Add(new StringMinMaxFilter(protobuf, mem_tracker));
+      return pool->Add(new StringMinMaxFilter(thrift, mem_tracker));
     case PrimitiveType::TYPE_TIMESTAMP:
-      return pool->Add(new TimestampMinMaxFilter(protobuf));
+      return pool->Add(new TimestampMinMaxFilter(thrift));
     case PrimitiveType::TYPE_DECIMAL:
-      return pool->Add(new DecimalMinMaxFilter(protobuf, type.precision));
+      return pool->Add(new DecimalMinMaxFilter(thrift, type.precision));
     default:
       DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
   }
@@ -605,93 +625,93 @@ MinMaxFilter* MinMaxFilter::Create(const MinMaxFilterPB& protobuf, ColumnType ty
 }
 
 void MinMaxFilter::Or(
-    const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType) {
-  if (in.always_false() || out->always_true()) return;
-  if (in.always_true()) {
-    out->set_always_true(true);
+    const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType) {
+  if (in.always_false || out->always_true) return;
+  if (in.always_true) {
+    out->__set_always_true(true);
     return;
   }
-  if (in.min().has_bool_val()) {
-    DCHECK(out->min().has_bool_val());
+  if (in.min.__isset.bool_val) {
+    DCHECK(out->min.__isset.bool_val);
     BoolMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_byte_val()) {
-    DCHECK(out->min().has_byte_val());
+  } else if (in.min.__isset.byte_val) {
+    DCHECK(out->min.__isset.byte_val);
     TinyIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_short_val()) {
-    DCHECK(out->min().has_short_val());
+  } else if (in.min.__isset.short_val) {
+    DCHECK(out->min.__isset.short_val);
     SmallIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_int_val()) {
-    DCHECK(out->min().has_int_val());
+  } else if (in.min.__isset.int_val) {
+    DCHECK(out->min.__isset.int_val);
     IntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_long_val()) {
-    DCHECK(out->min().has_long_val());
+  } else if (in.min.__isset.long_val) {
+    DCHECK(out->min.__isset.long_val);
     BigIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_double_val()) {
+  } else if (in.min.__isset.double_val) {
     // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
-    DCHECK(out->min().has_double_val());
+    DCHECK(out->min.__isset.double_val);
     DoubleMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_string_val()) {
-    DCHECK(out->min().has_string_val());
+  } else if (in.min.__isset.string_val) {
+    DCHECK(out->min.__isset.string_val);
     StringMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_timestamp_val()) {
-    DCHECK(out->min().has_timestamp_val());
+  } else if (in.min.__isset.timestamp_val) {
+    DCHECK(out->min.__isset.timestamp_val);
     TimestampMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min().has_decimal_val()) {
-    DCHECK(out->min().has_decimal_val());
+  } else if (in.min.__isset.decimal_val) {
+    DCHECK(out->min.__isset.decimal_val);
     DecimalMinMaxFilter::Or(in, out, columnType.precision);
     return;
   }
   DCHECK(false) << "Unsupported MinMaxFilter type.";
 }
 
-void MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
-  out->set_always_false(in.always_false());
-  out->set_always_true(in.always_true());
-  if (in.always_false() || in.always_true()) return;
-  if (in.min().has_bool_val()) {
-    DCHECK(!out->min().has_bool_val());
+void MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->__set_always_false(in.always_false);
+  out->__set_always_true(in.always_true);
+  if (in.always_false || in.always_true) return;
+  if (in.min.__isset.bool_val) {
+    DCHECK(!out->min.__isset.bool_val);
     BoolMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_byte_val()) {
-    DCHECK(!out->min().has_byte_val());
+  } else if (in.min.__isset.byte_val) {
+    DCHECK(!out->min.__isset.byte_val);
     TinyIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_short_val()) {
-    DCHECK(!out->min().has_short_val());
+  } else if (in.min.__isset.short_val) {
+    DCHECK(!out->min.__isset.short_val);
     SmallIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_int_val()) {
-    DCHECK(!out->min().has_int_val());
+  } else if (in.min.__isset.int_val) {
+    DCHECK(!out->min.__isset.int_val);
     IntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_long_val()) {
-    // Handles TimestampMinMaxFilter also as ColumnValuePB doesn't have a timestamp type.
-    DCHECK(!out->min().has_long_val());
+  } else if (in.min.__isset.long_val) {
+    // Handles TimestampMinMaxFilter also as TColumnValue doesn't have a timestamp type.
+    DCHECK(!out->min.__isset.long_val);
     BigIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_double_val()) {
-    // Handles FloatMinMaxFilter also as ColumnValuePB doesn't have a float type.
-    DCHECK(!out->min().has_double_val());
+  } else if (in.min.__isset.double_val) {
+    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
+    DCHECK(!out->min.__isset.double_val);
     DoubleMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_string_val()) {
-    DCHECK(!out->min().has_string_val());
+  } else if (in.min.__isset.string_val) {
+    DCHECK(!out->min.__isset.string_val);
     StringMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_timestamp_val()) {
-    DCHECK(!out->min().has_timestamp_val());
+  } else if (in.min.__isset.timestamp_val) {
+    DCHECK(!out->min.__isset.timestamp_val);
     TimestampMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min().has_decimal_val()) {
-    DCHECK(!out->min().has_decimal_val());
+  } else if (in.min.__isset.decimal_val) {
+    DCHECK(!out->min.__isset.decimal_val);
     DecimalMinMaxFilter::Copy(in, out);
     return;
   }
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index ee08894..82cf54c 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -74,25 +74,25 @@ class MinMaxFilter {
   /// until this is called.
   virtual void MaterializeValues() {}
 
-  /// Convert this filter to a protobuf representation.
-  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const = 0;
+  /// Convert this filter to a thrift representation.
+  virtual void ToThrift(TMinMaxFilter* thrift) const = 0;
 
   virtual std::string DebugString() const = 0;
 
   /// Returns a new MinMaxFilter with the given type, allocated from 'mem_tracker'.
   static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemTracker* mem_tracker);
 
-  /// Returns a new MinMaxFilter created from the protobuf representation, allocated from
+  /// Returns a new MinMaxFilter created from the thrift representation, allocated from
   /// 'mem_tracker'.
-  static MinMaxFilter* Create(const MinMaxFilterPB& protobuf, ColumnType type,
+  static MinMaxFilter* Create(const TMinMaxFilter& thrift, ColumnType type,
       ObjectPool* pool, MemTracker* mem_tracker);
 
   /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
   static void Or(
-      const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType);
+      const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType);
 
   /// Copies the contents of 'in' into 'out'.
-  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
 
   /// Returns the LLVM_CLASS_NAME for the given type.
   static std::string GetLlvmClassName(PrimitiveType type);
@@ -108,7 +108,7 @@ class MinMaxFilter {
       min_ = std::numeric_limits<TYPE>::max();                                \
       max_ = std::numeric_limits<TYPE>::lowest();                             \
     }                                                                         \
-    NAME##MinMaxFilter(const MinMaxFilterPB& protobuf);                       \
+    NAME##MinMaxFilter(const TMinMaxFilter& thrift);                          \
     virtual ~NAME##MinMaxFilter() {}                                          \
     virtual void* GetMin() override { return &min_; }                         \
     virtual void* GetMax() override { return &max_; }                         \
@@ -121,10 +121,10 @@ class MinMaxFilter {
       return min_ == std::numeric_limits<TYPE>::max()                         \
           && max_ == std::numeric_limits<TYPE>::lowest();                     \
     }                                                                         \
-    virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;         \
+    virtual void ToThrift(TMinMaxFilter* thrift) const override;              \
     virtual std::string DebugString() const override;                         \
-    static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);            \
-    static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);          \
+    static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);              \
+    static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);            \
     static const char* LLVM_CLASS_NAME;                                       \
                                                                               \
    private:                                                                   \
@@ -148,7 +148,7 @@ class StringMinMaxFilter : public MinMaxFilter {
       max_buffer_(&mem_pool_),
       always_false_(true),
       always_true_(false) {}
-  StringMinMaxFilter(const MinMaxFilterPB& protobuf, MemTracker* mem_tracker);
+  StringMinMaxFilter(const TMinMaxFilter& thrift, MemTracker* mem_tracker);
   virtual ~StringMinMaxFilter() {}
   virtual void Close() override { mem_pool_.FreeAll(); }
 
@@ -164,11 +164,11 @@ class StringMinMaxFilter : public MinMaxFilter {
   /// truncating them if necessary.
   virtual void MaterializeValues() override;
 
-  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
+  virtual void ToThrift(TMinMaxFilter* thrift) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
-  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
@@ -208,7 +208,7 @@ class StringMinMaxFilter : public MinMaxFilter {
 class TimestampMinMaxFilter : public MinMaxFilter {
  public:
   TimestampMinMaxFilter() { always_false_ = true; }
-  TimestampMinMaxFilter(const MinMaxFilterPB& protobuf);
+  TimestampMinMaxFilter(const TMinMaxFilter& thrift);
   virtual ~TimestampMinMaxFilter() {}
 
   virtual void* GetMin() override { return &min_; }
@@ -218,11 +218,11 @@ class TimestampMinMaxFilter : public MinMaxFilter {
   virtual void Insert(void* val) override;
   virtual bool AlwaysTrue() const override { return false; }
   virtual bool AlwaysFalse() const override { return always_false_; }
-  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
+  virtual void ToThrift(TMinMaxFilter* thrift) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
-  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
@@ -269,7 +269,7 @@ class DecimalMinMaxFilter : public MinMaxFilter {
   DecimalMinMaxFilter(int precision)
     : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(true) {}
 
-  DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision);
+  DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision);
   virtual ~DecimalMinMaxFilter() {}
 
   virtual void* GetMin() override {
@@ -286,11 +286,11 @@ class DecimalMinMaxFilter : public MinMaxFilter {
   virtual PrimitiveType type() override;
   virtual bool AlwaysTrue() const override { return false; }
   virtual bool AlwaysFalse() const override { return always_false_; }
-  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
+  virtual void ToThrift(TMinMaxFilter* thrift) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision);
-  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision);
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
 
   void Insert4(void* val);
   void Insert8(void* val);
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 4680a69..6c265a3 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -39,20 +39,3 @@ enum CompressionType {
   NONE = 0; // No compression.
   LZ4 = 1;
 }
-
-// This is a union over all possible return types.
-// TODO: if we upgrade to proto3, then we can use the oneof feature in Protobuf 3 in
-// the following to save some memory because only one of the fields below is set at a
-// time.
-message ColumnValuePB {
-  optional bool bool_val = 1;
-  optional int32 byte_val = 6;
-  optional int32 short_val = 7;
-  optional int32 int_val = 2;
-  optional int64 long_val = 3;
-  optional double double_val = 4;
-  optional string string_val = 5;
-  optional string binary_val = 8;
-  optional string timestamp_val = 9;
-  optional bytes decimal_val = 10;
-}
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 708b814..b0e2b5d 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -78,77 +78,6 @@ message EndDataStreamResponsePB {
   optional int64 receiver_latency_ns = 2;
 }
 
-message BloomFilterPB {
-  // Log_2 of the bufferpool space required for this filter.
-  // See BloomFilter::BloomFilter() for details.
-  optional int32 log_bufferpool_space = 1;
-
-  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
-  // not meaningful.
-  optional bool always_true = 2;
-  optional bool always_false = 3;
-
-  // The sidecar index associated with the directory of a Bloom filter.
-  // A directory is a list of buckets representing the Bloom Filter contents,
-  // laid out contiguously in one string for efficiency of (de)serialisation.
-  // See BloomFilter::Bucket and BloomFilter::directory_.
-  optional int32 directory_sidecar_idx = 4;
-}
-
-message MinMaxFilterPB {
-  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
-  optional bool always_true = 1;
-
-  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
-  optional bool always_false = 2;
-
-  optional ColumnValuePB min = 3;
-  optional ColumnValuePB max = 4;
-}
-
-message UpdateFilterParamsPB {
-  // Filter ID, unique within a query.
-  optional int32 filter_id = 1;
-
-  // Query that this filter is for.
-  optional UniqueIdPB query_id = 2;
-
-  optional BloomFilterPB bloom_filter = 3;
-
-  optional MinMaxFilterPB min_max_filter = 4;
-}
-
-message UpdateFilterResultPB {
-  optional StatusPB status = 1;
-
-  // Latency for response in the receiving daemon in nanoseconds.
-  optional int64 receiver_latency_ns = 2;
-}
-
-message PublishFilterParamsPB {
-  // Filter ID, unique within a query.
-  optional int32 filter_id = 1;
-
-  // Query that this filter is for.
-  optional UniqueIdPB dst_query_id = 2;
-
-  // Index of fragment to receive this filter
-  optional int32 dst_fragment_idx = 3;
-
-  // Actual bloom_filter payload
-  optional BloomFilterPB bloom_filter = 4;
-
-  // Actual min_max_filter payload
-  optional MinMaxFilterPB min_max_filter = 5;
-}
-
-message PublishFilterResultPB {
-  optional StatusPB status = 1;
-
-  // Latency for response in the receiving daemon in nanoseconds.
-  optional int64 receiver_latency_ns = 2;
-}
-
 // Handles data transmission between fragment instances.
 service DataStreamService {
   // Override the default authorization method.
@@ -161,12 +90,4 @@ service DataStreamService {
 
   // Called by a sender to close the channel between fragment instances.
   rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);
-
-  // Called by fragment instances that produce local runtime filters to deliver them to
-  // the coordinator for aggregation and broadcast.
-  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
-
-  // Called by the coordinator to deliver global runtime filters to fragments for
-  // application at plan nodes.
-  rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
 }
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index fd26406..62b396e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -749,6 +749,82 @@ struct TPoolConfig {
   11: required i64 max_memory_multiple = 0;
 }
 
+struct TBloomFilter {
+  // Log_2 of the bufferpool space required for this filter.
+  // See BloomFilter::BloomFilter() for details.
+  1: required i32 log_bufferpool_space
+
+  // List of buckets representing the Bloom Filter contents, laid out contiguously in one
+  // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
+  // BloomFilter::directory_.
+  2: binary directory
+
+  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
+  // not meaningful.
+  3: required bool always_true
+  4: required bool always_false
+}
+
+struct TMinMaxFilter {
+  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
+  1: required bool always_true
+
+  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
+  2: required bool always_false
+
+  3: optional Data.TColumnValue min
+  4: optional Data.TColumnValue max
+}
+
+// UpdateFilter
+
+struct TUpdateFilterParams {
+  1: required ImpalaInternalServiceVersion protocol_version
+
+  // Filter ID, unique within a query.
+  // required in V1
+  2: optional i32 filter_id
+
+  // Query that this filter is for.
+  // required in V1
+  3: optional Types.TUniqueId query_id
+
+  // required in V1
+  4: optional TBloomFilter bloom_filter
+
+  5: optional TMinMaxFilter min_max_filter
+}
+
+struct TUpdateFilterResult {
+}
+
+
+// PublishFilter
+
+struct TPublishFilterParams {
+  1: required ImpalaInternalServiceVersion protocol_version
+
+  // Filter ID to update
+  // required in V1
+  2: optional i32 filter_id
+
+  // required in V1
+  3: optional Types.TUniqueId dst_query_id
+
+  // Index of fragment to receive this filter
+  // required in V1
+  4: optional Types.TFragmentIdx dst_fragment_idx
+
+  // Actual bloom_filter payload
+  // required in V1
+  5: optional TBloomFilter bloom_filter
+
+  6: optional TMinMaxFilter min_max_filter
+}
+
+struct TPublishFilterResult {
+}
+
 struct TParseDateStringResult {
   // True iff date string was successfully parsed
   1: required bool valid
@@ -761,4 +837,11 @@ struct TParseDateStringResult {
 
 service ImpalaInternalService {
 
+  // Called by fragment instances that produce local runtime filters to deliver them to
+  // the coordinator for aggregation and broadcast.
+  TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
+
+  // Called by the coordinator to deliver global runtime filters to fragments for
+  // application at plan nodes.
+  TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
 }


[impala] 01/02: IMPALA-8709: Add Damerau-Levenshtein edit distance built-in function

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a862282811e76767c6c5d7874db2a310586f2421
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Fri Oct 25 13:49:18 2019 +0200

    IMPALA-8709: Add Damerau-Levenshtein edit distance built-in function
    
    This patch adds new built-in functions to calculate restricted
    Damerau-Levenshtein edit distance (optimal string alignment).
    Implemented as dle_dst() and damerau_levenshtein(). If either or both
    values are NULL, the functions return NULL. This differs from Netezza's
    dle_dst(), which returns the length of the non-NULL value, or 0 if both
    values are NULL. The NULL behavior matches the existing levenshtein()
    function.
    
    Also cleans up levenshtein tests.
    
    Testing:
    - Added unit tests to expr-test.cc
    - Manual testing on over 1400 string pairs from
      http://marvin.cs.uidaho.edu/misspell.html and results match Netezza
    
    Change-Id: Ib759817ec15e7075bf49d51e494e45c8af4db94d
    Reviewed-on: http://gerrit.cloudera.org:8080/13794
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/expr-test.cc                    |  78 +++++++++++--------
 be/src/exprs/string-functions-ir.cc          | 108 +++++++++++++++++++++++++--
 be/src/exprs/string-functions.h              |   3 +
 common/function-registry/impala_functions.py |   2 +
 4 files changed, 153 insertions(+), 38 deletions(-)

diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index e932a04..0b80f10 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -3992,37 +3992,51 @@ TEST_P(ExprTest, InPredicate) {
 }
 
 TEST_P(ExprTest, StringFunctions) {
-  TestValue("levenshtein('levenshtein', 'frankenstein')", TYPE_INT, 6);
-  TestValue("levenshtein('example', 'samples')", TYPE_INT, 3);
-  TestValue("levenshtein('sturgeon', 'urgently')", TYPE_INT, 6);
-  TestValue("levenshtein('distance', 'difference')", TYPE_INT, 5);
-  TestValue("levenshtein('kitten', 'sitting')", TYPE_INT, 3);
-  TestValue("levenshtein('levenshtein', 'levenshtein')", TYPE_INT, 0);
-  TestValue("levenshtein('', 'levenshtein')", TYPE_INT, 11);
-  TestValue("levenshtein('levenshtein', '')", TYPE_INT, 11);
-  TestIsNull("levenshtein('foo', NULL)", TYPE_INT);
-  TestIsNull("levenshtein(NULL, 'foo')", TYPE_INT);
-  TestIsNull("levenshtein(NULL, NULL)", TYPE_INT);
-  TestErrorString("levenshtein('z', repeat('x', 256))",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
-  TestErrorString("levenshtein(repeat('x', 256), 'z')",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
-
-  TestValue("le_dst('levenshtein', 'frankenstein')", TYPE_INT, 6);
-  TestValue("le_dst('example', 'samples')", TYPE_INT, 3);
-  TestValue("le_dst('sturgeon', 'urgently')", TYPE_INT, 6);
-  TestValue("le_dst('distance', 'difference')", TYPE_INT, 5);
-  TestValue("le_dst('kitten', 'sitting')", TYPE_INT, 3);
-  TestValue("le_dst('levenshtein', 'levenshtein')", TYPE_INT, 0);
-  TestValue("le_dst('', 'levenshtein')", TYPE_INT, 11);
-  TestValue("le_dst('levenshtein', '')", TYPE_INT, 11);
-  TestIsNull("le_dst('foo', NULL)", TYPE_INT);
-  TestIsNull("le_dst(NULL, 'foo')", TYPE_INT);
-  TestIsNull("le_dst(NULL, NULL)", TYPE_INT);
-  TestErrorString("le_dst('z', repeat('x', 256))",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
-  TestErrorString("le_dst(repeat('x', 256), 'z')",
-      "levenshtein argument exceeds maximum length of 255 characters\n");
+
+  for (const string fn_name: { "levenshtein", "le_dst" }) {
+    TestValue(fn_name + "('levenshtein', 'frankenstein')", TYPE_INT, 6);
+    TestValue(fn_name + "('example', 'samples')", TYPE_INT, 3);
+    TestValue(fn_name + "('sturgeon', 'urgently')", TYPE_INT, 6);
+    TestValue(fn_name + "('distance', 'difference')", TYPE_INT, 5);
+    TestValue(fn_name + "('kitten', 'sitting')", TYPE_INT, 3);
+    TestValue(fn_name + "('levenshtein', 'levenshtein')", TYPE_INT, 0);
+    TestValue(fn_name + "('', 'levenshtein')", TYPE_INT, 11);
+    TestValue(fn_name + "('levenshtein', '')", TYPE_INT, 11);
+    TestIsNull(fn_name + "('foo', NULL)", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, 'foo')", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, NULL)", TYPE_INT);
+    TestErrorString(fn_name + "('z', repeat('x', 256))",
+        "levenshtein argument exceeds maximum length of 255 characters\n");
+    TestErrorString(fn_name + "(repeat('x', 256), 'z')",
+        "levenshtein argument exceeds maximum length of 255 characters\n");
+  }
+
+  for (const string fn_name: { "damerau_levenshtein", "dle_dst" }) {
+    TestValue(fn_name + "('', '')", TYPE_INT, 0);
+    TestValue(fn_name + "('abc', 'abc')", TYPE_INT, 0);
+    TestValue(fn_name + "('a', 'b')", TYPE_INT, 1);
+    TestValue(fn_name + "('a', '')", TYPE_INT, 1);
+    TestValue(fn_name + "('aabc', 'abc')", TYPE_INT, 1);
+    TestValue(fn_name + "('abcc', 'abc')", TYPE_INT, 1);
+    TestValue(fn_name + "('', 'a')", TYPE_INT, 1);
+    TestValue(fn_name + "('abc', 'abcc')", TYPE_INT, 1);
+    TestValue(fn_name + "('abc', 'aabc')", TYPE_INT, 1);
+    TestValue(fn_name + "('teh', 'the')", TYPE_INT, 1);
+    TestValue(fn_name + "('tets', 'test')", TYPE_INT, 1);
+    TestValue(fn_name + "('fuor', 'four')", TYPE_INT, 1);
+    TestValue(fn_name + "('kitten', 'sitting')", TYPE_INT, 3);
+    TestValue(fn_name + "('Saturday', 'Sunday')", TYPE_INT, 3);
+    TestValue(fn_name + "('rosettacode', 'raisethysword')", TYPE_INT, 8);
+    TestValue(fn_name + "('CA', 'ABC')", TYPE_INT, 3);
+    TestValue(fn_name + "(repeat('z', 255), repeat('x', 255))", TYPE_INT, 255);
+    TestIsNull(fn_name + "('foo', NULL)", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, 'foo')", TYPE_INT);
+    TestIsNull(fn_name + "(NULL, NULL)", TYPE_INT);
+    TestErrorString(fn_name + "('z', repeat('x', 256))",
+        "damerau-levenshtein argument exceeds maximum length of 255 characters\n");
+    TestErrorString(fn_name + "(repeat('x', 256), 'z')",
+        "damerau-levenshtein argument exceeds maximum length of 255 characters\n");
+  }
 
   for (const string fn_name: { "jaro_dst", "jaro_distance" }) {
     TestIsNull(fn_name + "('foo', NULL)", TYPE_DOUBLE);
@@ -4039,6 +4053,7 @@ TEST_P(ExprTest, StringFunctions) {
     TestValue(fn_name + "('frog', 'fog')", TYPE_DOUBLE, 0.08333333333333337);
     TestValue(fn_name + "('hello', 'haloa')", TYPE_DOUBLE, 0.2666666666666666);
     TestValue(fn_name + "('atcg', 'tagc')", TYPE_DOUBLE, 0.1666666666666667);
+    TestValue(fn_name + "(repeat('z', 255), repeat('x', 255))", TYPE_DOUBLE, 1.0);
     TestErrorString(fn_name + "('z', repeat('x', 256))",
         "jaro argument exceeds maximum length of 255 characters\n");
     TestErrorString(fn_name + "(repeat('x', 256), 'z')",
@@ -4060,6 +4075,7 @@ TEST_P(ExprTest, StringFunctions) {
     TestValue(fn_name + "('frog', 'fog')", TYPE_DOUBLE, 0.9166666666666666);
     TestValue(fn_name + "('hello', 'haloa')", TYPE_DOUBLE, 0.73333333333333334);
     TestValue(fn_name + "('atcg', 'tagc')", TYPE_DOUBLE, 0.8333333333333333);
+    TestValue(fn_name + "(repeat('z', 255), repeat('x', 255))", TYPE_DOUBLE, 0.0);
     TestErrorString(fn_name + "('z', repeat('x', 256))",
         "jaro argument exceeds maximum length of 255 characters\n");
     TestErrorString(fn_name + "(repeat('x', 256), 'z')",
diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc
index 11336e3..2aae65c 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -1166,7 +1166,11 @@ IntVal StringFunctions::Levenshtein(
 
   int column_start = 1;
 
-  auto column = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s1len + 1)));
+  int* column = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s1len + 1)));
+  if (UNLIKELY(column == nullptr)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return IntVal::null();
+  }
 
   std::iota(column + column_start - 1, column + s1len + 1, column_start - 1);
 
@@ -1213,8 +1217,19 @@ DoubleVal StringFunctions::JaroSimilarity(
   // the window size to search for matches in the other string
   int max_range = std::max(0, std::max(s1len, s2len) / 2 - 1);
 
-  int s1_matching[s1len];
-  int s2_matching[s2len];
+  int* s1_matching = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s1len)));
+  if (UNLIKELY(s1_matching == nullptr)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return DoubleVal::null();
+  }
+
+  int* s2_matching = reinterpret_cast<int*>(ctx->Allocate(sizeof(int) * (s2len)));
+  if (UNLIKELY(s2_matching == nullptr)) {
+    ctx->Free(reinterpret_cast<uint8_t*>(s1_matching));
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return DoubleVal::null();
+  }
+
   std::fill_n(s1_matching, s1len, -1);
   std::fill_n(s2_matching, s2len, -1);
 
@@ -1236,7 +1251,11 @@ DoubleVal StringFunctions::JaroSimilarity(
     }
   }
 
-  if (matching_characters == 0) return DoubleVal(0.0);
+  if (matching_characters == 0) {
+    ctx->Free(reinterpret_cast<uint8_t*>(s1_matching));
+    ctx->Free(reinterpret_cast<uint8_t*>(s2_matching));
+    return DoubleVal(0.0);
+  }
 
   // transpositions (one-way only)
   double transpositions = 0.0;
@@ -1247,9 +1266,7 @@ DoubleVal StringFunctions::JaroSimilarity(
     while (s2_matching[s2i] == -1) {
       s2i++;
     }
-    if (s1.ptr[s1i] != s2.ptr[s2i]) {
-      transpositions += 0.5;
-    }
+    if (s1.ptr[s1i] != s2.ptr[s2i]) transpositions += 0.5;
     s1i++;
     s2i++;
   }
@@ -1258,6 +1275,9 @@ DoubleVal StringFunctions::JaroSimilarity(
                                         + m / static_cast<double>(s2len)
                                         + (m - transpositions) / m );
 
+  ctx->Free(reinterpret_cast<uint8_t*>(s1_matching));
+  ctx->Free(reinterpret_cast<uint8_t*>(s2_matching));
+
   return DoubleVal(jaro_similarity);
 }
 
@@ -1359,4 +1379,78 @@ DoubleVal StringFunctions::JaroWinklerSimilarity(FunctionContext* ctx,
   }
   return DoubleVal(jaro_winkler_similarity);
 }
+
+IntVal StringFunctions::DamerauLevenshtein(
+    FunctionContext* ctx, const StringVal& s1, const StringVal& s2) {
+  // Based on https://en.wikipedia.org/wiki/Damerau%E2%80%93Levenshtein_distance
+  // Implements restricted Damerau-Levenshtein (optimal string alignment)
+
+  int s1len = s1.len;
+  int s2len = s2.len;
+
+  // Inputs with a len above 255 are rejected: set an error on ctx and return -1.
+  if (s1len > 255 || s2len > 255) {
+    ctx->SetError("damerau-levenshtein argument exceeds maximum length of 255 "
+                  "characters");
+    return IntVal(-1);
+  }
+
+  // Shortcut cases, checked in this order (after the length check above):
+  // - either argument is NULL: the result is NULL
+  // - one string is empty: the distance is the length of the other string
+  // - both strings have the same length and the same bytes: the distance is 0
+  if (s1.is_null || s2.is_null) return IntVal::null();
+  if (s1len == 0) return IntVal(s2len);
+  if (s2len == 0) return IntVal(s1len);
+  if (s1len == s2len && memcmp(s1.ptr, s2.ptr, s1len) == 0) return IntVal(0);
+
+  int i;
+  int j;
+  int l_cost;
+  int ptr_array_length = sizeof(int*) * (s1len + 1);
+  int int_array_length = sizeof(int) * (s2len + 1) * (s1len + 1);
+
+  // 2D table: 'd' holds (s1len + 1) row pointers into 'rows', one flat int buffer.
+  int** d = reinterpret_cast<int**>(ctx->Allocate(ptr_array_length));
+  if (UNLIKELY(d == nullptr)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return IntVal::null();
+  }
+  int* rows = reinterpret_cast<int*>(ctx->Allocate(int_array_length));
+  if (UNLIKELY(rows == nullptr)) {
+    ctx->Free(reinterpret_cast<uint8_t*>(d));
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return IntVal::null();
+  }
+  // Point each d[i] at the start of row i (every row is s2len + 1 ints wide) and
+  // seed column 0 with i, i.e. the cost of deleting the first i bytes of s1.
+  for (i = 0; i <= s1len; ++i) {
+    d[i] = rows + (s2len + 1) * i;
+    d[i][0] = i;
+  }
+  std::iota(d[0], d[0] + s2len + 1, 0); // row 0 becomes 0..s2len
+
+  for (i = 1; i <= s1len; ++i) {
+    for (j = 1; j <= s2len; ++j) {
+      if (s1.ptr[i - 1] == s2.ptr[j - 1]) {
+        l_cost = 0;
+      } else {
+        l_cost = 1;
+      }
+      d[i][j] = std::min(d[i - 1][j - 1] + l_cost, // substitution
+                         std::min(d[i][j - 1] + 1, // insertion
+                                  d[i - 1][j] + 1) // deletion
+      );
+      if (i > 1 && j > 1 && s1.ptr[i - 1] == s2.ptr[j - 2]
+          && s1.ptr[i - 2] == s2.ptr[j - 1]) {
+        d[i][j] = std::min(d[i][j], d[i - 2][j - 2] + l_cost); // transposition
+      }
+    }
+  }
+  int result = d[s1len][s2len]; // last cell of the table is the returned distance
+
+  ctx->Free(reinterpret_cast<uint8_t*>(d));
+  ctx->Free(reinterpret_cast<uint8_t*>(rows));
+  return IntVal(result);
+}
 }
diff --git a/be/src/exprs/string-functions.h b/be/src/exprs/string-functions.h
index 8386461..09a16b5 100644
--- a/be/src/exprs/string-functions.h
+++ b/be/src/exprs/string-functions.h
@@ -158,6 +158,9 @@ class StringFunctions {
   static IntVal Levenshtein(
       FunctionContext* context, const StringVal& s1, const StringVal& s2);
 
+  static IntVal DamerauLevenshtein(
+      FunctionContext* context, const StringVal& s1, const StringVal& s2);
+
   static DoubleVal JaroDistance(
       FunctionContext* ctx, const StringVal& s1, const StringVal& s2);
 
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index d7d1ceb..f132743 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -586,6 +586,8 @@ visible_functions = [
    'impala::StringFunctions::GetJsonObject'],
   [['levenshtein', 'le_dst'], 'INT', ['STRING', 'STRING'],
    '_ZN6impala15StringFunctions11LevenshteinEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
+  [['damerau_levenshtein', 'dle_dst'], 'INT', ['STRING', 'STRING'],
+   '_ZN6impala15StringFunctions18DamerauLevenshteinEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['jaro_distance', 'jaro_dst'], 'DOUBLE', ['STRING', 'STRING'],
    '_ZN6impala15StringFunctions12JaroDistanceEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['jaro_similarity', 'jaro_sim'], 'DOUBLE', ['STRING', 'STRING'],