You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/10 20:24:49 UTC

[impala] 03/09: IMPALA-7985: Port RemoteShutdown() to KRPC.

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit adde66b37cea3689dac453f5d4f2aa4863a35979
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Wed Jan 9 16:58:13 2019 -0800

    IMPALA-7985: Port RemoteShutdown() to KRPC.
    
    The :shutdown command is used to shutdown a remote server. The common
    case is that a user specifies the impalad to shutdown by specifying a
    host e.g. :shutdown('host100'). If a user has more than one impalad on a
    remote host then the form :shutdown('<host>:<port>') can be used to
    specify the port by which the impalad can be contacted. Prior to
    IMPALA-7985 this port was the backend port, e.g.
    :shutdown('host100:22000'). With IMPALA-7985 the port to use is the KRPC
    port, e.g. :shutdown('host100:27000').
    
    Shutdown is implemented by making an rpc call to the target impalad.
    This changes the implementation of this call to use KRPC.
    
    To aid the user in finding the KRPC port, the KRPC address is added to
    the /backends section of the debug web page.
    
    We attempt to detect the case where :shutdown is pointed at a thrift
    port (like the backend port) and print an informative message.
    
    Documentation of this change will be done in IMPALA-8098.
    Further improvements to DoRpcWithRetry() will be done in IMPALA-8143.
    
    For discussion of why it was chosen to implement this change in an
    incompatible way, see comments in
    https://issues.apache.org/jira/browse/IMPALA-7985.
    
    TESTING
    
    Ran all end-to-end tests.
    Enhance the test for /backends in test_web_pages.py.
    In test_restart_services.py add a call to the old backend port to the
    test. Some expected error messages were changed in line with what KRPC
    returns.
    
    Change-Id: I4fd00ee4e638f5e71e27893162fd65501ef9e74e
    Reviewed-on: http://gerrit.cloudera.org:8080/12260
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/backend-client.h               |  8 ---
 be/src/runtime/coordinator-backend-state.cc   | 21 +------
 be/src/runtime/coordinator-backend-state.h    |  5 --
 be/src/service/client-request-state.cc        | 79 ++++++++++++++++++++-------
 be/src/service/control-service.cc             | 25 +++++++--
 be/src/service/control-service.h              | 35 ++++++++++++
 be/src/service/impala-http-handler.cc         |  5 +-
 be/src/service/impala-internal-service.cc     |  8 ---
 be/src/service/impala-internal-service.h      |  2 -
 be/src/service/impala-server.cc               | 55 ++++++++++---------
 be/src/service/impala-server.h                |  6 +-
 common/protobuf/control_service.proto         | 37 +++++++++++++
 common/thrift/ImpalaInternalService.thrift    | 39 -------------
 tests/custom_cluster/test_restart_services.py | 34 +++++++-----
 tests/webserver/test_web_pages.py             | 18 ++++++
 www/backends.tmpl                             |  2 +
 16 files changed, 229 insertions(+), 150 deletions(-)

diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 04139a6..a4ff597 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -84,14 +84,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_PublishFilter(_return);
   }
 
-  void RemoteShutdown(TRemoteShutdownResult& _return, const TRemoteShutdownParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    ImpalaInternalServiceClient::send_RemoteShutdown(params);
-    *send_done = true;
-    ImpalaInternalServiceClient::recv_RemoteShutdown(_return);
-  }
-
 #pragma clang diagnostic pop
 
  private:
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 2bc8547..605a758 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -417,23 +417,6 @@ void Coordinator::BackendState::UpdateExecStats(
   }
 }
 
