Posted to commits@impala.apache.org by ta...@apache.org on 2019/01/08 01:26:56 UTC

[2/3] impala git commit: IMPALA-7468: Port CancelQueryFInstances() to KRPC.

IMPALA-7468: Port CancelQueryFInstances() to KRPC.

When the coordinator needs to cancel a query (for example, because a
user has hit Control-C), it does so by sending a CancelQueryFInstances
message to each fragment instance. This change switches that code path
to KRPC.
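
For orientation, here is the essential shape of the new client-side
call, condensed from the coordinator changes in the diff below (the
retry loop and error handling appear in full in the diff):

  std::unique_ptr<ControlServiceProxy> proxy;
  Status get_proxy_status =
      ControlService::GetProxy(krpc_host_, krpc_host_.hostname, &proxy);
  // ... on failure, merge the status into status_ and give up ...

  CancelQueryFInstancesRequestPB request;
  TUniqueIdToUniqueIdPB(query_id(), request.mutable_query_id());
  CancelQueryFInstancesResponsePB response;

  kudu::rpc::RpcController controller;
  controller.set_timeout(kudu::MonoDelta::FromSeconds(10));
  kudu::Status s = proxy->CancelQueryFInstances(request, &response, &controller);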

Add new protobuf definitions for the messages, and remove the old Thrift
definitions. Move the server-side implementation of Cancel() from
ImpalaInternalService to ControlService. Rework the scheduler so that
the FInstanceExecParams always contains the KRPC address of the
fragment executors; this address can then be used when a query needs to
be cancelled.
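
In practice the scheduler change amounts to looking up each executor's
KRPC address alongside its Thrift address whenever an instance is
created; a condensed excerpt from scheduler.cc in the diff below:

  const TBackendDescriptor& backend_descriptor =
      LookUpBackendDesc(executor_config, host);
  DCHECK(backend_descriptor.__isset.krpc_address);
  const TNetworkAddress& krpc_host = backend_descriptor.krpc_address;
  DCHECK(IsResolvedAddress(krpc_host));
  // FInstanceExecParams now records both the Thrift and the KRPC address.
  fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
      host, krpc_host, per_fragment_idx++, *fragment_params);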

For now, the KRPC calls to CancelQueryFInstances() remain synchronous:
the coordinator blocks until the backend responds or the RPC times out.
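
Each attempt is bounded by a 10-second timeout, and a new helper,
DoRrpcWithRetry(), retries the call up to three times (condensed from
its call site in the diff below):

  auto cancel_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
    return proxy->CancelQueryFInstances(request, &response, rpc_controller);
  };
  Status rpc_status = DoRrpcWithRetry(
      cancel_rpc, "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed");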

While moving the client-side code, remove the fault injection code that
was inserted with FAULT_INJECTION_SEND_RPC_EXCEPTION and
FAULT_INJECTION_RECV_RPC_EXCEPTION (triggered by running impalad with
--fault_injection_rpc_exception_type=1), as it exercises code in
client-cache.h that is no longer used. (DebugAction-based injection is
retained; see the COORD_CANCEL_QUERY_FINSTANCES_RPC check in
DoRrpcWithRetry() in the diff below.)

TESTING:
  Ran all end-to-end tests.
  No new tests; test_cancellation.py already provides good coverage.
  Verified in a debugger that DebugAction-style fault injection
  (triggered from test_cancellation.py) works correctly.

Change-Id: I625030c3f1068061aa029e6e242f016cadd84969
Reviewed-on: http://gerrit.cloudera.org:8080/12142
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e4cff7d0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e4cff7d0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e4cff7d0

Branch: refs/heads/master
Commit: e4cff7d0d60e60ddf6c77b48ca0da7b878672c1a
Parents: ec2dfd9
Author: Andrew Sherman <as...@cloudera.com>
Authored: Mon Jan 7 09:35:08 2019 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jan 8 01:05:54 2019 +0000

----------------------------------------------------------------------
 be/src/runtime/backend-client.h             |  10 ---
 be/src/runtime/coordinator-backend-state.cc | 107 ++++++++++++++---------
 be/src/runtime/coordinator-backend-state.h  |  15 +++-
 be/src/scheduling/query-schedule.h          |  10 ++-
 be/src/scheduling/scheduler.cc              |  51 +++++++----
 be/src/scheduling/scheduler.h               |  12 +--
 be/src/service/control-service.cc           |  17 ++++
 be/src/service/control-service.h            |  62 +++++++------
 be/src/service/impala-internal-service.cc   |  14 ---
 be/src/service/impala-internal-service.h    |   2 -
 common/protobuf/control_service.proto       |  17 ++++
 common/thrift/ImpalaInternalService.thrift  |  20 -----
 12 files changed, 195 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 977049a..04139a6 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -55,16 +55,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_ExecQueryFInstances(_return);
   }
 
-  void CancelQueryFInstances(TCancelQueryFInstancesResult& _return,
-      const TCancelQueryFInstancesParams& params, bool* send_done) {
-    DCHECK(!*send_done);
-    FAULT_INJECTION_SEND_RPC_EXCEPTION(16);
-    ImpalaInternalServiceClient::send_CancelQueryFInstances(params);
-    *send_done = true;
-    FAULT_INJECTION_RECV_RPC_EXCEPTION(16);
-    ImpalaInternalServiceClient::recv_CancelQueryFInstances(_return);
-  }
-
   /// Callers of TransmitData() should provide their own counter to measure the data
   /// transmission time.
   void SetTransmitDataCounter(RuntimeProfile::ConcurrentTimerCounter* csw) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 87224c1..a2e1901 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -23,22 +23,25 @@
 #include "exec/exec-node.h"
 #include "exec/kudu-util.h"
 #include "exec/scan-node.h"
-#include "runtime/exec-env.h"
-#include "runtime/fragment-instance-state.h"
-#include "runtime/debug-options.h"
-#include "runtime/client-cache.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
 #include "runtime/backend-client.h"
+#include "runtime/client-cache.h"
 #include "runtime/coordinator-filter-state.h"
+#include "runtime/debug-options.h"
+#include "runtime/exec-env.h"
+#include "runtime/fragment-instance-state.h"
+#include "service/control-service.h"
+#include "util/counting-barrier.h"
 #include "util/error-util-internal.h"
-#include "util/uid-util.h"
 #include "util/network-util.h"
-#include "util/counting-barrier.h"
-
-#include "gen-cpp/control_service.pb.h"
-#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "util/uid-util.h"
 
 #include "common/names.h"
 
+using kudu::MonoDelta;
+using kudu::rpc::RpcController;
 using namespace impala;
 using namespace rapidjson;
 namespace accumulators = boost::accumulators;
@@ -58,6 +61,7 @@ void Coordinator::BackendState::Init(
     const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
   host_ = backend_exec_params_->instance_params[0]->host;
+  krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
   // populate instance_stats_map_ and install instance
@@ -385,6 +389,23 @@ void Coordinator::BackendState::UpdateExecStats(
   }
 }
 
+template <typename F>
+Status Coordinator::BackendState::DoRrpcWithRetry(
+    F&& rpc_call, const char* debug_action, const char* error_msg) {
+  Status rpc_status;
+  for (int i = 0; i < 3; i++) {
+    RpcController rpc_controller;
+    rpc_controller.set_timeout(MonoDelta::FromSeconds(10));
+    // Check for injected failures.
+    rpc_status = DebugAction(query_ctx().client_request.query_options, debug_action);
+    if (!rpc_status.ok()) continue;
+
+    rpc_status = FromKuduStatus(rpc_call(&rpc_controller), error_msg);
+    if (rpc_status.ok()) break;
+  }
+  return rpc_status;
+}
+
 bool Coordinator::BackendState::Cancel() {
   unique_lock<mutex> l(lock_);
 
@@ -401,36 +422,44 @@ bool Coordinator::BackendState::Cancel() {
   // set an error status to make sure we only cancel this once
   if (status_.ok()) status_ = Status::CANCELLED;
 
-  TCancelQueryFInstancesParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_query_id(query_id());
-  TCancelQueryFInstancesResult dummy;
-  VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id()) <<
-      " backend=" << TNetworkAddressToString(impalad_address());
-
-
-  // The return value 'dummy' is ignored as it's only set if the fragment
-  // instance cannot be found in the backend. The fragment instances of a query
-  // can all be cancelled locally in a backend due to RPC failure to
-  // coordinator. In which case, the query state can be gone already.
-  ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
-      ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(),
-      &ImpalaBackendClient::CancelQueryFInstances, params,
-      [this] () {
-        return DebugAction(query_ctx().client_request.query_options,
-            "COORD_CANCEL_QUERY_FINSTANCES_RPC");
-      }, &dummy);
-  if (!rpc_status.status.ok()) {
-    status_.MergeStatus(rpc_status.status);
-    if (rpc_status.client_error) {
-      VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
-                 << " failed to connect to " << TNetworkAddressToString(impalad_address())
-                 << " :" << rpc_status.status.msg().msg();
-    } else {
-      VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
-                 << " rpc to " << TNetworkAddressToString(impalad_address())
-                 << " failed: " << rpc_status.status.msg().msg();
-    }
+  VLOG_QUERY << "Sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id())
+             << " backend=" << TNetworkAddressToString(krpc_host_);
+
+  std::unique_ptr<ControlServiceProxy> proxy;
+  Status get_proxy_status =
+      ControlService::GetProxy(krpc_host_, krpc_host_.hostname, &proxy);
+  if (!get_proxy_status.ok()) {
+    status_.MergeStatus(get_proxy_status);
+    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id()) << " could not get proxy to "
+               << TNetworkAddressToString(krpc_host_)
+               << " failure: " << get_proxy_status.msg().msg();
+    return true;
+  }
+
+  CancelQueryFInstancesRequestPB request;
+  TUniqueIdToUniqueIdPB(query_id(), request.mutable_query_id());
+  CancelQueryFInstancesResponsePB response;
+
+  auto cancel_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
+    return proxy->CancelQueryFInstances(request, &response, rpc_controller);
+  };
+
+  Status rpc_status = DoRrpcWithRetry(
+      cancel_rpc, "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed");
+
+  if (!rpc_status.ok()) {
+    status_.MergeStatus(rpc_status);
+    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id()) << " could not do rpc to "
+               << TNetworkAddressToString(krpc_host_)
+               << " failure: " << rpc_status.msg().msg();
+    return true;
+  }
+  Status cancel_status = Status(response.status());
+  if (!cancel_status.ok()) {
+    status_.MergeStatus(cancel_status);
+    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id())
+               << " got failure after rpc to " << TNetworkAddressToString(krpc_host_)
+               << " failure: " << cancel_status.msg().msg();
     return true;
   }
   return true;

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 1363701..e84e097 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -30,15 +30,14 @@
 #include <boost/accumulators/statistics/variance.hpp>
 #include <boost/thread/mutex.hpp>
 
+#include "gen-cpp/control_service.proxy.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "runtime/coordinator.h"
 #include "scheduling/query-schedule.h"
 #include "util/error-util-internal.h"
 #include "util/progress-updater.h"
-#include "util/stopwatch.h"
 #include "util/runtime-profile.h"
-#include "gen-cpp/control_service.pb.h"
-#include "gen-cpp/RuntimeProfile_types.h"
-#include "gen-cpp/Types_types.h"
+#include "util/stopwatch.h"
 
 namespace impala {
 
@@ -242,7 +241,10 @@ class Coordinator::BackendState {
   /// indices of fragments executing on this backend, populated in Init()
   std::unordered_set<int> fragments_;
 
+  /// Thrift address of execution backend.
   TNetworkAddress host_;
+  /// Krpc address of execution backend.
+  TNetworkAddress krpc_host_;
 
   /// protects fields below
   /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_
@@ -292,6 +294,11 @@ class Coordinator::BackendState {
 
   /// Same as ComputeResourceUtilization() but caller must hold lock.
   ResourceUtilization ComputeResourceUtilizationLocked();
+
+  /// Retry the Rpc 'rpc_call' up to 3 times.
+  /// Pass 'debug_action' to DebugAction() to potentially inject errors.
+  template <typename F>
+  Status DoRrpcWithRetry(F&& rpc_call, const char* debug_action, const char* error_msg);
 };
 
 /// Per fragment execution statistics.

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6cf5175..6021f98 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -85,7 +85,8 @@ typedef std::map<TNetworkAddress, BackendExecParams> PerBackendExecParams;
 /// TPlanFragmentInstanceCtx
 struct FInstanceExecParams {
   TUniqueId instance_id;
-  TNetworkAddress host; // execution backend
+  TNetworkAddress host; // Thrift address of execution backend.
+  TNetworkAddress krpc_host; // Krpc address of execution backend.
   PerNodeScanRanges per_node_scan_ranges;
 
   /// 0-based ordinal of this particular instance within its fragment (not: query-wide)
@@ -100,8 +101,11 @@ struct FInstanceExecParams {
   const TPlanFragment& fragment() const;
 
   FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host,
-      int per_fragment_instance_idx, const FragmentExecParams& fragment_exec_params)
-    : instance_id(instance_id), host(host),
+      const TNetworkAddress& krpc_host, int per_fragment_instance_idx,
+      const FragmentExecParams& fragment_exec_params)
+    : instance_id(instance_id),
+      host(host),
+      krpc_host(krpc_host),
       per_fragment_instance_idx(per_fragment_instance_idx),
       sender_id(-1),
       fragment_exec_params(fragment_exec_params) {}

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index fb99942..de671a9 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -330,7 +330,7 @@ void Scheduler::ComputeFragmentExecParams(
   // for each plan, compute the FInstanceExecParams for the tree of fragments
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     // set instance_id, host, per_node_scan_ranges
-    ComputeFragmentExecParams(plan_exec_info,
+    ComputeFragmentExecParams(executor_config, plan_exec_info,
         schedule->GetFragmentExecParams(plan_exec_info.fragments[0].idx), schedule);
 
     // Set destinations, per_exch_num_senders, sender_id.
@@ -376,24 +376,28 @@ void Scheduler::ComputeFragmentExecParams(
   }
 }
 
-void Scheduler::ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
-    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
+void Scheduler::ComputeFragmentExecParams(const BackendConfig& executor_config,
+    const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
+    QuerySchedule* schedule) {
   // traverse input fragments
   for (FragmentIdx input_fragment_idx : fragment_params->input_fragments) {
-    ComputeFragmentExecParams(
-        plan_exec_info, schedule->GetFragmentExecParams(input_fragment_idx), schedule);
+    ComputeFragmentExecParams(executor_config, plan_exec_info,
+        schedule->GetFragmentExecParams(input_fragment_idx), schedule);
   }
 
   const TPlanFragment& fragment = fragment_params->fragment;
   // case 1: single instance executed at coordinator
   if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
     const TNetworkAddress& coord = local_backend_descriptor_.address;
+    DCHECK(local_backend_descriptor_.__isset.krpc_address);
+    const TNetworkAddress& krpc_coord = local_backend_descriptor_.krpc_address;
+    DCHECK(IsResolvedAddress(krpc_coord));
     // make sure that the coordinator instance ends up with instance idx 0
     TUniqueId instance_id = fragment_params->is_coord_fragment
         ? schedule->query_id()
         : schedule->GetNextInstanceId();
     fragment_params->instance_exec_params.emplace_back(
-        instance_id, coord, 0, *fragment_params);
+        instance_id, coord, krpc_coord, 0, *fragment_params);
     FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
 
     // That instance gets all of the scan ranges, if there are any.
@@ -407,7 +411,7 @@ void Scheduler::ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
   }
 
   if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
-    CreateUnionInstances(fragment_params, schedule);
+    CreateUnionInstances(executor_config, fragment_params, schedule);
     return;
   }
 
@@ -415,7 +419,7 @@ void Scheduler::ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
   if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
     // case 2: leaf fragment with leftmost scan
     // TODO: check that there's only one scan in this fragment
-    CreateScanInstances(leftmost_scan_id, fragment_params, schedule);
+    CreateScanInstances(executor_config, leftmost_scan_id, fragment_params, schedule);
   } else {
     // case 3: interior fragment without leftmost scan
     // we assign the same hosts as those of our leftmost input fragment (so that a
@@ -424,7 +428,7 @@ void Scheduler::ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
   }
 }
 
-void Scheduler::CreateUnionInstances(
+void Scheduler::CreateUnionInstances(const BackendConfig& executor_config,
     FragmentExecParams* fragment_params, QuerySchedule* schedule) {
   const TPlanFragment& fragment = fragment_params->fragment;
   DCHECK(ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
@@ -454,8 +458,13 @@ void Scheduler::CreateUnionInstances(
   // TODO-MT: figure out how to parallelize Union
   int per_fragment_idx = 0;
   for (const TNetworkAddress& host : hosts) {
-    fragment_params->instance_exec_params.emplace_back(
-        schedule->GetNextInstanceId(), host, per_fragment_idx++, *fragment_params);
+    const TBackendDescriptor& backend_descriptor =
+        LookUpBackendDesc(executor_config, host);
+    DCHECK(backend_descriptor.__isset.krpc_address);
+    const TNetworkAddress& krpc_host = backend_descriptor.krpc_address;
+    DCHECK(IsResolvedAddress(krpc_host));
+    fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
+        host, krpc_host, per_fragment_idx++, *fragment_params);
     // assign all scan ranges
     FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
     if (fragment_params->scan_range_assignment.count(host) > 0) {
@@ -464,16 +473,20 @@ void Scheduler::CreateUnionInstances(
   }
 }
 
-void Scheduler::CreateScanInstances(PlanNodeId leftmost_scan_id,
-    FragmentExecParams* fragment_params, QuerySchedule* schedule) {
+void Scheduler::CreateScanInstances(const BackendConfig& executor_config,
+    PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params,
+    QuerySchedule* schedule) {
   int max_num_instances =
       schedule->request().query_ctx.client_request.query_options.mt_dop;
   if (max_num_instances == 0) max_num_instances = 1;
 
   if (fragment_params->scan_range_assignment.empty()) {
+    DCHECK(local_backend_descriptor_.__isset.krpc_address);
+    DCHECK(IsResolvedAddress(local_backend_descriptor_.krpc_address));
     // this scan doesn't have any scan ranges: run a single instance on the coordinator
     fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
-        local_backend_descriptor_.address, 0, *fragment_params);
+        local_backend_descriptor_.address, local_backend_descriptor_.krpc_address, 0,
+        *fragment_params);
     return;
   }
 
@@ -482,6 +495,11 @@ void Scheduler::CreateScanInstances(PlanNodeId leftmost_scan_id,
     // evenly divide up the scan ranges of the leftmost scan between at most
     // <dop> instances
     const TNetworkAddress& host = assignment_entry.first;
+    const TBackendDescriptor& backend_descriptor =
+        LookUpBackendDesc(executor_config, host);
+    DCHECK(backend_descriptor.__isset.krpc_address);
+    TNetworkAddress krpc_host = backend_descriptor.krpc_address;
+    DCHECK(IsResolvedAddress(krpc_host));
     auto scan_ranges_it = assignment_entry.second.find(leftmost_scan_id);
     DCHECK(scan_ranges_it != assignment_entry.second.end());
     const vector<TScanRangeParams>& params_list = scan_ranges_it->second;
@@ -508,7 +526,7 @@ void Scheduler::CreateScanInstances(PlanNodeId leftmost_scan_id,
     int params_idx = 0; // into params_list
     for (int i = 0; i < num_instances; ++i) {
       fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
-          host, per_fragment_instance_idx++, *fragment_params);
+          host, krpc_host, per_fragment_instance_idx++, *fragment_params);
       FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
 
       // Threshold beyond which we want to assign to the next instance.
@@ -552,7 +570,8 @@ void Scheduler::CreateCollocatedInstances(
   for (const FInstanceExecParams& input_instance_params :
       input_fragment_params->instance_exec_params) {
     fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
-        input_instance_params.host, per_fragment_instance_idx++, *fragment_params);
+        input_instance_params.host, input_instance_params.krpc_host,
+        per_fragment_instance_idx++, *fragment_params);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 2618830..48f223d 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -430,8 +430,9 @@ class Scheduler {
   /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
   /// fragment_params and its input fragments via a depth-first traversal.
   /// All fragments are part of plan_exec_info.
-  void ComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
-      FragmentExecParams* fragment_params, QuerySchedule* schedule);
+  void ComputeFragmentExecParams(const BackendConfig& executor_config,
+      const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
+      QuerySchedule* schedule);
 
   /// Create instances of the fragment corresponding to fragment_params, which contains
   /// a Union node.
@@ -442,7 +443,8 @@ class Scheduler {
   /// a UnionNode with partitioned joins or grouping aggregates as children runs on
   /// at least as many hosts as the input to those children).
   /// TODO: is this really necessary? If not, revise.
-  void CreateUnionInstances(FragmentExecParams* fragment_params, QuerySchedule* schedule);
+  void CreateUnionInstances(const BackendConfig& executor_config,
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// Create instances of the fragment corresponding to fragment_params to run on the
   /// selected replica hosts of the scan ranges of the node with id scan_id.
@@ -451,8 +453,8 @@ class Scheduler {
   /// number of bytes per instances and then in a single pass assigning scan ranges to
   /// each instance to roughly meet that average.
   /// For all other storage mgrs, it load-balances the number of splits per instance.
-  void CreateScanInstances(
-      PlanNodeId scan_id, FragmentExecParams* fragment_params, QuerySchedule* schedule);
+  void CreateScanInstances(const BackendConfig& executor_config, PlanNodeId scan_id,
+      FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// For each instance of fragment_params's input fragment, create a collocated
   /// instance for fragment_params's fragment.

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/service/control-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 072a855..991d8f3 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -25,6 +25,7 @@
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "service/client-request-state.h"
 #include "service/impala-server.h"
 #include "testutil/fault-injection-util.h"
@@ -160,4 +161,20 @@ void ControlService::RespondAndReleaseRpc(const Status& status, ResponsePBType*
   rpc_context->RespondSuccess();
 }
 
+void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB* request,
+    CancelQueryFInstancesResponsePB* response, ::kudu::rpc::RpcContext* rpc_context) {
+  DCHECK(request->has_query_id());
+  const TUniqueId& query_id = ProtoToQueryId(request->query_id());
+  VLOG_QUERY << "CancelQueryFInstances(): query_id=" << PrintId(query_id);
+  FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
+  QueryState::ScopedRef qs(query_id);
+  if (qs.get() == nullptr) {
+    Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
+        Substitute("Unknown query id: $0", PrintId(query_id))));
+    RespondAndReleaseRpc(status, response, rpc_context);
+    return;
+  }
+  qs->Cancel();
+  RespondAndReleaseRpc(Status::OK(), response, rpc_context);
+}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/service/control-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 44871d7..5dc73dd 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -38,44 +38,48 @@ class TRuntimeProfileTree;
 
 /// This singleton class implements service for managing execution of queries in Impala.
 class ControlService : public ControlServiceIf {
-  public:
-   ControlService(MetricGroup* metric_group);
+ public:
+  ControlService(MetricGroup* metric_group);
 
-   /// Initializes the service by registering it with the singleton RPC manager.
-   /// This mustn't be called until RPC manager has been initialized.
-   Status Init();
+  /// Initializes the service by registering it with the singleton RPC manager.
+  /// This mustn't be called until RPC manager has been initialized.
+  Status Init();
 
-   /// Returns true iff the 'remote_user' in 'context' is authorized to access
-   /// ControlService. On denied access, the RPC is replied to with an error message.
-   /// Authorization is enforced only when Kerberos is enabled.
-   virtual bool Authorize(const google::protobuf::Message* req,
-       google::protobuf::Message* resp, kudu::rpc::RpcContext* rpc_context) override;
+  /// Returns true iff the 'remote_user' in 'context' is authorized to access
+  /// ControlService. On denied access, the RPC is replied to with an error message.
+  /// Authorization is enforced only when Kerberos is enabled.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, kudu::rpc::RpcContext* rpc_context) override;
 
-   /// Updates the coordinator with the query status of the backend encoded in 'req'.
-   virtual void ReportExecStatus(const ReportExecStatusRequestPB *req,
-       ReportExecStatusResponsePB *resp, kudu::rpc::RpcContext *rpc_context) override;
+  /// Updates the coordinator with the query status of the backend encoded in 'req'.
+  virtual void ReportExecStatus(const ReportExecStatusRequestPB* req,
+      ReportExecStatusResponsePB* resp, kudu::rpc::RpcContext* rpc_context) override;
+
+  /// Cancel any executing fragment instances for the query id specified in 'req'.
+  virtual void CancelQueryFInstances(const CancelQueryFInstancesRequestPB* req,
+      CancelQueryFInstancesResponsePB* resp, ::kudu::rpc::RpcContext* context) override;
 
   /// Gets a ControlService proxy to a server with 'address' and 'hostname'.
   /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
   static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
       std::unique_ptr<ControlServiceProxy>* proxy);
 
-  private:
-   /// Tracks the memory usage of payload in the service queue.
-   std::unique_ptr<MemTracker> mem_tracker_;
-
-   /// Helper for deserializing runtime profile from the sidecar attached in the inbound
-   /// call within 'rpc_context'. On success, returns the deserialized profile in
-   /// 'thrift_profiles'. On failure, returns the error status;
-   static Status GetProfile(const ReportExecStatusRequestPB& request,
-       const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
-       TRuntimeProfileForest* thrift_profiles);
-
-   /// Helper for serializing 'status' as part of 'response'. Also releases memory
-   /// of the RPC payload previously accounted towards the internal memory tracker.
-   template<typename ResponsePBType>
-   void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
-       kudu::rpc::RpcContext* rpc_context);
+ private:
+  /// Tracks the memory usage of payload in the service queue.
+  std::unique_ptr<MemTracker> mem_tracker_;
+
+  /// Helper for deserializing runtime profile from the sidecar attached in the inbound
+  /// call within 'rpc_context'. On success, returns the deserialized profile in
+  /// 'thrift_profiles'. On failure, returns the error status;
+  static Status GetProfile(const ReportExecStatusRequestPB& request,
+      const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
+      TRuntimeProfileForest* thrift_profiles);
+
+  /// Helper for serializing 'status' as part of 'response'. Also releases memory
+  /// of the RPC payload previously accounted towards the internal memory tracker.
+  template <typename ResponsePBType>
+  void RespondAndReleaseRpc(
+      const Status& status, ResponsePBType* response, kudu::rpc::RpcContext* rpc_context);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 08c708e..f009b39 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -65,20 +65,6 @@ template <typename T> void SetUnknownIdError(
   status.SetTStatus(status_container);
 }
 
-void ImpalaInternalService::CancelQueryFInstances(
-    TCancelQueryFInstancesResult& return_val,
-    const TCancelQueryFInstancesParams& params) {
-  VLOG_QUERY << "CancelQueryFInstances(): query_id=" << PrintId(params.query_id);
-  FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
-  DCHECK(params.__isset.query_id);
-  QueryState::ScopedRef qs(params.query_id);
-  if (qs.get() == nullptr) {
-    SetUnknownIdError("query", params.query_id, &return_val);
-    return;
-  }
-  qs->Cancel();
-}
-
 void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
     const TUpdateFilterParams& params) {
   FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 376b615..28000c7 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -33,8 +33,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
   ImpalaInternalService();
   virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
       const TExecQueryFInstancesParams& params);
-  virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
-      const TCancelQueryFInstancesParams& params);
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/common/protobuf/control_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index c546959..273e0ae 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -158,9 +158,26 @@ message ReportExecStatusResponsePB {
   optional StatusPB status = 1;
 }
 
+message CancelQueryFInstancesRequestPB {
+  // The query id of the query being cancelled.
+  optional UniqueIdPB query_id = 1;
+}
+
+message CancelQueryFInstancesResponsePB {
+  optional StatusPB status = 1;
+}
+
 service ControlService {
   // Override the default authorization method.
   option (kudu.rpc.default_authz_method) = "Authorize";
 
+  // Update the coordinator with the query status of the backend.
   rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);
+
+  // Called by coordinator to cancel execution of a single query's fragment instances,
+  // which the coordinator initiated with a prior call to ExecQueryFInstances.
+  // Cancellation is asynchronous (in the sense that this call may return before the
+  // fragment instance has completely stopped executing).
+  rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
+      returns (CancelQueryFInstancesResponsePB);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/e4cff7d0/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 5875257..6add620 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -564,20 +564,6 @@ struct TExecQueryFInstancesResult {
   1: optional Status.TStatus status
 }
 
-// CancelQueryFInstances
-
-struct TCancelQueryFInstancesParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // required in V1
-  2: optional Types.TUniqueId query_id
-}
-
-struct TCancelQueryFInstancesResult {
-  // required in V1
-  1: optional Status.TStatus status
-}
-
 // Parameters for RequestPoolService.resolveRequestPool()
 // TODO: why is this here?
 struct TResolveRequestPoolParams {
@@ -766,12 +752,6 @@ service ImpalaInternalService {
   // Returns as soon as all incoming data streams have been set up.
   TExecQueryFInstancesResult ExecQueryFInstances(1:TExecQueryFInstancesParams params);
 
-  // Called by coord to cancel execution of a single query's fragment instances, which
-  // the coordinator initiated with a prior call to ExecQueryFInstances.
-  // Cancellation is asynchronous.
-  TCancelQueryFInstancesResult CancelQueryFInstances(
-      1:TCancelQueryFInstancesParams params);
-
   // Called by fragment instances that produce local runtime filters to deliver them to
   // the coordinator for aggregation and broadcast.
   TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);