You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/06/19 16:57:40 UTC

[impala] 05/05: IMPALA-7467: Port ExecQueryFInstances to krpc

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 30c3cd95a42cacbfa2dbb0b29a4757745af942c3
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Tue Apr 2 17:31:38 2019 +0000

    IMPALA-7467: Port ExecQueryFInstances to krpc
    
    This patch ports the ExecQueryFInstances rpc to use KRPC. The
    parameters for this call contain a huge number of Thrift structs
    (eg. everything related to TPlanNode and TExpr), so to avoid
    converting all of these to protobuf and the resulting effect that
    would have on the FE and catalog, this patch stores most of the
    parameters in a sidecar (in particular the TQueryCtx,
    TPlanFragmentCtx's, and TPlanFragmentInstanceCtx's).
    
    Testing:
    - Passed a full exhaustive run on the minicluster.
    Set up a ten node cluster with tpch 500:
    - Ran perf tests: 3 iterations per tpch query, 4 concurrent streams,
      no perf change.
    - Ran the stress test for 1000 queries, passed.
    
    Change-Id: Id3f1c6099109bd8e5361188005a7d0e892147570
    Reviewed-on: http://gerrit.cloudera.org:8080/13583
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/backend-client.h             |  29 +------
 be/src/runtime/coordinator-backend-state.cc | 119 +++++++++++++++++++---------
 be/src/runtime/coordinator-backend-state.h  |  10 ++-
 be/src/runtime/query-exec-mgr.cc            |  12 +--
 be/src/runtime/query-exec-mgr.h             |   3 +-
 be/src/runtime/query-state.cc               |  60 +++++++-------
 be/src/runtime/query-state.h                |  13 ++-
 be/src/runtime/test-env.cc                  |  17 ++--
 be/src/service/control-service.cc           |  43 +++++++++-
 be/src/service/control-service.h            |  11 +++
 be/src/service/impala-internal-service.cc   |  23 ------
 be/src/service/impala-internal-service.h    |   4 -
 common/protobuf/control_service.proto       |  39 +++++++++
 common/thrift/ImpalaInternalService.thrift  |  48 ++---------
 tests/custom_cluster/test_rpc_timeout.py    |   2 +-
 15 files changed, 245 insertions(+), 188 deletions(-)

diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index a4ff597..92279fc 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -32,12 +32,12 @@ namespace impala {
 class ImpalaBackendClient : public ImpalaInternalServiceClient {
  public:
   ImpalaBackendClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot)
-    : ImpalaInternalServiceClient(prot), transmit_csw_(NULL) {
+    : ImpalaInternalServiceClient(prot) {
   }
 
   ImpalaBackendClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot,
       boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot)
-    : ImpalaInternalServiceClient(iprot, oprot), transmit_csw_(NULL) {
+    : ImpalaInternalServiceClient(iprot, oprot) {
   }
 
 /// We intentionally disable this clang warning as we intend to hide the
@@ -45,29 +45,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Woverloaded-virtual"
 
-  void ExecQueryFInstances(TExecQueryFInstancesResult& _return,
-      const TExecQueryFInstancesParams& params, bool* send_done) {
-    DCHECK(!*send_done);
-    FAULT_INJECTION_SEND_RPC_EXCEPTION(16);
-    ImpalaInternalServiceClient::send_ExecQueryFInstances(params);
-    *send_done = true;
-    // Cannot inject fault on recv() side as the callers cannot handle it.
-    ImpalaInternalServiceClient::recv_ExecQueryFInstances(_return);
-  }
-
-  /// Callers of TransmitData() should provide their own counter to measure the data
-  /// transmission time.
-  void SetTransmitDataCounter(RuntimeProfile::ConcurrentTimerCounter* csw) {
-    DCHECK(transmit_csw_ == NULL);
-    transmit_csw_ = csw;
-  }
-
-  /// ImpalaBackendClient is shared by multiple queries. It's the caller's responsibility
-  /// to reset the counter after data transmission.
-  void ResetTransmitDataCounter() {
-    transmit_csw_ = NULL;
-  }
-
   void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& params,
       bool* send_done) {
     DCHECK(!*send_done);
@@ -86,8 +63,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
 
 #pragma clang diagnostic pop
 
- private:
-  RuntimeProfile::ConcurrentTimerCounter* transmit_csw_;
 };
 
 }
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index bd97712..19709f7 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -24,6 +24,7 @@
 #include "exec/kudu-util.h"
 #include "exec/scan-node.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.inline.h"