-template <typename F>
-Status Coordinator::BackendState::DoRrpcWithRetry(
-    F&& rpc_call, const char* debug_action, const char* error_msg) {
-  Status rpc_status;
-  for (int i = 0; i < 3; i++) {
-    RpcController rpc_controller;
-    rpc_controller.set_timeout(MonoDelta::FromSeconds(10));
-    // Check for injected failures.
-    rpc_status = DebugAction(query_ctx().client_request.query_options, debug_action);
-    if (!rpc_status.ok()) continue;
-
-    rpc_status = FromKuduStatus(rpc_call(&rpc_controller), error_msg);
-    if (rpc_status.ok()) break;
-  }
-  return rpc_status;
-}
-
 bool Coordinator::BackendState::Cancel() {
   unique_lock<mutex> l(lock_);
 
@@ -472,8 +455,8 @@ bool Coordinator::BackendState::Cancel() {
     return proxy->CancelQueryFInstances(request, &response, rpc_controller);
   };
 
-  Status rpc_status = DoRrpcWithRetry(
-      cancel_rpc, "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed");
+  Status rpc_status = ControlService::DoRpcWithRetry(cancel_rpc, query_ctx(),
+      "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed", 3, 10);
 
   if (!rpc_status.ok()) {
     status_.MergeStatus(rpc_status);
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 122da42..1cc67c1 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -307,11 +307,6 @@ class Coordinator::BackendState {
 
   /// Same as ComputeResourceUtilization() but caller must hold lock.
   ResourceUtilization ComputeResourceUtilizationLocked();
-
-  /// Retry the Rpc 'rpc_call' up to 3 times.
-  /// Pass 'debug_action' to DebugAction() to potentially inject errors.
-  template <typename F>
-  Status DoRrpcWithRetry(F&& rpc_call, const char* debug_action, const char* error_msg);
 };
 
 /// Per fragment execution statistics.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index bde4741..d8ce417 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,12 +21,14 @@
 #include <limits>
 #include <gutil/strings/substitute.h>
 
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "runtime/backend-client.h"
 #include "runtime/coordinator.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "runtime/exec-env.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "service/frontend.h"
@@ -40,19 +42,23 @@
 
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
+#include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/control_service.proxy.h"
 
 #include <thrift/Thrift.h>
 
 #include "common/names.h"
+#include "control-service.h"
 
 using boost::algorithm::iequals;
 using boost::algorithm::join;
+using kudu::rpc::RpcController;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 using namespace beeswax;
 using namespace strings;
 
-DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 DECLARE_int64(max_result_cache_size);
@@ -630,39 +636,72 @@ Status ClientRequestState::ExecDdlRequest() {
 
 Status ClientRequestState::ExecShutdownRequest() {
   const TShutdownParams& request = exec_request_.admin_request.shutdown_params;
-  int port = request.__isset.backend && request.backend.port != 0 ? request.backend.port :
-                                                                    FLAGS_be_port;
+  bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
+  int port = backend_port_specified ? request.backend.port : FLAGS_krpc_port;
   // Use the local shutdown code path if the host is unspecified or if it exactly matches
   // the configured host/port. This avoids the possibility of RPC errors preventing
   // shutdown.
   if (!request.__isset.backend
-      || (request.backend.hostname == FLAGS_hostname && port == FLAGS_be_port)) {
-    TShutdownStatus shutdown_status;
+      || (request.backend.hostname == FLAGS_hostname && port == FLAGS_krpc_port)) {
+    ShutdownStatusPB shutdown_status;
     int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1;
     RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status));
     SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)});
     return Status::OK();
   }
-  TNetworkAddress addr = MakeNetworkAddress(request.backend.hostname, port);
 
-  TRemoteShutdownParams params;
-  if (request.__isset.deadline_s) params.__set_deadline_s(request.deadline_s);
-  TRemoteShutdownResult resp;
+  // KRPC relies on resolved IP address, so convert hostname.
+  IpAddr ip_address;
+  Status ip_status = HostnameToIpAddr(request.backend.hostname, &ip_address);
+  if (!ip_status.ok()) {
+    VLOG(1) << "Could not convert hostname " << request.backend.hostname
+            << " to ip address, error: " << ip_status.GetDetail();
+    return ip_status;
+  }
+  TNetworkAddress addr = MakeNetworkAddress(ip_address, port);
+
+  std::unique_ptr<ControlServiceProxy> proxy;
+  Status get_proxy_status = ControlService::GetProxy(addr, addr.hostname, &proxy);
+  if (!get_proxy_status.ok()) {
+    return Status(
+        Substitute("Could not get Proxy to ControlService at $0 with error: $1.",
+            TNetworkAddressToString(addr), get_proxy_status.msg().msg()));
+  }
+
+  RemoteShutdownParamsPB params;
+  if (request.__isset.deadline_s) params.set_deadline_s(request.deadline_s);
+  RemoteShutdownResultPB resp;
   VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(addr);
-  ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
-      ExecEnv::GetInstance()->impalad_client_cache(), addr,
-      &ImpalaBackendClient::RemoteShutdown, params,
-      [this]() { return DebugAction(query_options(), "CRS_SHUTDOWN_RPC"); }, &resp);
-  if (!rpc_status.status.ok()) {
+
+  auto shutdown_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
+    return proxy->RemoteShutdown(params, &resp, rpc_controller);
+  };
+
+  Status rpc_status = ControlService::DoRpcWithRetry(
+      shutdown_rpc, query_ctx_, "CRS_SHUTDOWN_RPC", "RemoteShutdown() RPC failed", 3, 10);
+
+  if (!rpc_status.ok()) {
+    const string& msg = rpc_status.msg().msg();
     VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id())
                << " failed to send RPC to " << TNetworkAddressToString(addr) << " :"
-               << rpc_status.status.msg().msg();
-    return rpc_status.status;
+               << msg;
+    string err_string = Substitute(
+        "Rpc to $0 failed with error '$1'", TNetworkAddressToString(addr), msg);
+    // Attempt to detect if the the failure is because of not using a KRPC port.
+    if (backend_port_specified
+        && msg.find("RemoteShutdown() RPC failed: Timed out: connection negotiation to")
+            != string::npos) {
+      // Prior to IMPALA-7985 :shutdown() used the backend port.
+      err_string.append(" This may be because the port specified is wrong. You may have"
+                        " specified the backend (thrift) port which :shutdown() can no"
+                        " longer use. Please make sure the correct KRPC port is being"
+                        " used, or don't specify any port in the :shutdown() command.");
+    }
+    return Status(err_string);
   }
-
-  Status shutdown_status(resp.status);
+  Status shutdown_status(resp.status());
   RETURN_IF_ERROR(shutdown_status);
-  SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status)});
+  SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status())});
   return Status::OK();
 }
 
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 991d8f3..d48b873 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -20,6 +20,7 @@
 #include "common/constant-strings.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "rpc/rpc-mgr.h"
 #include "rpc/rpc-mgr.inline.h"
 #include "runtime/coordinator.h"
@@ -94,7 +95,7 @@ bool ControlService::Authorize(const google::protobuf::Message* req,
 }
 
 Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