@@ -46,6 +47,7 @@
 
 using kudu::MonoDelta;
 using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
 using namespace impala;
 using namespace rapidjson;
 namespace accumulators = boost::accumulators;
@@ -53,6 +55,9 @@ namespace accumulators = boost::accumulators;
 const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
     "Last report received time";
 
+DECLARE_int32(backend_client_rpc_timeout_ms);
+DECLARE_int64(rpc_max_message_size);
+
 Coordinator::BackendState::BackendState(
     const Coordinator& coord, int state_idx, TRuntimeFilterMode::type filter_mode)
   : coord_(coord),
@@ -94,30 +99,28 @@ void Coordinator::BackendState::Init(
 }
 
 void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
-    const FilterRoutingTable& filter_routing_table,
-    TExecQueryFInstancesParams* rpc_params) {
-  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
-  rpc_params->__set_coord_state_idx(state_idx_);
-  rpc_params->__set_min_mem_reservation_bytes(
-      backend_exec_params_->min_mem_reservation_bytes);
-  rpc_params->__set_initial_mem_reservation_total_claims(
+    const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
+    TExecQueryFInstancesSidecar* sidecar) {
+  request->set_coord_state_idx(state_idx_);
+  request->set_min_mem_reservation_bytes(backend_exec_params_->min_mem_reservation_bytes);
+  request->set_initial_mem_reservation_total_claims(
       backend_exec_params_->initial_mem_reservation_total_claims);
-  rpc_params->__set_per_backend_mem_limit(coord_.schedule_.per_backend_mem_limit());
+  request->set_per_backend_mem_limit(coord_.schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
-  rpc_params->__isset.fragment_ctxs = true;
-  rpc_params->__isset.fragment_instance_ctxs = true;
-  rpc_params->fragment_instance_ctxs.resize(backend_exec_params_->instance_params.size());
+  sidecar->__isset.fragment_ctxs = true;
+  sidecar->__isset.fragment_instance_ctxs = true;
+  sidecar->fragment_instance_ctxs.resize(backend_exec_params_->instance_params.size());
   for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) {
-    TPlanFragmentInstanceCtx& instance_ctx = rpc_params->fragment_instance_ctxs[i];
+    TPlanFragmentInstanceCtx& instance_ctx = sidecar->fragment_instance_ctxs[i];
     const FInstanceExecParams& params = *backend_exec_params_->instance_params[i];
     int fragment_idx = params.fragment_exec_params.fragment.idx;
 
     // add a TPlanFragmentCtx, if we don't already have it
-    if (rpc_params->fragment_ctxs.empty()
-        || rpc_params->fragment_ctxs.back().fragment.idx != fragment_idx) {
-      rpc_params->fragment_ctxs.emplace_back();
-      TPlanFragmentCtx& fragment_ctx = rpc_params->fragment_ctxs.back();
+    if (sidecar->fragment_ctxs.empty()
+        || sidecar->fragment_ctxs.back().fragment.idx != fragment_idx) {
+      sidecar->fragment_ctxs.emplace_back();
+      TPlanFragmentCtx& fragment_ctx = sidecar->fragment_ctxs.back();
       fragment_ctx.__set_fragment(params.fragment_exec_params.fragment);
       fragment_ctx.__set_destinations(params.fragment_exec_params.destinations);
     }
@@ -141,7 +144,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
     // TODO: do this more efficiently, we're looping over the entire plan for each
     // instance separately
     int instance_idx = GetInstanceIdx(params.instance_id);
-    for (TPlanNode& plan_node: rpc_params->fragment_ctxs.back().fragment.plan.nodes) {
+    for (TPlanNode& plan_node : sidecar->fragment_ctxs.back().fragment.plan.nodes) {
       if (!plan_node.__isset.hash_join_node) continue;
       if (!plan_node.__isset.runtime_filters) continue;
 
@@ -167,6 +170,14 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
   }
 }
 
+void Coordinator::BackendState::SetExecError(const Status& status) {
+  const string ERR_TEMPLATE = "ExecQueryFInstances rpc query_id=$0 failed: $1";
+  const string& err_msg =
+      Substitute(ERR_TEMPLATE, PrintId(query_id()), status.msg().GetFullMessageDetails());
+  LOG(ERROR) << err_msg;
+  status_ = Status::Expected(err_msg);
+}
+
 void Coordinator::BackendState::Exec(
     const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table,
@@ -176,9 +187,52 @@ void Coordinator::BackendState::Exec(
     last_report_time_ms_ = GenerateReportTimestamp();
     exec_complete_barrier->Notify();
   });
-  TExecQueryFInstancesParams rpc_params;
-  rpc_params.__set_query_ctx(query_ctx());
-  SetRpcParams(debug_options, filter_routing_table, &rpc_params);
+  std::unique_ptr<ControlServiceProxy> proxy;
+  Status get_proxy_status =
+      ControlService::GetProxy(krpc_host_, krpc_host_.hostname, &proxy);
+  if (!get_proxy_status.ok()) {
+    SetExecError(get_proxy_status);
+    return;
+  }
+
+  ExecQueryFInstancesRequestPB request;
+  TExecQueryFInstancesSidecar sidecar;
+  sidecar.__set_query_ctx(query_ctx());
+  SetRpcParams(debug_options, filter_routing_table, &request, &sidecar);
+
+  RpcController rpc_controller;
+  rpc_controller.set_timeout(
+      MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
+
+  // Serialize the sidecar and add it to the rpc controller. The serialized buffer is
+  // owned by 'serializer' and is freed when it is destructed.
+  ThriftSerializer serializer(true);
+  uint8_t* serialized_buf = nullptr;
+  uint32_t serialized_len = 0;
+  Status serialize_status =
+      serializer.SerializeToBuffer(&sidecar, &serialized_len, &serialized_buf);
+  if (UNLIKELY(!serialize_status.ok())) {
+    SetExecError(serialize_status);
+    return;
+  } else if (serialized_len > FLAGS_rpc_max_message_size) {
+    SetExecError(
+        Status::Expected("Serialized Exec() request exceeds --rpc_max_message_size."));
+    return;
+  }
+
+  unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
+  sidecar_buf->assign_copy(serialized_buf, serialized_len);
+  unique_ptr<RpcSidecar> rpc_sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
+
+  int sidecar_idx;
+  kudu::Status sidecar_status =
+      rpc_controller.AddOutboundSidecar(move(rpc_sidecar), &sidecar_idx);
+  if (!sidecar_status.ok()) {
+    SetExecError(FromKuduStatus(sidecar_status, "Failed to add sidecar"));
+    return;
+  }
+  request.set_sidecar_idx(sidecar_idx);
+
   VLOG_FILE << "making rpc: ExecQueryFInstances"
       << " host=" << TNetworkAddressToString(impalad_address()) << " query_id="
       << PrintId(query_id());
@@ -187,33 +241,22 @@ void Coordinator::BackendState::Exec(
   lock_guard<mutex> l(lock_);
   int64_t start = MonotonicMillis();
 
-  ImpalaBackendConnection backend_client(
-      ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status_);
-  if (!status_.ok()) return;
+  ExecQueryFInstancesResponsePB response;
+  Status rpc_status =
+      FromKuduStatus(proxy->ExecQueryFInstances(request, &response, &rpc_controller),
+          "Exec() rpc failed");
 
-  TExecQueryFInstancesResult thrift_result;
-  Status rpc_status = backend_client.DoRpc(
-      &ImpalaBackendClient::ExecQueryFInstances, rpc_params, &thrift_result);
   rpc_sent_ = true;
   rpc_latency_ = MonotonicMillis() - start;
 
-  const string ERR_TEMPLATE =
-      "ExecQueryFInstances rpc query_id=$0 failed: $1";
-
   if (!rpc_status.ok()) {
-    const string& err_msg =
-        Substitute(ERR_TEMPLATE, PrintId(query_id()), rpc_status.msg().msg());
-    VLOG_QUERY << err_msg;
-    status_ = Status::Expected(err_msg);
+    SetExecError(rpc_status);
     return;
   }
 
-  Status exec_status = Status(thrift_result.status);
+  Status exec_status = Status(response.status());
   if (!exec_status.ok()) {
-    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
-        exec_status.msg().GetFullMessageDetails());
-    VLOG_QUERY << err_msg;
-    status_ = Status::Expected(err_msg);
+    SetExecError(exec_status);
     return;
   }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 13a7a35..428e332 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -306,11 +306,15 @@ class Coordinator::BackendState {
   const TQueryCtx& query_ctx() const { return coord_.query_ctx(); }
   const TUniqueId& query_id() const { return coord_.query_id(); }
 
-  /// Fill in rpc_params based on state. Uses filter_routing_table to remove filters
-  /// that weren't selected during its construction.
+  /// Fill in 'request' and 'sidecar' based on state. Uses filter_routing_table to remove
+  /// filters that weren't selected during its construction.
   void SetRpcParams(const DebugOptions& debug_options,
       const FilterRoutingTable& filter_routing_table,
-      TExecQueryFInstancesParams* rpc_params);
+      ExecQueryFInstancesRequestPB* request, TExecQueryFInstancesSidecar* sidecar);
+
+  /// Expects that 'status' is an error. Sets 'status_' to a formatted version of its
+  /// message.
+  void SetExecError(const Status& status);
 
   /// Version of IsDone() where caller must hold lock_ via lock;
   bool IsDoneLocked(const boost::unique_lock<boost::mutex>& lock) const;
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 25c5143..d5da249 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -42,15 +42,15 @@ using namespace impala;
 DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
     "every log_mem_usage_interval'th fragment completion.");
 
-Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
-  TUniqueId query_id = params.query_ctx.query_id;
+Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
+    const TExecQueryFInstancesSidecar& sidecar) {
+  TUniqueId query_id = sidecar.query_ctx.query_id;
   VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
-          << " coord=" << TNetworkAddressToString(params.query_ctx.coord_address);
-
+          << " coord=" << TNetworkAddressToString(sidecar.query_ctx.coord_address);
   bool dummy;
   QueryState* qs =
-      GetOrCreateQueryState(params.query_ctx, params.per_backend_mem_limit, &dummy);
-  Status status = qs->Init(params);
+      GetOrCreateQueryState(sidecar.query_ctx, request->per_backend_mem_limit(), &dummy);
+  Status status = qs->Init(request, sidecar);
   if (!status.ok()) {
     qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
     ReleaseQueryState(qs);
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 76c0ed5..9ca86b6 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -49,7 +49,8 @@ class QueryExecMgr : public CacheLineAligned {
   /// was started (like low memory). In that case, no QueryState is created.
   /// After this function returns, it is legal to call QueryState::Cancel(), regardless of
   /// the return value of this function.
-  Status StartQuery(const TExecQueryFInstancesParams& params);
+  Status StartQuery(const ExecQueryFInstancesRequestPB* request,
+      const TExecQueryFInstancesSidecar& sidecar);
 
   /// Creates a QueryState for the given query with the provided parameters. Only valid
   /// to call if the QueryState does not already exist. The caller must call
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 7df1c4c..44708fe 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -134,7 +134,8 @@ QueryState::~QueryState() {
   host_profile_->StopPeriodicCounters();
 }
 
-Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
+Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
+    const TExecQueryFInstancesSidecar& sidecar) {
   // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
   // Init() on failure. We need to do this before any returns because Init() always
   // returns a resource refcount to its caller.
@@ -195,26 +196,26 @@ Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
       query_ctx().coord_address.hostname, &proxy_));
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
-  exec_rpc_params_.__set_coord_state_idx(exec_rpc_params.coord_state_idx);
-  TExecQueryFInstancesParams& non_const_params =
-      const_cast<TExecQueryFInstancesParams&>(exec_rpc_params);
-  exec_rpc_params_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
-  exec_rpc_params_.__isset.fragment_ctxs = true;
-  exec_rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
-  exec_rpc_params_.__isset.fragment_instance_ctxs = true;
+  exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
+  TExecQueryFInstancesSidecar& non_const_params =
+      const_cast<TExecQueryFInstancesSidecar&>(sidecar);
+  exec_rpc_sidecar_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
+  exec_rpc_sidecar_.__isset.fragment_ctxs = true;
+  exec_rpc_sidecar_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
+  exec_rpc_sidecar_.__isset.fragment_instance_ctxs = true;
 
   instances_prepared_barrier_.reset(
-      new CountingBarrier(exec_rpc_params_.fragment_instance_ctxs.size()));
+      new CountingBarrier(exec_rpc_sidecar_.fragment_instance_ctxs.size()));
   instances_finished_barrier_.reset(
-      new CountingBarrier(exec_rpc_params_.fragment_instance_ctxs.size()));
+      new CountingBarrier(exec_rpc_sidecar_.fragment_instance_ctxs.size()));
 
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
-  initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
-      buffer_reservation_, query_mem_tracker_,
-      exec_rpc_params.initial_mem_reservation_total_claims));
-  RETURN_IF_ERROR(
-      initial_reservations_->Init(query_id(), exec_rpc_params.min_mem_reservation_bytes));
+  initial_reservations_ =
+      obj_pool_.Add(new InitialReservations(&obj_pool_, buffer_reservation_,
+          query_mem_tracker_, exec_rpc_params->initial_mem_reservation_total_claims()));
+  RETURN_IF_ERROR(initial_reservations_->Init(
+      query_id(), exec_rpc_params->min_mem_reservation_bytes()));
   scanner_mem_limiter_ = obj_pool_.Add(new ScannerMemLimiter);
   return Status::OK();
 }