-    const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
+    const ClientRequestState& request_state, RpcContext* rpc_context,
     TRuntimeProfileForest* thrift_profiles) {
   // Debug action to simulate deserialization failure.
   RETURN_IF_ERROR(DebugAction(request_state.query_options(),
@@ -110,7 +111,7 @@ Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
 }
 
 void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
-    ReportExecStatusResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
+    ReportExecStatusResponsePB* response, RpcContext* rpc_context) {
   const TUniqueId query_id = ProtoToQueryId(request->query_id());
   shared_ptr<ClientRequestState> request_state =
       ExecEnv::GetInstance()->impala_server()->GetClientRequestState(query_id);
@@ -152,9 +153,9 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
   RespondAndReleaseRpc(resp_status, response, rpc_context);
 }
 
-template<typename ResponsePBType>
-void ControlService::RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
-    kudu::rpc::RpcContext* rpc_context) {
+template <typename ResponsePBType>
+void ControlService::RespondAndReleaseRpc(
+    const Status& status, ResponsePBType* response, RpcContext* rpc_context) {
   status.ToProto(response->mutable_status());
   // Release the memory against the control service's memory tracker.
   mem_tracker_->Release(rpc_context->GetTransferSize());
@@ -162,10 +163,11 @@ void ControlService::RespondAndReleaseRpc(const Status& status, ResponsePBType*
 }
 
 void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB* request,
-    CancelQueryFInstancesResponsePB* response, ::kudu::rpc::RpcContext* rpc_context) {
+    CancelQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
   DCHECK(request->has_query_id());
   const TUniqueId& query_id = ProtoToQueryId(request->query_id());
   VLOG_QUERY << "CancelQueryFInstances(): query_id=" << PrintId(query_id);
+  // TODO(IMPALA-8143) Use DebugAction for fault injection.
   FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
   QueryState::ScopedRef qs(query_id);
   if (qs.get() == nullptr) {
@@ -177,4 +179,15 @@ void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB*
   qs->Cancel();
   RespondAndReleaseRpc(Status::OK(), response, rpc_context);
 }
+
+void ControlService::RemoteShutdown(const RemoteShutdownParamsPB* req,
+    RemoteShutdownResultPB* response, RpcContext* rpc_context) {
+  // TODO(IMPALA-8143) Use DebugAction for fault injection.
+  FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
+  Status status = ExecEnv::GetInstance()->impala_server()->StartShutdown(
+      req->has_deadline_s() ? req->deadline_s() : -1,
+      response->mutable_shutdown_status());
+
+  RespondAndReleaseRpc(status, response, rpc_context);
+}
 }
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index 5dc73dd..6a0267f 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -20,8 +20,16 @@
 
 #include "gen-cpp/control_service.service.h"
 
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "util/debug-util.h"
+
 #include "common/status.h"
 
+using kudu::MonoDelta;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+
 namespace kudu {
 namespace rpc {
 class RpcContext;
@@ -59,11 +67,38 @@ class ControlService : public ControlServiceIf {
   virtual void CancelQueryFInstances(const CancelQueryFInstancesRequestPB* req,
       CancelQueryFInstancesResponsePB* resp, ::kudu::rpc::RpcContext* context) override;
 
+  /// Initiate shutdown.
+  virtual void RemoteShutdown(const RemoteShutdownParamsPB* req,
+      RemoteShutdownResultPB* response, ::kudu::rpc::RpcContext* context) override;
+
   /// Gets a ControlService proxy to a server with 'address' and 'hostname'.
   /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
   static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
       std::unique_ptr<ControlServiceProxy>* proxy);
 
+  /// Retry the Rpc 'rpc_call' up to 'times_to_try' times.
+  /// Each Rpc has a timeout of 'timeout_s' seconds.
+  /// There is no sleeping between retries.
+  /// Pass 'debug_action' to DebugAction() to potentially inject errors.
+  template <typename F>
+  static Status DoRpcWithRetry(F&& rpc_call, const TQueryCtx& query_ctx,
+      const char* debug_action, const char* error_msg, int times_to_try, int timeout_s) {
+    DCHECK_GT(times_to_try, 0);
+    Status rpc_status;
+    for (int i = 0; i < times_to_try; i++) {
+      RpcController rpc_controller;
+      rpc_controller.set_timeout(MonoDelta::FromSeconds(timeout_s));
+      // Check for injected failures.
+      rpc_status = DebugAction(query_ctx.client_request.query_options, debug_action);
+      if (!rpc_status.ok()) continue;
+
+      rpc_status = FromKuduStatus(rpc_call(&rpc_controller), error_msg);
+      if (rpc_status.ok()) break;
+      // TODO(IMPALA-8143) Add a sleep if RpcMgr::IsServerTooBusy().
+    }
+    return rpc_status;
+  }
+
  private:
   /// Tracks the memory usage of payload in the service queue.
   std::unique_ptr<MemTracker> mem_tracker_;
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 424e5cf..4a55e66 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -33,9 +33,9 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "scheduling/admission-controller.h"
-#include "service/impala-server.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
+#include "service/impala-server.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"
@@ -854,7 +854,10 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
     Value backend_obj(kObjectType);
     string address = TNetworkAddressToString(backend.address);
     Value str(address.c_str(), document->GetAllocator());
+    Value krpc_address(
+        TNetworkAddressToString(backend.krpc_address).c_str(), document->GetAllocator());
     backend_obj.AddMember("address", str, document->GetAllocator());
+    backend_obj.AddMember("krpc_address", krpc_address, document->GetAllocator());
     backend_obj.AddMember("is_coordinator", backend.is_coordinator,
         document->GetAllocator());
     backend_obj.AddMember("is_executor", backend.is_executor, document->GetAllocator());
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index c4c2139..d4ed14d 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -86,11 +86,3 @@ void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
   if (qs.get() == nullptr) return;
   qs->PublishFilter(params);
 }
-
-void ImpalaInternalService::RemoteShutdown(TRemoteShutdownResult& return_val,
-    const TRemoteShutdownParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
-  Status status = impala_server_->StartShutdown(
-      params.__isset.deadline_s ? params.deadline_s : -1, &return_val.shutdown_status);
-  status.ToThrift(&return_val.status);
-}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 28000c7..9f0ea60 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -37,8 +37,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,
       const TPublishFilterParams& params);
-  virtual void RemoteShutdown(TRemoteShutdownResult& return_val,
-      const TRemoteShutdownParams& params);
 
  private:
   ImpalaServer* impala_server_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index cf16563..c2705d6 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2428,38 +2428,39 @@ Status ImpalaServer::CheckNotShuttingDown() const {
       TErrorCode::SERVER_SHUTTING_DOWN, ShutdownStatusToString(GetShutdownStatus())));
 }
 