@@ -299,8 +300,8 @@ void QueryState::ConstructReport(bool instances_started,
     ReportExecStatusRequestPB* report, TRuntimeProfileForest* profiles_forest) {
   report->Clear();
   TUniqueIdToUniqueIdPB(query_id(), report->mutable_query_id());
-  DCHECK(exec_rpc_params().__isset.coord_state_idx);
-  report->set_coord_state_idx(exec_rpc_params().coord_state_idx);
+  DCHECK(exec_rpc_params().has_coord_state_idx());
+  report->set_coord_state_idx(exec_rpc_params().coord_state_idx());
   {
     std::unique_lock<SpinLock> l(status_lock_);
     overall_status_.ToProto(report->mutable_overall_status());
@@ -501,13 +502,13 @@ bool QueryState::WaitForFinishOrTimeout(int32_t timeout_ms) {
 
 bool QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
-          << " #instances=" << exec_rpc_params_.fragment_instance_ctxs.size();
+          << " #instances=" << exec_rpc_sidecar_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
-  DCHECK_GT(exec_rpc_params_.fragment_ctxs.size(), 0);
-  TPlanFragmentCtx* fragment_ctx = &exec_rpc_params_.fragment_ctxs[0];
-  int num_unstarted_instances = exec_rpc_params_.fragment_instance_ctxs.size();
+  DCHECK_GT(exec_rpc_sidecar_.fragment_ctxs.size(), 0);
+  TPlanFragmentCtx* fragment_ctx = &exec_rpc_sidecar_.fragment_ctxs[0];
+  int num_unstarted_instances = exec_rpc_sidecar_.fragment_instance_ctxs.size();
   int fragment_ctx_idx = 0;
 
   // set up desc tbl
@@ -520,12 +521,12 @@ bool QueryState::StartFInstances() {
 
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx :
-           exec_rpc_params_.fragment_instance_ctxs) {
+      exec_rpc_sidecar_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
     if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
       ++fragment_ctx_idx;
-      DCHECK_LT(fragment_ctx_idx, exec_rpc_params_.fragment_ctxs.size());
-      fragment_ctx = &exec_rpc_params_.fragment_ctxs[fragment_ctx_idx];
+      DCHECK_LT(fragment_ctx_idx, exec_rpc_sidecar_.fragment_ctxs.size());
+      fragment_ctx = &exec_rpc_sidecar_.fragment_ctxs[fragment_ctx_idx];
       // we expect fragment and instance contexts to follow the same order
       DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
     }
@@ -640,11 +641,12 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
   VLOG_QUERY << "Executing instance. instance_id=" << PrintId(fis->instance_id())
-      << " fragment_idx=" << fis->instance_ctx().fragment_idx
-      << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
-      << " coord_state_idx=" << exec_rpc_params().coord_state_idx
-      << " #in-flight="
-      << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
+             << " fragment_idx=" << fis->instance_ctx().fragment_idx
+             << " per_fragment_instance_idx="
+             << fis->instance_ctx().per_fragment_instance_idx
+             << " coord_state_idx=" << exec_rpc_params().coord_state_idx()
+             << " #in-flight="
+             << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   Status status = fis->Exec();
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
   VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id())
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 408ba0a..6ed38da 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -133,7 +133,10 @@ class QueryState {
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
-  const TExecQueryFInstancesParams& exec_rpc_params() const { return exec_rpc_params_; }
+  const ExecQueryFInstancesRequestPB& exec_rpc_params() const { return exec_rpc_params_; }
+  const TExecQueryFInstancesSidecar& exec_rpc_sidecar() const {
+    return exec_rpc_sidecar_;
+  }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
@@ -169,7 +172,8 @@ class QueryState {
   ///
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
-  Status Init(const TExecQueryFInstancesParams& exec_rpc_params) WARN_UNUSED_RESULT;
+  Status Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
+      const TExecQueryFInstancesSidecar& sidecar) WARN_UNUSED_RESULT;
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
@@ -310,10 +314,11 @@ class QueryState {
   /// Set in Init().
   std::unique_ptr<ControlServiceProxy> proxy_;
 
-  /// Set in Init(); exec_rpc_params_.query_ctx is *not* set to avoid duplication
+  /// Set in Init(); exec_rpc_sidecar_.query_ctx is *not* set to avoid duplication
   /// with query_ctx_.
   /// TODO: find a way not to have to copy this
-  TExecQueryFInstancesParams exec_rpc_params_;
+  ExecQueryFInstancesRequestPB exec_rpc_params_;
+  TExecQueryFInstancesSidecar exec_rpc_sidecar_;
 
   /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
   ReservationTracker* buffer_reservation_ = nullptr;
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 14adc00..c8286e7 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -152,17 +152,18 @@ Status TestEnv::CreateQueryState(
   query_states_.push_back(qs);
   // make sure to initialize data structures unrelated to the TExecQueryFInstancesParams
   // param
-  TExecQueryFInstancesParams rpc_params;
+  ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
-  rpc_params.__set_coord_state_idx(0);
-  rpc_params.__set_query_ctx(TQueryCtx());
-  rpc_params.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
-  rpc_params.__set_fragment_instance_ctxs(
+  rpc_params.set_coord_state_idx(0);
+  TExecQueryFInstancesSidecar sidecar;
+  sidecar.__set_query_ctx(TQueryCtx());
+  sidecar.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
+  sidecar.__set_fragment_instance_ctxs(
       vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
-  RETURN_IF_ERROR(qs->Init(rpc_params));
+  RETURN_IF_ERROR(qs->Init(&rpc_params, sidecar));
   FragmentInstanceState* fis = qs->obj_pool()->Add(
-      new FragmentInstanceState(qs, qs->exec_rpc_params().fragment_ctxs[0],
-          qs->exec_rpc_params().fragment_instance_ctxs[0]));
+      new FragmentInstanceState(qs, qs->exec_rpc_sidecar().fragment_ctxs[0],
+          qs->exec_rpc_sidecar().fragment_instance_ctxs[0]));
   RuntimeState* rs = qs->obj_pool()->Add(
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 2c903d1..08f20c2 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -18,6 +18,7 @@
 #include "service/control-service.h"
 
 #include "common/constant-strings.h"
+#include "common/thread-debug-info.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -26,6 +27,7 @@
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-exec-mgr.h"
 #include "runtime/query-state.h"
 #include "service/client-request-state.h"
 #include "service/impala-server.h"
@@ -35,9 +37,10 @@
 #include "util/parse-util.h"
 #include "util/uid-util.h"
 
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/RuntimeProfile_types.h"
 #include "gen-cpp/control_service.pb.h"
 #include "gen-cpp/control_service.proxy.h"
-#include "gen-cpp/RuntimeProfile_types.h"
 
 #include "common/names.h"
 
@@ -111,6 +114,44 @@ Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
   return Status::OK();
 }
 
+Status ControlService::GetExecQueryFInstancesSidecar(
+    const ExecQueryFInstancesRequestPB& request, RpcContext* rpc_context,
+    TExecQueryFInstancesSidecar* sidecar) {
+  kudu::Slice sidecar_slice;
+  KUDU_RETURN_IF_ERROR(
+      rpc_context->GetInboundSidecar(request.sidecar_idx(), &sidecar_slice),
+      "Failed to get thrift profile sidecar");
+  uint32_t len = sidecar_slice.size();
+  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, sidecar));
+  return Status::OK();
+}
+
+void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request,
+    ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
+  DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
+  DCHECK(request->has_coord_state_idx());
+  DCHECK(request->has_sidecar_idx());
+  TExecQueryFInstancesSidecar sidecar;
+  const Status& sidecar_status =
+      GetExecQueryFInstancesSidecar(*request, rpc_context, &sidecar);
+  if (!sidecar_status.ok()) {
+    RespondAndReleaseRpc(sidecar_status, response, rpc_context);
+    return;
+  }
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), sidecar.query_ctx.query_id);
+  VLOG_QUERY << "ExecQueryFInstances():"
+             << " query_id=" << PrintId(sidecar.query_ctx.query_id)
+             << " coord=" << TNetworkAddressToString(sidecar.query_ctx.coord_address)
+             << " #instances=" << sidecar.fragment_instance_ctxs.size();
+  Status resp_status =
+      ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, sidecar);
+  if (!resp_status.ok()) {
+    LOG(INFO) << "ExecQueryFInstances() failed: query_id="
+              << PrintId(sidecar.query_ctx.query_id) << ": " << resp_status.GetDetail();
+  }
+  RespondAndReleaseRpc(resp_status, response, rpc_context);
+}
+
 void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
     ReportExecStatusResponsePB* response, RpcContext* rpc_context) {
   const TUniqueId query_id = ProtoToQueryId(request->query_id());
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 4a869e0..020cba7 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -42,6 +42,7 @@ class ClientRequestState;
 class ControlServiceProxy;
 class MemTracker;
 class MetricGroup;
+class QueryExecMgr;
 class TRuntimeProfileTree;
 
 /// This singleton class implements service for managing execution of queries in Impala.
@@ -59,6 +60,10 @@ class ControlService : public ControlServiceIf {
   virtual bool Authorize(const google::protobuf::Message* req,
       google::protobuf::Message* resp, kudu::rpc::RpcContext* rpc_context) override;
 
+  /// Starts execution of a query's fragment instances on a backend.
+  virtual void ExecQueryFInstances(const ExecQueryFInstancesRequestPB* req,
+      ExecQueryFInstancesResponsePB* resp, kudu::rpc::RpcContext* context) override;
+
   /// Updates the coordinator with the query status of the backend encoded in 'req'.
   virtual void ReportExecStatus(const ReportExecStatusRequestPB* req,
       ReportExecStatusResponsePB* resp, kudu::rpc::RpcContext* rpc_context) override;
@@ -87,6 +92,12 @@ class ControlService : public ControlServiceIf {
       const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
       TRuntimeProfileForest* thrift_profiles);
 
+  /// Helper for deserializing the ExecQueryFInstances sidecar attached in the inbound
+  /// call within 'rpc_context'. On success, returns the deserialized sidecar in
+  /// 'sidecar'. On failure, returns the error status;
+  static Status GetExecQueryFInstancesSidecar(const ExecQueryFInstancesRequestPB& request,
+      RpcContext* rpc_context, TExecQueryFInstancesSidecar* sidecar);
+
   /// Helper for serializing 'status' as part of 'response'. Also releases memory
   /// of the RPC payload previously accounted towards the internal memory tracker.
   template <typename ResponsePBType>
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 5260253..ccd00a8 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -22,7 +22,6 @@
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
 #include "service/impala-server.h"
-#include "runtime/query-exec-mgr.h"
 #include "runtime/query-state.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/exec-env.h"
@@ -37,28 +36,6 @@ DECLARE_string(debug_actions);
 ImpalaInternalService::ImpalaInternalService() {
   impala_server_ = ExecEnv::GetInstance()->impala_server();
   DCHECK(impala_server_ != nullptr);
-  query_exec_mgr_ = ExecEnv::GetInstance()->query_exec_mgr();
-  DCHECK(query_exec_mgr_ != nullptr);
-}
-
-void ImpalaInternalService::ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
-    const TExecQueryFInstancesParams& params) {
-  DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
-  DCHECK(params.__isset.coord_state_idx);
-  DCHECK(params.__isset.query_ctx);
-  DCHECK(params.__isset.fragment_ctxs);
-  DCHECK(params.__isset.fragment_instance_ctxs);
-  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), params.query_ctx.query_id);
-  VLOG_QUERY << "ExecQueryFInstances():" << " query_id="
-             << PrintId(params.query_ctx.query_id)
-             << " coord=" << TNetworkAddressToString(params.query_ctx.coord_address)
-             << " #instances=" << params.fragment_instance_ctxs.size();
-  Status status = query_exec_mgr_->StartQuery(params);
-  status.SetTStatus(&return_val);
-  if (!status.ok()) {
-    LOG(INFO) << "ExecQueryFInstances() failed: query_id="
-              << PrintId(params.query_ctx.query_id) << ": " << status.GetDetail();
-  }
 }
 
 template <typename T> void SetUnknownIdError(
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 9f0ea60..c75122b 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -24,15 +24,12 @@
 namespace impala {
 
 class ImpalaServer;
-class QueryExecMgr;
 
 /// Proxies Thrift RPC requests onto their implementing objects for the
 /// ImpalaInternalService service.
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
   ImpalaInternalService();
-  virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
-      const TExecQueryFInstancesParams& params);
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,
@@ -40,7 +37,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
 
  private:
   ImpalaServer* impala_server_;
-  QueryExecMgr* query_exec_mgr_;
 };
 
 }
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index f76e143..b598f39 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -216,10 +216,49 @@ message RemoteShutdownResultPB {
   optional ShutdownStatusPB shutdown_status = 2;
 }
 
+// ExecQueryFInstances
+message ExecQueryFInstancesRequestPB {
+  // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
+  // the coordinator.
+  optional int32 coord_state_idx = 1;
+
+  // Sidecar index of the TExecQueryFInstancesSidecar, which contains the query and plan
+  // fragment contexts.
+  optional int32 sidecar_idx = 2;
+
+  // The minimum query-wide memory reservation (in bytes) required for the backend
+  // executing the instances in fragment_instance_ctxs. This is the peak minimum
+  // reservation that may be required by the concurrently-executing operators at any
+  // point in query execution. It may be less than the initial reservation total claims
+  // (below) if execution of some operators never overlaps, which allows reuse of
+  // reservations.
+  optional int64 min_mem_reservation_bytes = 3;
+
+  // Total of the initial buffer reservations that we expect to be claimed on this
+  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
+  // operators in all fragment instances that execute on this backend. This is used for
+  // an optimization in InitialReservation. Measured in bytes.
+  optional int64 initial_mem_reservation_total_claims = 4;
+
+  // The backend memory limit (in bytes) as set by the admission controller. Used by the
+  // query mem tracker to enforce the memory limit.
+  optional int64 per_backend_mem_limit = 5;
+}
+
+message ExecQueryFInstancesResponsePB {
+  // Success or failure of the operation.
+  optional StatusPB status = 1;
+}
+
 service ControlService {
   // Override the default authorization method.
   option (kudu.rpc.default_authz_method) = "Authorize";
 
+  // Called by coord to start asynchronous execution of a query's fragment instances in
+  // backend. Returns as soon as all incoming data streams have been set up.
+  rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
+      returns (ExecQueryFInstancesResponsePB);
+
   // Update the coordinator with the query status of the backend.
   rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);
 
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 2455c1c..1659b4e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -576,48 +576,14 @@ enum ImpalaInternalServiceVersion {
 
 // The following contains the per-rpc structs for the parameters and the result.
 
-// ExecQueryFInstances
+// TODO: convert this fully to protobuf.
+struct TExecQueryFInstancesSidecar {
+  1: optional TQueryCtx query_ctx
 
-struct TExecQueryFInstancesParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // this backend's index into Coordinator::backend_states_,
-  // needed for subsequent rpcs to the coordinator
-  // required in V1
-  2: optional i32 coord_state_idx
-
-  // required in V1
-  3: optional TQueryCtx query_ctx
-
-  // required in V1
-  4: optional list<TPlanFragmentCtx> fragment_ctxs
+  2: optional list<TPlanFragmentCtx> fragment_ctxs
 
   // the order corresponds to the order of fragments in fragment_ctxs
-  // required in V1
-  5: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
-
-  // The minimum query-wide memory reservation (in bytes) required for the backend
-  // executing the instances in fragment_instance_ctxs. This is the peak minimum
-  // reservation that may be required by the concurrently-executing operators at any
-  // point in query execution. It may be less than the initial reservation total claims
-  // (below) if execution of some operators never overlaps, which allows reuse of
-  // reservations. required in V1
-  6: optional i64 min_mem_reservation_bytes
-
-  // Total of the initial buffer reservations that we expect to be claimed on this
-  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
-  // operators in all fragment instances that execute on this backend. This is used for
-  // an optimization in InitialReservation. Measured in bytes. required in V1
-  7: optional i64 initial_mem_reservation_total_claims
-
-  // The backend memory limit (in bytes) as set by the admission controller. Used by the
-  // query mem tracker to enforce the memory limit. required in V1
-  8: optional i64 per_backend_mem_limit
-}
-
-struct TExecQueryFInstancesResult {
-  // required in V1
-  1: optional Status.TStatus status
+  3: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 }
 
 // Parameters for RequestPoolService.resolveRequestPool()
@@ -792,10 +758,6 @@ struct TParseDateStringResult {
 }
 
 service ImpalaInternalService {
-  // Called by coord to start asynchronous execution of a query's fragment instances in
-  // backend.
-  // Returns as soon as all incoming data streams have been set up.
-  TExecQueryFInstancesResult ExecQueryFInstances(1:TExecQueryFInstancesParams params);
 
   // Called by fragment instances that produce local runtime filters to deliver them to
   // the coordinator for aggregation and broadcast.
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 166b385..9b2a3b2 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -99,7 +99,7 @@ class TestRPCTimeout(CustomClusterTestSuite):
   def test_execqueryfinstances_timeout(self, vector):
     for i in range(3):
       ex= self.execute_query_expect_failure(self.client, self.TEST_QUERY)
-      assert "RPC recv timed out" in str(ex)
+      assert "Exec() rpc failed: Timed out" in str(ex)
     verifiers = [MetricVerifier(i.service) for i in
                  ImpalaCluster.get_e2e_test_cluster().impalads]