-TShutdownStatus ImpalaServer::GetShutdownStatus() const {
-  TShutdownStatus result;
+ShutdownStatusPB ImpalaServer::GetShutdownStatus() const {
+  ShutdownStatusPB result;
   int64_t shutdown_time = shutting_down_.Load();
   DCHECK_GT(shutdown_time, 0);
   int64_t shutdown_deadline = shutdown_deadline_.Load();
   DCHECK_GT(shutdown_time, 0);
   int64_t now = MonotonicMillis();
   int64_t elapsed_ms = now - shutdown_time;
-  result.grace_remaining_ms =
-      max<int64_t>(0, FLAGS_shutdown_grace_period_s * 1000 - elapsed_ms);
-  result.deadline_remaining_ms =
-      max<int64_t>(0, shutdown_deadline - now);
-  result.finstances_executing =
-      ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
-  result.client_requests_registered = ImpaladMetrics::NUM_QUERIES_REGISTERED->GetValue();
-  result.backend_queries_executing =
-      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->GetValue();
+  result.set_grace_remaining_ms(
+      max<int64_t>(0, FLAGS_shutdown_grace_period_s * 1000 - elapsed_ms));
+  result.set_deadline_remaining_ms(max<int64_t>(0, shutdown_deadline - now));
+  result.set_finstances_executing(
+      ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue());
+  result.set_client_requests_registered(
+      ImpaladMetrics::NUM_QUERIES_REGISTERED->GetValue());
+  result.set_backend_queries_executing(
+      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->GetValue());
   return result;
 }
 
-string ImpalaServer::ShutdownStatusToString(const TShutdownStatus& shutdown_status) {
+string ImpalaServer::ShutdownStatusToString(const ShutdownStatusPB& shutdown_status) {
   return Substitute("startup grace period left: $0, deadline left: $1, "
-      "queries registered on coordinator: $2, queries executing: $3, "
-      "fragment instances: $4",
-      PrettyPrinter::Print(shutdown_status.grace_remaining_ms, TUnit::TIME_MS),
-      PrettyPrinter::Print(shutdown_status.deadline_remaining_ms, TUnit::TIME_MS),
-      shutdown_status.client_requests_registered,
-      shutdown_status.backend_queries_executing, shutdown_status.finstances_executing);
+                    "queries registered on coordinator: $2, queries executing: $3, "
+                    "fragment instances: $4",
+      PrettyPrinter::Print(shutdown_status.grace_remaining_ms(), TUnit::TIME_MS),
+      PrettyPrinter::Print(shutdown_status.deadline_remaining_ms(), TUnit::TIME_MS),
+      shutdown_status.client_requests_registered(),
+      shutdown_status.backend_queries_executing(),
+      shutdown_status.finstances_executing());
 }
 
 Status ImpalaServer::StartShutdown(
-    int64_t relative_deadline_s, TShutdownStatus* shutdown_status) {
+    int64_t relative_deadline_s, ShutdownStatusPB* shutdown_status) {
   DCHECK_GE(relative_deadline_s, -1);
   if (relative_deadline_s == -1) relative_deadline_s = FLAGS_shutdown_deadline_s;
   int64_t now = MonotonicMillis();
@@ -2493,22 +2494,24 @@ Status ImpalaServer::StartShutdown(
   // Show the full grace/limit times to avoid showing confusing intermediate values
   // to the person running the statement.
   if (set_grace) {
-    shutdown_status->grace_remaining_ms = FLAGS_shutdown_grace_period_s * 1000L;
+    shutdown_status->set_grace_remaining_ms(FLAGS_shutdown_grace_period_s * 1000L);
+  }
+  if (set_deadline) {
+    shutdown_status->set_deadline_remaining_ms(relative_deadline_s * 1000L);
   }
-  if (set_deadline) shutdown_status->deadline_remaining_ms = relative_deadline_s * 1000L;
   return Status::OK();
 }
 
 [[noreturn]] void ImpalaServer::ShutdownThread() {
   while (true) {
     SleepForMs(1000);
-    TShutdownStatus shutdown_status = GetShutdownStatus();
+    const ShutdownStatusPB& shutdown_status = GetShutdownStatus();
     LOG(INFO) << "Shutdown status: " << ShutdownStatusToString(shutdown_status);
-    if (shutdown_status.grace_remaining_ms <= 0
-        && shutdown_status.backend_queries_executing == 0
-        && shutdown_status.client_requests_registered == 0) {
+    if (shutdown_status.grace_remaining_ms() <= 0
+        && shutdown_status.backend_queries_executing() == 0
+        && shutdown_status.client_requests_registered() == 0) {
       break;
-    } else if (shutdown_status.deadline_remaining_ms <= 0) {
+    } else if (shutdown_status.deadline_remaining_ms() <= 0) {
       break;
     }
   }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 69c1f00..c23714b 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -409,7 +409,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// information about the pending shutdown in 'shutdown_status'. 'relative_deadline_s'
   /// is the deadline value in seconds to use, or -1 if we should use the default
   /// deadline. See Shutdown class comment for explanation of the shutdown sequence.
-  Status StartShutdown(int64_t relative_deadline_s, TShutdownStatus* shutdown_status);
+  Status StartShutdown(int64_t relative_deadline_s, ShutdownStatusPB* shutdown_status);
 
   /// Returns true if a shut down is in progress.
   bool IsShuttingDown() const { return shutting_down_.Load() != 0; }
@@ -421,10 +421,10 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Return information about the status of a shutdown. Only valid to call if a shutdown
   /// is in progress (i.e. IsShuttingDown() is true).
-  TShutdownStatus GetShutdownStatus() const;
+  ShutdownStatusPB GetShutdownStatus() const;
 
   /// Convert the shutdown status to a human-readable string.
-  static std::string ShutdownStatusToString(const TShutdownStatus& shutdown_status);
+  static std::string ShutdownStatusToString(const ShutdownStatusPB& shutdown_status);
 
   // Mapping between query option names and levels
   QueryOptionLevels query_option_levels_;
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 8e6749d..f76e143 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -182,6 +182,40 @@ message CancelQueryFInstancesResponsePB {
   optional StatusPB status = 1;
 }
 
+message RemoteShutdownParamsPB {
+  // Deadline for the shutdown. After this deadline expires (starting at the time when
+  // this remote shutdown command is received), the Impala daemon exits immediately
+  // regardless of whether queries are still executing.
+  optional int64 deadline_s = 1;
+}
+
+// The current status of a shutdown operation.
+message ShutdownStatusPB {
+  // Milliseconds remaining in startup grace period. 0 if the period has expired.
+  optional int64 grace_remaining_ms = 1;
+
+  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
+  optional int64 deadline_remaining_ms = 2;
+
+  // Number of fragment instances still executing.
+  optional int64 finstances_executing = 3;
+
+  // Number of client requests still registered with the Impala server that is being shut
+  // down.
+  optional int64 client_requests_registered = 4;
+
+  // Number of queries still executing on backend.
+  optional int64 backend_queries_executing = 5;
+}
+
+message RemoteShutdownResultPB {
+  // Success or failure of the operation.
+  optional StatusPB status = 1;
+
+  // If status is OK, additional info about the shutdown status.
+  optional ShutdownStatusPB shutdown_status = 2;
+}
+
 service ControlService {
   // Override the default authorization method.
   option (kudu.rpc.default_authz_method) = "Authorize";
@@ -195,4 +229,7 @@ service ControlService {
   // fragment instance has completely stopped executing).
   rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
       returns (CancelQueryFInstancesResponsePB);
+
+  // Called to initiate shutdown of this backend.
+  rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
 }
\ No newline at end of file
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 3d3e997..fd2ea6f 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -733,42 +733,6 @@ struct TPublishFilterParams {
 struct TPublishFilterResult {
 }
 
-// RemoteShutdown
-
-struct TRemoteShutdownParams {
-  // Deadline for the shutdown. After this deadline expires (starting at the time when
-  // this remote shutdown command is received), the Impala daemon exits immediately
-  // regardless of whether queries are still executing.
-  1: optional i64 deadline_s
-}
-
-// The current status of a shutdown operation.
-struct TShutdownStatus {
-  // Milliseconds remaining in startup grace period. 0 if the period has expired.
-  1: required i64 grace_remaining_ms
-
-  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
-  2: required i64 deadline_remaining_ms
-
-  // Number of fragment instances still executing.
-  3: required i64 finstances_executing
-
-  // Number of client requests still registered with the Impala server that is being shut
-  // down.
-  4: required i64 client_requests_registered
-
-  // Number of queries still executing on backend.
-  5: required i64 backend_queries_executing
-}
-
-struct TRemoteShutdownResult {
-  // Success or failure of the operation.
-  1: required Status.TStatus status
-
-  // If status is OK, additional info about the shutdown status
-  2: required TShutdownStatus shutdown_status
-}
-
 service ImpalaInternalService {
   // Called by coord to start asynchronous execution of a query's fragment instances in
   // backend.
@@ -782,7 +746,4 @@ service ImpalaInternalService {
   // Called by the coordinator to deliver global runtime filters to fragments for
   // application at plan nodes.
   TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
-
-  // Called to initiate shutdown of this backend.
-  TRemoteShutdownResult RemoteShutdown(1:TRemoteShutdownParams params);
 }
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 1b6911a..e1e19ba 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -35,6 +35,7 @@ from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 
 LOG = logging.getLogger(__name__)
 
+
 class TestRestart(CustomClusterTestSuite):
   @classmethod
   def get_workload(cls):
@@ -121,33 +122,40 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     # Test that a failed shut down from a bogus host or port fails gracefully.
     ex = self.execute_query_expect_failure(self.client,
         ":shutdown('e6c00ca5cd67b567eb96c6ecfb26f05')")
-    assert "Couldn't open transport" in str(ex)
+    assert "Could not find IPv4 address for:" in str(ex)
     ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:100000')")
-    assert "Couldn't open transport" in str(ex)
-    # Test that pointing to the wrong thrift service (the HS2 port) fails gracefully.
-    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:21050')")
-    assert ("RPC Error: Client for localhost:21050 hit an unexpected exception: " +
-            "Invalid method name: 'RemoteShutdown'") in str(ex)
+    assert "Invalid port:" in str(ex)
+    assert ("This may be because the port specified is wrong.") not in str(ex)
+
+    # Test that pointing to the wrong thrift service (the HS2 port) fails gracefully-ish.
+    thrift_ports = [21051, 22001]  # HS2 port, old backend port.
+    for port in thrift_ports:
+      ex = self.execute_query_expect_failure(self.client,
+          ":shutdown('localhost:{0}')".format(port))
+      assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex)
+      assert ("This may be because the port specified is wrong.") in str(ex)
+
     # Test RPC error handling with debug action.
-    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:22001')",
+    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:27001')",
         query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})
-    assert 'Debug Action: CRS_SHUTDOWN_RPC:FAIL' in str(ex)
+    assert 'Rpc to 127.0.0.1:27001 failed with error \'Debug Action: ' \
+        'CRS_SHUTDOWN_RPC:FAIL' in str(ex)
 
     # Test remote shutdown.
     LOG.info("Start remote shutdown {0}".format(time.time()))
-    self.execute_query_expect_success(self.client, ":shutdown('localhost:22001')",
+    self.execute_query_expect_success(self.client, ":shutdown('localhost:27001')",
         query_options={})
 
     # Remote shutdown does not require statestore.
     self.cluster.statestored.kill()
     self.cluster.statestored.wait_for_exit()
-    self.execute_query_expect_success(self.client, ":shutdown('localhost:22002')",
+    self.execute_query_expect_success(self.client, ":shutdown('localhost:27002')",
         query_options={})
 
     # Test local shutdown, which should succeed even with injected RPC error.
     LOG.info("Start local shutdown {0}".format(time.time()))
     self.execute_query_expect_success(self.client,
-        ":shutdown('{0}:22000')".format(socket.gethostname()),
+        ":shutdown('{0}:27000')".format(socket.gethostname()),
         query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})
 
     # Make sure that the impala daemons exit after the startup grace period plus a 10
@@ -207,7 +215,7 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     # and only get scan ranges that don't contain the midpoint of any row group, and
     # therefore not actually produce any rows.
     SLOW_QUERY = "select count(*) from tpch.lineitem where sleep(1) = l_orderkey"
-    SHUTDOWN_EXEC2 = ": shutdown('localhost:22001')"
+    SHUTDOWN_EXEC2 = ": shutdown('localhost:27001')"
 
     # Run this query before shutdown and make sure that it executes successfully on
     # all executors through the startup grace period without disruption.
@@ -275,7 +283,7 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     # Test that we can reduce the deadline after setting it to a high value.
     # Run a query that will fail as a result of the reduced deadline.
     deadline_expiry_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
-    SHUTDOWN_EXEC3 = ": shutdown('localhost:22002', {0})"
+    SHUTDOWN_EXEC3 = ": shutdown('localhost:27002', {0})"
     VERY_HIGH_DEADLINE = 5000
     HIGH_DEADLINE = 1000
     LOW_DEADLINE = 5
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index bbc6172..c7fa741 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -469,3 +469,21 @@ class TestWebPage(ImpalaTestSuite):
     assert 'backends' in response_json
     # When this test runs, all impalads would have already started.
     assert len(response_json['backends']) == 3
+
+    # Look at results for a single backend - they are not sorted.
+    backend_row = response_json['backends'][0]
+
+    # The 'address' column is the backend port of the impalad.
+    assert len(backend_row['address']) > 0
+    be_ports = ('22000', '22001', '22002')
+    assert backend_row['address'].endswith(be_ports)
+
+    # The 'krpc_address' is the krpc address of the impalad.
+    assert len(backend_row['krpc_address']) > 0
+    krpc_ports = ('27000', '27001', '27002')
+    assert backend_row['krpc_address'].endswith(krpc_ports)
+
+    assert backend_row['is_coordinator']
+    assert backend_row['is_executor']
+    assert not backend_row['is_quiescing']
+    assert len(backend_row['admit_mem_limit']) > 0
diff --git a/www/backends.tmpl b/www/backends.tmpl
index c80d25a..d29f4a1 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -24,6 +24,7 @@ under the License.
   <thead>
     <tr>
       <th>Address</th>
+      <th>Krpc Address</th>
       <th>Coordinator</th>
       <th>Executor</th>
       <th>Quiescing</th>
@@ -36,6 +37,7 @@ under the License.
     {{#backends}}
     <tr>
       <td>{{address}}</td>
+      <td>{{krpc_address}}</td>
       <td>{{is_coordinator}}</td>
       <td>{{is_executor}}</td>
       <td>{{is_quiescing}}</td>