You are viewing a plain text version of this content; the link to the canonical version is not preserved in this rendering.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/06 04:18:53 UTC

[impala] branch master updated (cdc5869 -> bbe064e)

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from cdc5869  IMPALA-8668: [DOCS] HS2 Support for Impala-Shell connection
     new 6a31be8  Create ranger cache directory in containers.
     new b5193f3  IMPALA-8806: Add metrics to improve observability of executor groups
     new bbec8fa  IMPALA-8781: Result spooling tests to cover edge cases and cancellation
     new bbe064e  IMPALA-8828: Support impersonation via http paths

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/auth-provider.h                         |   4 +-
 be/src/rpc/authentication.cc                       |  76 ++++++++++++--
 be/src/rpc/thrift-server.h                         |   1 +
 be/src/runtime/exec-env.cc                         |   2 +-
 be/src/scheduling/cluster-membership-mgr-test.cc   |  12 ++-
 be/src/scheduling/cluster-membership-mgr.cc        |  38 ++++++-
 be/src/scheduling/cluster-membership-mgr.h         |  13 ++-
 be/src/scheduling/scheduler-test-util.cc           |   8 +-
 be/src/service/impala-hs2-server.cc                |  13 ++-
 be/src/service/impala-server.cc                    |   8 +-
 be/src/transport/THttpServer.cpp                   |  12 ++-
 be/src/transport/THttpServer.h                     |  49 +++++----
 common/thrift/metrics.json                         |  29 ++++++
 docker/impala_base/Dockerfile                      |   4 +-
 .../impala/customcluster/LdapImpalaShellTest.java  |  87 +++++++++++++---
 .../queries/QueryTest/result-spooling.test         | 113 +++++++++++++++++++++
 tests/custom_cluster/test_auto_scaling.py          |  68 ++++++++-----
 tests/custom_cluster/test_executor_groups.py       |  47 +++++++--
 tests/hs2/test_fetch_first.py                      |  19 +++-
 tests/query_test/test_cancellation.py              |  58 +----------
 tests/query_test/test_result_spooling.py           | 101 ++++++++++++++++--
 tests/util/cancel_util.py                          |  87 ++++++++++++++++
 22 files changed, 676 insertions(+), 173 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/result-spooling.test
 create mode 100644 tests/util/cancel_util.py


[impala] 04/04: IMPALA-8828: Support impersonation via http paths

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bbe064ec194aff4ecf1e794bd4071df4ea4be166
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Aug 1 04:29:52 2019 +0000

    IMPALA-8828: Support impersonation via http paths
    
    This patch allows clients that connect over the HTTP server to perform
    impersonation by specifying the 'doAs' query parameter in the request
    path, e.g. '/?doAs=<user>'.
    
    The existing rules for impersonation are applied, i.e.
    authorized_proxy_user_config or authorized_proxy_group_config must be
    set with the appropriate values for impersonation to be successful.
    
    Testing:
    - Added an FE test that verifies impersonation works as expected with
      impala-shell and LDAP.
    - Manually tested with Apache Knox.
    
    Change-Id: I20b9c2e2d106530732f1c52f8d3d1ecc24ae4bd6
    Reviewed-on: http://gerrit.cloudera.org:8080/13994
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/auth-provider.h                         |  4 +-
 be/src/rpc/authentication.cc                       | 76 +++++++++++++++++--
 be/src/rpc/thrift-server.h                         |  1 +
 be/src/service/impala-hs2-server.cc                | 13 +++-
 be/src/transport/THttpServer.cpp                   | 12 ++-
 be/src/transport/THttpServer.h                     | 49 ++++++------
 .../impala/customcluster/LdapImpalaShellTest.java  | 87 ++++++++++++++++++----
 7 files changed, 182 insertions(+), 60 deletions(-)

diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 1807eb9..095ecac 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -194,9 +194,7 @@ class NoAuthProvider : public AuthProvider {
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
       apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport) {
-    connection_ptr->username = "";
-  }
+      apache::thrift::transport::TTransport* underlying_output_transport);
 
   virtual bool is_secure() { return false; }
 };
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index ef8e1db..a7c04ee 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -26,6 +26,7 @@
 #include <boost/filesystem.hpp>
 #include <gutil/casts.h>
 #include <gutil/strings/escaping.h>
+#include <gutil/strings/split.h>
 #include <gutil/strings/strip.h>
 #include <gutil/strings/substitute.h>
 #include <random>
@@ -47,6 +48,7 @@
 #include "transport/THttpServer.h"
 #include "transport/TSaslClientTransport.h"
 #include "util/auth-util.h"
+#include "util/coding-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
 #include "util/network-util.h"
@@ -577,6 +579,31 @@ vector<string> ReturnHeaders(ThriftServer::ConnectionContext* connection_context
   return std::move(connection_context->return_headers);
 }
 
+// Takes the path component of an HTTP request and parses it. For now, we only care about
+// the 'doAs' parameter.
+bool HttpPathFn(ThriftServer::ConnectionContext* connection_context, const string& path,
+    string* err_msg) {
+  // 'path' should be of the form '/.*[?<key=value>[&<key=value>...]]'
+  vector<string> split = Split(path, delimiter::Limit("?", 1));
+  if (split.size() == 2) {
+    for (auto pair : Split(split[1], "&")) {
+      vector<string> key_value = Split(pair, delimiter::Limit("=", 1));
+      if (key_value.size() == 2 && key_value[0] == "doAs") {
+        string decoded;
+        if (!UrlDecode(key_value[1], &decoded)) {
+          *err_msg = Substitute(
+              "Could not decode 'doAs' parameter from HTTP request with path: $0", path);
+          return false;
+        } else {
+          connection_context->do_as_user = decoded;
+        }
+        break;
+      }
+    }
+  }
+  return true;
+}
+
 namespace {
 
 // SASL requires mutexes for thread safety, but doesn't implement
@@ -1014,18 +1041,22 @@ void SecureAuthProvider::SetupConnectionContext(
     case ThriftServer::HTTP: {
       THttpServer* http_input_transport =
           down_cast<THttpServer*>(underlying_input_transport);
+      THttpServer* http_output_transport =
+          down_cast<THttpServer*>(underlying_output_transport);
+      THttpServer::HttpCallbacks callbacks;
+      callbacks.path_fn = std::bind(
+          HttpPathFn, connection_ptr.get(), std::placeholders::_1, std::placeholders::_2);
+      callbacks.return_headers_fn = std::bind(ReturnHeaders, connection_ptr.get());
       if (has_ldap_) {
-        http_input_transport->setBasicAuthFn(
-            std::bind(BasicAuth, connection_ptr.get(), std::placeholders::_1));
+        callbacks.basic_auth_fn =
+            std::bind(BasicAuth, connection_ptr.get(), std::placeholders::_1);
       }
       if (!principal_.empty()) {
-        http_input_transport->setNegotiateAuthFn(std::bind(NegotiateAuth,
-            connection_ptr.get(), std::placeholders::_1, std::placeholders::_2));
-        THttpServer* http_output_transport =
-            down_cast<THttpServer*>(underlying_input_transport);
-        http_output_transport->setReturnHeadersFn(
-            std::bind(ReturnHeaders, connection_ptr.get()));
+        callbacks.negotiate_auth_fn = std::bind(NegotiateAuth, connection_ptr.get(),
+            std::placeholders::_1, std::placeholders::_2);
       }
+      http_input_transport->setCallbacks(callbacks);
+      http_output_transport->setCallbacks(callbacks);
       break;
     }
     default:
@@ -1060,6 +1091,35 @@ Status NoAuthProvider::WrapClientTransport(const string& hostname,
   return Status::OK();
 }
 
+void NoAuthProvider::SetupConnectionContext(
+    const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+    ThriftServer::TransportType underlying_transport_type,
+    TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+  connection_ptr->username = "";
+  switch (underlying_transport_type) {
+    case ThriftServer::BINARY:
+      // Intentionally blank - since there's no security, there's nothing to set up here.
+      break;
+    case ThriftServer::HTTP: {
+      THttpServer* http_input_transport =
+          down_cast<THttpServer*>(underlying_input_transport);
+      THttpServer* http_output_transport =
+          down_cast<THttpServer*>(underlying_input_transport);
+      THttpServer::HttpCallbacks callbacks;
+      // Even though there's no security, we set up some callbacks, eg. to allow
+      // impersonation over unsecured connections for testing purposes.
+      callbacks.path_fn = std::bind(
+          HttpPathFn, connection_ptr.get(), std::placeholders::_1, std::placeholders::_2);
+      callbacks.return_headers_fn = std::bind(ReturnHeaders, connection_ptr.get());
+      http_input_transport->setCallbacks(callbacks);
+      http_output_transport->setCallbacks(callbacks);
+      break;
+    }
+    default:
+      LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
+  }
+}
+
 Status AuthManager::Init() {
   ssl_socket_factory_.reset(new TSSLSocketFactory(TLSv1_0));
 
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index e4bae51..8c43a37 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -95,6 +95,7 @@ class ThriftServer {
   struct ConnectionContext {
     TUniqueId connection_id;
     Username username;
+    Username do_as_user;
     TNetworkAddress network_address;
     std::string server_name;
     /// Used to pass HTTP headers generated by the input transport to the output transport
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 9717963..b50a213 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -309,10 +309,15 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
   state->kudu_latest_observed_ts = 0;
 
   // If the username was set by a lower-level transport, use it.
-  const ThriftServer::Username& username =
-      ThriftServer::GetThreadConnectionContext()->username;
-  if (!username.empty()) {
-    state->connected_user = username;
+  const ThriftServer::ConnectionContext* connection_context =
+      ThriftServer::GetThreadConnectionContext();
+  if (!connection_context->username.empty()) {
+    state->connected_user = connection_context->username;
+    if (!connection_context->do_as_user.empty()) {
+      state->do_as_user = connection_context->do_as_user;
+      Status status = AuthorizeProxyUser(state->connected_user, state->do_as_user);
+      HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
+    }
   } else {
     state->connected_user = request.username;
   }
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index 182ed1d..fb2ff11 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -128,6 +128,10 @@ bool THttpServer::parseStatusLine(char* status) {
   *http = '\0';
 
   if (strcmp(method, "POST") == 0) {
+    string err_msg;
+    if (!callbacks_.path_fn(string(path), &err_msg)) {
+      throw TTransportException(err_msg);
+    }
     // POST method ok, looking for content.
     return true;
   } else if (strcmp(method, "OPTIONS") == 0) {
@@ -182,7 +186,7 @@ void THttpServer::headersDone() {
   // all supported auth types.
   bool authorized = false;
   if (has_ldap_ && (!got_negotiate_auth || !has_kerberos_)) {
-    if (basic_auth_fn_(basic_auth_token)) {
+    if (callbacks_.basic_auth_fn(basic_auth_token)) {
       authorized = true;
       if (metrics_enabled_) total_basic_auth_success_->Increment(1);
     } else {
@@ -190,7 +194,7 @@ void THttpServer::headersDone() {
     }
   } else if (has_kerberos_ && (!got_basic_auth || !has_ldap_)) {
     bool is_complete;
-    if (negotiate_auth_fn_(negotiate_auth_token, &is_complete)) {
+    if (callbacks_.negotiate_auth_fn(negotiate_auth_token, &is_complete)) {
       // If 'is_complete' is false we want to return a 401.
       authorized = is_complete;
       if (is_complete && metrics_enabled_) total_negotiate_auth_success_->Increment(1);
@@ -220,7 +224,7 @@ void THttpServer::flush() {
     << "Server: Thrift/" << VERSION << CRLF << "Access-Control-Allow-Origin: *" << CRLF
     << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
     << "Connection: Keep-Alive" << CRLF;
-  vector<string> return_headers = return_headers_fn_();
+  vector<string> return_headers = callbacks_.return_headers_fn();
   for (const string& header : return_headers) {
     h << header << CRLF;
   }
@@ -261,7 +265,7 @@ std::string THttpServer::getTimeRFC1123() {
 void THttpServer::returnUnauthorized() {
   std::ostringstream h;
   h << "HTTP/1.1 401 Unauthorized" << CRLF << "Date: " << getTimeRFC1123() << CRLF;
-  vector<string> return_headers = return_headers_fn_();
+  vector<string> return_headers = callbacks_.return_headers_fn();
   for (const string& header : return_headers) {
     h << header << CRLF;
   }
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index de057b5..c205b59 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -33,16 +33,28 @@ namespace transport {
 class THttpServer : public THttpTransport {
 public:
 
-  // Function that takes a base64 encoded string of the form 'username:password' and
-  // returns true if authentication is successful.
-  typedef std::function<bool(const std::string&)> BasicAuthFn;
-
-  // Function that takes the value from a 'Authorization: Negotiate' header. Returns true
-  // if successful and sets 'is_complete' to true if negoation is done.
-  typedef std::function<bool(const std::string&, bool* is_complete)> NegotiateAuthFn;
-
-  // Function that returns a list of headers to return to the client.
-  typedef std::function<std::vector<std::string>()> ReturnHeadersFn;
+  struct HttpCallbacks {
+   public:
+    // Function that takes the value from a 'Authorization: Basic' header. Returns true
+    // if authentication is successful. Must be set if 'has_ldap_' is true.
+    std::function<bool(const std::string&)> basic_auth_fn =
+        [&](const std::string&) { DCHECK(false); return false; };
+
+    // Function that takes the value from a 'Authorization: Negotiate' header. Returns
+    // true if successful and sets 'is_complete' to true if negoation is done. Must be set
+    // if 'has_kerberos_' is true.
+    std::function<bool(const std::string&, bool* is_complete)> negotiate_auth_fn =
+        [&](const std::string&, bool*) { DCHECK(false); return false; };
+
+    // Function that returns a list of headers to return to the client.
+    std::function<std::vector<std::string>()> return_headers_fn =
+        [&]() { return std::vector<std::string>(); };
+
+    // Function that takes the path component of an HTTP request. Returns false and sets
+    // 'err_msg' if an error is encountered.
+    std::function<bool(const std::string& path, std::string* err_msg)> path_fn =
+        [&](const std::string&, std::string*) { return true; };
+  };
 
   THttpServer(boost::shared_ptr<TTransport> transport, bool has_ldap, bool has_kerberos,
       bool metrics_enabled, impala::IntCounter* total_basic_auth_success,
@@ -54,9 +66,7 @@ public:
 
   virtual void flush();
 
-  void setBasicAuthFn(const BasicAuthFn& fn) { basic_auth_fn_ = fn; }
-  void setNegotiateAuthFn(const NegotiateAuthFn& fn) { negotiate_auth_fn_ = fn; }
-  void setReturnHeadersFn(const ReturnHeadersFn& fn) { return_headers_fn_ = fn; }
+  void setCallbacks(const HttpCallbacks& callbacks) { callbacks_ = callbacks; }
 
 protected:
   void readHeaders();
@@ -68,12 +78,6 @@ protected:
   void returnUnauthorized();
 
  private:
-  static bool dummyBasicAuthFn(const std::string&) { return false; }
-  static bool dummyNegotiateAuthFn(const std::string&, bool*) { return false; }
-  static std::vector<std::string> dummyReturnHeadersFn() {
-    return std::vector<std::string>();
-  }
-
   // If either of the following is true, a '401 - Unauthorized' will be returned to the
   // client on requests that do not contain a valid 'Authorization' header. If 'has_ldap_'
   // is true, 'Basic' auth headers will be processed, and if 'has_kerberos_' is true
@@ -81,12 +85,7 @@ protected:
   bool has_ldap_ = false;
   bool has_kerberos_ = false;
 
-  // Called with the base64 encoded authorization from a 'Authorization: Basic' header.
-  BasicAuthFn basic_auth_fn_ = &dummyBasicAuthFn;
-  // Called with the value from a 'Authorization: Negotiate' header.
-  NegotiateAuthFn negotiate_auth_fn_ = &dummyNegotiateAuthFn;
-  // Called during flush() to get additional headers to return.
-  ReturnHeadersFn return_headers_fn_ = &dummyReturnHeadersFn;
+  HttpCallbacks callbacks_;
 
   // The value from the 'Authorization' header.
   std::string auth_value_ = "";
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
index f41f27a..be6b20d 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
@@ -53,14 +53,22 @@ public class LdapImpalaShellTest {
   // These correspond to the values in fe/src/test/resources/users.ldif
   private static final String testUser_ = "Test1Ldap";
   private static final String testPassword_ = "12345";
+  private static final String testUser2_ = "Test2Ldap";
+  private static final String testPassword2_ = "abcde";
+
+  // The cluster will be set up to allow testUser_ to act as a proxy for delegateUser_.
+  // Includes a special character to test HTTP path encoding.
+  private static final String delegateUser_ = "proxyUser$";
 
   @Before
   public void setUp() throws Exception {
     String uri =
         String.format("ldap://localhost:%s", serverRule.getLdapServer().getPort());
     String dn = "cn=#UID,ou=Users,dc=myorg,dc=com";
-    String ldapArgs = String.format("--enable_ldap_auth --ldap_uri='%s' "
-        + "--ldap_bind_pattern='%s' --ldap_passwords_in_clear_ok", uri, dn);
+    String ldapArgs = String.format(
+        "--enable_ldap_auth --ldap_uri='%s' --ldap_bind_pattern='%s' " +
+        "--ldap_passwords_in_clear_ok --authorized_proxy_user_config=%s=%s",
+        uri, dn, testUser_, delegateUser_);
     int ret = CustomClusterRunner.StartImpalaCluster(ldapArgs);
     assertEquals(ret, 0);
   }
@@ -74,32 +82,34 @@ public class LdapImpalaShellTest {
    * Helper to run a shell command 'cmd'. If 'shouldSucceed' is true, the command
    * is expected to succeed, failure otherwise.
    */
-  private void runShellCommand(String[] cmd, boolean shouldSucceed) throws Exception {
+  private void runShellCommand(String[] cmd, boolean shouldSucceed, String expectedOut,
+      String expectedErr) throws Exception {
     Runtime rt = Runtime.getRuntime();
     Process process = rt.exec(cmd);
     // Collect the stderr.
     BufferedReader input = new BufferedReader(
         new InputStreamReader(process.getErrorStream()));
-    StringBuffer stderr = new StringBuffer();
+    StringBuffer stderrBuf = new StringBuffer();
     String line;
     while ((line = input.readLine()) != null) {
-      stderr.append(line);
-      stderr.append('\n');
+      stderrBuf.append(line);
+      stderrBuf.append('\n');
     }
+    String stderr = stderrBuf.toString();
+    assertTrue(stderr, stderr.contains(expectedErr));
     // Collect the stdout (which has the resultsets).
     input = new BufferedReader(new InputStreamReader(process.getInputStream()));
-    StringBuffer stdout = new StringBuffer();
+    StringBuffer stdoutBuf = new StringBuffer();
     while ((line = input.readLine()) != null) {
-      stdout.append(line);
-      stdout.append('\n');
+      stdoutBuf.append(line);
+      stdoutBuf.append('\n');
     }
     int expectedReturn = shouldSucceed ? 0 : 1;
     assertEquals(stderr.toString(), expectedReturn, process.waitFor());
-    // If the query succeeds, assert that the output contains the correct
-    // username.
+    // If the query succeeds, assert that the output is correct.
     if (shouldSucceed) {
-      String temp = stdout.toString();
-      assertTrue(temp, temp.contains(testUser_));
+      String stdout = stdoutBuf.toString();
+      assertTrue(stdout, stdout.contains(expectedOut));
     }
   }
 
@@ -127,11 +137,56 @@ public class LdapImpalaShellTest {
     for (String protocol: protocolsToTest) {
       protocol = String.format(protocolTemplate, protocol);
       validCommand[1] = protocol;
-      runShellCommand(validCommand, /*shouldSucceed*/ true);
+      runShellCommand(validCommand, /*shouldSucceed*/ true, testUser_,
+          "Starting Impala Shell using LDAP-based authentication");
       invalidCommand[1] = protocol;
-      runShellCommand(invalidCommand, /*shouldSucceed*/ false);
+      runShellCommand(
+          invalidCommand, /*shouldSucceed*/ false, "", "Not connected to Impala");
       commandWithoutAuth[1] = protocol;
-      runShellCommand(commandWithoutAuth, /*shouldSucceed*/ false);
+      runShellCommand(
+          commandWithoutAuth, /*shouldSucceed*/ false, "", "Not connected to Impala");
     }
   }
+
+  private String[] buildCommand(
+      String query, String protocol, String user, String password, String httpPath) {
+    String[] command = {"impala-shell.sh", "--protocol=" + protocol, "--ldap",
+        "--auth_creds_ok_in_clear", "--user=" + user,
+        "--ldap_password_cmd=printf " + password, "--query=" + query,
+        "--http_path=" + httpPath};
+    return command;
+  }
+
+  /**
+   * Tests user impersonation over the HTTP protocol by using the HTTP path to specify the
+   * 'doAs' parameter.
+   */
+  @Test
+  public void testHttpImpersonation() throws Exception {
+    String invalidDelegateUser = "invalid-delegate-user";
+    String query = "select logged_in_user()";
+    String errTemplate = "User '%s' is not authorized to delegate to '%s'";
+
+    // Run with an invalid proxy user.
+    String[] command = buildCommand(
+        query, "hs2-http", testUser2_, testPassword2_, "/?doAs=" + delegateUser_);
+    runShellCommand(command, /* shouldSucceed */ false, "",
+        String.format(errTemplate, testUser2_, delegateUser_));
+
+    // Run with a valid proxy user but invalid delegate user.
+    command = buildCommand(
+        query, "hs2-http", testUser_, testPassword_, "/?doAs=" + invalidDelegateUser);
+    runShellCommand(command, /* shouldSucceed */ false, "",
+        String.format(errTemplate, testUser_, invalidDelegateUser));
+
+    // 'doAs' parameter that cannot be decoded.
+    command = buildCommand(
+        query, "hs2-http", testUser_, testPassword_, "/?doAs=%");
+    runShellCommand(command, /* shouldSucceed */ false, "", "Not connected to Impala");
+
+    // Successfully delegate.
+    command = buildCommand(
+        query, "hs2-http", testUser_, testPassword_, "/?doAs=" + delegateUser_);
+    runShellCommand(command, /* shouldSucceed */ true, delegateUser_, "");
+  }
 }


[impala] 01/04: Create ranger cache directory in containers.

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6a31be8dd748972b2bfbe4544904cc85edc77923
Author: Bharath Vissapragada <bh...@cloudera.com>
AuthorDate: Mon Aug 5 12:18:39 2019 -0700

    Create ranger cache directory in containers.
    
    Create a cache directory for Ranger clients to use when Ranger is
    enabled. For simplicity, it is added to the base image, although it
    is used only by the coordinators and catalogd.
    
    Change-Id: Iad134636e1566a44acf7b010e6b6067a972798c6
    Reviewed-on: http://gerrit.cloudera.org:8080/14007
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docker/impala_base/Dockerfile | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docker/impala_base/Dockerfile b/docker/impala_base/Dockerfile
index d43cc7a..46ad011 100644
--- a/docker/impala_base/Dockerfile
+++ b/docker/impala_base/Dockerfile
@@ -44,6 +44,8 @@ RUN cd /opt/impala/bin && ln -s impalad statestored && ln -s impalad catalogd &&
 # Create conf directory for later config injection.
     mkdir /opt/impala/conf && \
 # Create logs directory to collect container logs.
-    mkdir /opt/impala/logs
+    mkdir /opt/impala/logs && \
+# Create ranger cache directory that is used when ranger is enabled.
+    mkdir /opt/impala/rangercache
 
 WORKDIR /opt/impala/


[impala] 03/04: IMPALA-8781: Result spooling tests to cover edge cases and cancellation

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bbec8fa74961755269298706302477780019e7d5
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Jul 22 11:27:33 2019 -0700

    IMPALA-8781: Result spooling tests to cover edge cases and cancellation
    
    Adds additional tests to test_result_spooling.py to cover various edge
    cases when fetching query results (ensure all Impala types are returned
    properly, UDFs are evaluated correctly, etc.). A new QueryTest file
    result-spooling.test is added to encapsulate all these tests. Tests with
    a decreased ROW_BATCH_SIZE are added as well to validate that
    BufferedPlanRootSink buffers row batches correctly.
    
    BufferedPlanRootSink requires careful synchronization of the producer
    and consumer threads, especially when queries are cancelled. The
    TestResultSpoolingCancellation class is dedicated to running
    cancellation tests with SPOOL_QUERY_RESULTS = true. The implementation
    is heavily borrowed from test_cancellation.py, and some of the logic is
    refactored into a new utility module, tests/util/cancel_util.py, to avoid
    code duplication between test_cancellation.py and
    test_result_spooling.py.
    
    Testing:
    * Looped test_result_spooling.py overnight with no failures
    * Core tests passed
    
    Change-Id: Ib3b3a1539c4a5fa9b43c8ca315cea16c9701e283
    Reviewed-on: http://gerrit.cloudera.org:8080/13907
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../queries/QueryTest/result-spooling.test         | 113 +++++++++++++++++++++
 tests/hs2/test_fetch_first.py                      |  19 +++-
 tests/query_test/test_cancellation.py              |  58 +----------
 tests/query_test/test_result_spooling.py           | 101 ++++++++++++++++--
 tests/util/cancel_util.py                          |  87 ++++++++++++++++
 5 files changed, 314 insertions(+), 64 deletions(-)

diff --git a/testdata/workloads/functional-query/queries/QueryTest/result-spooling.test b/testdata/workloads/functional-query/queries/QueryTest/result-spooling.test
new file mode 100644
index 0000000..4a80805
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/result-spooling.test
@@ -0,0 +1,113 @@
+====
+---- QUERY
+# Validate reading all types from a table when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select * from alltypes order by id limit 10
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
+3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
+4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
+5,false,5,5,5,50,5.5,50.5,'01/01/09','5',2009-01-01 00:05:00.100000000,2009,1
+6,true,6,6,6,60,6.599999904632568,60.59999999999999,'01/01/09','6',2009-01-01 00:06:00.150000000,2009,1
+7,false,7,7,7,70,7.699999809265137,70.7,'01/01/09','7',2009-01-01 00:07:00.210000000,2009,1
+8,true,8,8,8,80,8.800000190734863,80.8,'01/01/09','8',2009-01-01 00:08:00.280000000,2009,1
+9,false,9,9,9,90,9.899999618530273,90.89999999999999,'01/01/09','9',2009-01-01 00:09:00.360000000,2009,1
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Validates that column ordering is preserved when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select month, year, timestamp_col, string_col, date_string_col, double_col, float_col,
+bigint_col, int_col, smallint_col, tinyint_col, bool_col, id from alltypes
+order by id limit 10
+---- RESULTS
+1,2009,2009-01-01 00:00:00,'0','01/01/09',0,0,0,0,0,0,true,0
+1,2009,2009-01-01 00:01:00,'1','01/01/09',10.1,1.100000023841858,10,1,1,1,false,1
+1,2009,2009-01-01 00:02:00.100000000,'2','01/01/09',20.2,2.200000047683716,20,2,2,2,true,2
+1,2009,2009-01-01 00:03:00.300000000,'3','01/01/09',30.3,3.299999952316284,30,3,3,3,false,3
+1,2009,2009-01-01 00:04:00.600000000,'4','01/01/09',40.4,4.400000095367432,40,4,4,4,true,4
+1,2009,2009-01-01 00:05:00.100000000,'5','01/01/09',50.5,5.5,50,5,5,5,false,5
+1,2009,2009-01-01 00:06:00.150000000,'6','01/01/09',60.59999999999999,6.599999904632568,60,6,6,6,true,6
+1,2009,2009-01-01 00:07:00.210000000,'7','01/01/09',70.7,7.699999809265137,70,7,7,7,false,7
+1,2009,2009-01-01 00:08:00.280000000,'8','01/01/09',80.8,8.800000190734863,80,8,8,8,true,8
+1,2009,2009-01-01 00:09:00.360000000,'9','01/01/09',90.89999999999999,9.899999618530273,90,9,9,9,false,9
+---- TYPES
+INT,INT,TIMESTAMP,STRING,STRING,DOUBLE,FLOAT,BIGINT,INT,SMALLINT,TINYINT,BOOLEAN,INT
+====
+---- QUERY
+# Validates that UDFs are properly evaluated when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select abs(id), abs(int_col) + abs(int_col) from alltypes order by id limit 10
+---- RESULTS
+0,0
+1,2
+2,4
+3,6
+4,8
+5,10
+6,12
+7,14
+8,16
+9,18
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+# Validates that queries with Agg nodes return correct results when result spooling is
+# enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select avg(int_col) + avg(bigint_col) from alltypes
+---- RESULTS
+49.5
+---- TYPES
+DOUBLE
+====
+---- QUERY
+# Validates that queries with Join nodes return correct results when result spooling is
+# enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select j.test_name, d.name from jointbl j inner join dimtbl d on
+(j.test_id = d.id)
+---- RESULTS
+'Name1','Name1'
+'Name2','Name2'
+'Name3','Name3'
+'Name4','Name4'
+'Name5','Name5'
+'Name16','Name6'
+'Name6','Name6'
+'Name16','Name6'
+'Name16','Name6'
+'Name6','Name6'
+'Name16','Name6'
+---- TYPES
+STRING,STRING
+====
+---- QUERY
+# Validates that NULLs are properly handled when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select * from nullrows order by id limit 10
+---- RESULTS
+'a','','NULL',NULL,NULL,'a','a',true
+'b','','NULL',NULL,NULL,'a','NULL',false
+'c','','NULL',NULL,NULL,'a','NULL',NULL
+'d','','NULL',NULL,NULL,'a','NULL',NULL
+'e','','NULL',NULL,NULL,'a','NULL',NULL
+'f','','NULL',NULL,NULL,'f','f',true
+'g','','NULL',NULL,NULL,'f','NULL',false
+'h','','NULL',NULL,NULL,'f','NULL',NULL
+'i','','NULL',NULL,NULL,'f','NULL',NULL
+'j','','NULL',NULL,NULL,'f','NULL',NULL
+---- TYPES
+STRING,STRING,STRING,INT,DOUBLE,STRING,STRING,BOOLEAN
+====
+---- QUERY
+SET SPOOL_QUERY_RESULTS=true;
+select * from emptytable;
+---- RESULTS
+---- TYPES
+STRING,INT
+====
diff --git a/tests/hs2/test_fetch_first.py b/tests/hs2/test_fetch_first.py
index 12451bb..42feee8 100644
--- a/tests/hs2/test_fetch_first.py
+++ b/tests/hs2/test_fetch_first.py
@@ -104,14 +104,26 @@ class TestFetchFirst(HS2TestSuite):
   @pytest.mark.execute_serially
   @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
   def test_query_stmts_v6(self):
-    self.run_query_stmts_test();
+    self.run_query_stmts_test()
 
   @pytest.mark.execute_serially
   @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
   def test_query_stmts_v1(self):
-    self.run_query_stmts_test();
+    self.run_query_stmts_test()
 
-  def run_query_stmts_test(self):
+  @pytest.mark.xfail(reason="Unsupported until IMPALA-8819 is completed")
+  @pytest.mark.execute_serially
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
+  def test_query_stmts_v6_with_result_spooling(self):
+    self.run_query_stmts_test({'spool_query_results': 'true'})
+
+  @pytest.mark.xfail(reason="Unsupported until IMPALA-8819 is completed")
+  @pytest.mark.execute_serially
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
+  def test_query_stmts_v1_with_result_spooling(self):
+    self.run_query_stmts_test({'spool_query_results': 'true'})
+
+  def run_query_stmts_test(self, conf_overlay=dict()):
     """Tests Impala's limited support for the FETCH_FIRST fetch orientation for queries.
     Impala permits FETCH_FIRST for a particular query iff result caching is enabled
     via the 'impala.resultset.cache.size' confOverlay option. FETCH_FIRST will succeed as
@@ -125,6 +137,7 @@ class TestFetchFirst(HS2TestSuite):
     execute_statement_req = TCLIService.TExecuteStatementReq()
     execute_statement_req.sessionHandle = self.session_handle
     execute_statement_req.confOverlay = dict()
+    execute_statement_req.confOverlay.update(conf_overlay)
     execute_statement_req.statement =\
       "SELECT * FROM functional.alltypessmall ORDER BY id LIMIT 30"
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index ff21a7a..2fa5f03 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -25,6 +25,7 @@ from RuntimeProfile.ttypes import TRuntimeProfileFormat
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.cancel_util import cancel_query_and_validate_state
 from tests.verifiers.metric_verifier import MetricVerifier
 
 # PRIMARY KEY for lineitem
@@ -135,7 +136,6 @@ class TestCancellation(ImpalaTestSuite):
         query = "create table ctas_cancel stored as %sfile as %s" %\
             (file_format, query)
 
-    join_before_close = vector.get_value('join_before_close')
     wait_action = vector.get_value('wait_action')
     fail_rpc_action = vector.get_value('fail_rpc_action')
 
@@ -148,63 +148,13 @@ class TestCancellation(ImpalaTestSuite):
 
     # Execute the query multiple times, cancelling it each time.
     for i in xrange(NUM_CANCELATION_ITERATIONS):
-      handle = self.execute_query_async(query, vector.get_value('exec_option'),
-                                        table_format=vector.get_value('table_format'))
-
-      def fetch_results():
-        threading.current_thread().fetch_results_error = None
-        threading.current_thread().query_profile = None
-        try:
-          new_client = self.create_impala_client()
-          new_client.fetch(query, handle)
-        except ImpalaBeeswaxException as e:
-          threading.current_thread().fetch_results_error = e
-
-      thread = threading.Thread(target=fetch_results)
-      thread.start()
-
-      sleep(vector.get_value('cancel_delay'))
-      assert self.client.get_state(handle) != self.client.QUERY_STATES['EXCEPTION']
-      cancel_result = self.client.cancel(handle)
-      assert cancel_result.status_code == 0,\
-          'Unexpected status code from cancel request: %s' % cancel_result
-
-      if join_before_close:
-        thread.join()
-
-      close_error = None
-      try:
-        self.client.close_query(handle)
-      except ImpalaBeeswaxException as e:
-        close_error = e
-
-      # Before accessing fetch_results_error we need to join the fetch thread
-      thread.join()
-
-      if thread.fetch_results_error is None:
-        # If the fetch rpc didn't result in CANCELLED (and auto-close the query) then
-        # the close rpc should have succeeded.
-        assert close_error is None
-      elif close_error is None:
-        # If the close rpc succeeded, then the fetch rpc should have either succeeded,
-        # failed with 'Cancelled' or failed with 'Invalid query handle' (if the close
-        # rpc occured before the fetch rpc).
-        if thread.fetch_results_error is not None:
-          assert 'Cancelled' in str(thread.fetch_results_error) or \
-            ('Invalid query handle' in str(thread.fetch_results_error) \
-             and not join_before_close)
-      else:
-        # If the close rpc encountered an exception, then it must be due to fetch
-        # noticing the cancellation and doing the auto-close.
-        assert 'Invalid or unknown query handle' in str(close_error)
-        assert 'Cancelled' in str(thread.fetch_results_error)
+      cancel_query_and_validate_state(self.client, query,
+          vector.get_value('exec_option'), vector.get_value('table_format'),
+          vector.get_value('cancel_delay'), vector.get_value('join_before_close'))
 
       if query_type == "CTAS":
         self.cleanup_test_table(vector.get_value('table_format'))
 
-      # TODO: Add some additional verification to check to make sure the query was
-      # actually canceled
-
     # Executing the same query without canceling should work fine. Only do this if the
     # query has a limit or aggregation
     if not debug_action and ('count' in query or 'limit' in query):
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 40b55b6..470ad45 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -15,18 +15,105 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from time import sleep
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import ImpalaTestDimension
+from tests.util.cancel_util import cancel_query_and_validate_state
+
+# Queries to execute. They read from the TPC-H dataset because its tables are large, so
+# the queries take some time to execute.
+CANCELLATION_QUERIES = ['select l_returnflag from tpch_parquet.lineitem',
+                        'select * from tpch_parquet.lineitem limit 50',
+                        'select * from tpch_parquet.lineitem order by l_orderkey']
+
+# Time to sleep between issuing query and canceling.
+CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
 
 
 class TestResultSpooling(ImpalaTestSuite):
   @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpooling, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet')
+
+  @classmethod
   def get_workload(cls):
     return 'functional-query'
 
-  def test_result_spooling(self):
-    """Tests that setting SPOOL_QUERY_RESULTS = true for simple queries returns the
-    correct number of results."""
-    query_opts = {"spool_query_results": "true"}
-    query = "select * from functional.alltypes limit 10"
-    result = self.execute_query_expect_success(self.client, query, query_opts)
-    assert(len(result.data) == 10)
+  def test_result_spooling(self, vector):
+    self.run_test_case('QueryTest/result-spooling', vector)
+
+  def test_multi_batches(self, vector):
+    """Validates that reading multiple row batches works when result spooling is
+    enabled."""
+    vector.get_value('exec_option')['batch_size'] = 10
+    self.validate_query("select id from alltypes order by id limit 1000",
+        vector.get_value('exec_option'))
+
+  def validate_query(self, query, exec_options):
+    """Compares the results of the given query with and without result spooling
+    enabled."""
+    exec_options = exec_options.copy()
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is " \
+                           "disabled".format(query)
+    base_data = result.data
+    exec_options['spool_query_results'] = 'true'
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is " \
+                           "enabled".format(query)
+    assert len(result.data) == len(base_data), "{0} returned a different number of " \
+                                               "results when result spooling was " \
+                                               "enabled".format(query)
+    assert result.data == base_data, "{0} returned different results when result " \
+                                     "spooling was enabled".format(query)
+
+
+class TestResultSpoolingCancellation(ImpalaTestSuite):
+  """Test cancellation of queries when result spooling is enabled. This class heavily
+  borrows from the cancellation tests in test_cancellation.py. It uses the following test
+  dimensions: 'query' and 'cancel_delay'. 'query' is a list of queries to run
+  asynchronously and then cancel. 'cancel_delay' controls how long a query should run
+  before being cancelled.
+  """
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpoolingCancellation, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('query',
+        *CANCELLATION_QUERIES))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('cancel_delay',
+        *CANCEL_DELAY_IN_SECONDS))
+
+    # Result spooling should be independent of file format, so only testing for
+    # table_format=parquet/none in order to avoid a test dimension explosion.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_cancellation(self, vector):
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+    cancel_query_and_validate_state(self.client, vector.get_value('query'),
+        vector.get_value('exec_option'), vector.get_value('table_format'),
+        vector.get_value('cancel_delay'))
+
+  def test_cancel_no_fetch(self, vector):
+    """Test cancelling a query before any results are fetched. Unlike
+    test_cancellation, this test starts no fetch thread, so the query is always
+    cancelled before any rows are fetched."""
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+    handle = None
+    try:
+      handle = self.execute_query_async(vector.get_value('query'),
+          vector.get_value('exec_option'))
+      sleep(vector.get_value('cancel_delay'))
+      cancel_result = self.client.cancel(handle)
+      assert cancel_result.status_code == 0,\
+          'Unexpected status code from cancel request: {0}'.format(cancel_result)
+    finally:
+      if handle: self.client.close_query(handle)
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
new file mode 100644
index 0000000..5c08d6a
--- /dev/null
+++ b/tests/util/cancel_util.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+from time import sleep
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+def cancel_query_and_validate_state(client, query, exec_option, table_format,
+    cancel_delay, join_before_close=False):
+  """Runs the given query asynchronously and then cancels it after the specified delay.
+  The query is run with the given 'exec_option' against the specified 'table_format'. A
+  separate thread is launched to fetch the results of the query. The method
+  validates that the cancel request succeeded and that the error messages for the
+  calls to ImpalaConnection#fetch and #close are consistent. If 'join_before_close' is
+  True the method will join against the fetch results thread before closing the query.
+  """
+  if exec_option: client.set_configuration(exec_option)
+  if table_format: ImpalaTestSuite.change_database(client, table_format)
+  handle = client.execute_async(query)
+
+  thread = threading.Thread(target=__fetch_results, args=(query, handle))
+  thread.start()
+
+  sleep(cancel_delay)
+  assert client.get_state(handle) != client.QUERY_STATES['EXCEPTION']
+  cancel_result = client.cancel(handle)
+  assert cancel_result.status_code == 0,\
+      'Unexpected status code from cancel request: %s' % cancel_result
+
+  if join_before_close:
+    thread.join()
+
+  close_error = None
+  try:
+    client.close_query(handle)
+  except ImpalaBeeswaxException as e:
+    close_error = e
+
+  # Before accessing fetch_results_error we need to join the fetch thread
+  thread.join()
+
+  if thread.fetch_results_error is None:
+    # If the fetch rpc didn't result in CANCELLED (and auto-close the query) then
+    # the close rpc should have succeeded.
+    assert close_error is None
+  elif close_error is None:
+    # If the close rpc succeeded, then the fetch rpc should have either succeeded,
+    # failed with 'Cancelled' or failed with 'Invalid query handle' (if the close
+    # rpc occurred before the fetch rpc).
+    if thread.fetch_results_error is not None:
+      assert 'Cancelled' in str(thread.fetch_results_error) or \
+        ('Invalid query handle' in str(thread.fetch_results_error)
+         and not join_before_close)
+  else:
+    # If the close rpc encountered an exception, then it must be due to fetch
+    # noticing the cancellation and doing the auto-close.
+    assert 'Invalid or unknown query handle' in str(close_error)
+    assert 'Cancelled' in str(thread.fetch_results_error)
+
+  # TODO: Add some additional verification to check to make sure the query was
+  # actually canceled
+
+
+def __fetch_results(query, handle):
+  threading.current_thread().fetch_results_error = None
+  threading.current_thread().query_profile = None
+  try:
+    new_client = ImpalaTestSuite.create_impala_client()
+    new_client.fetch(query, handle)
+  except ImpalaBeeswaxException as e:
+    threading.current_thread().fetch_results_error = e


[impala] 02/04: IMPALA-8806: Add metrics to improve observability of executor groups

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5193f36de05218f90581d29dd71d7e570e38b46
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Mon Jul 29 16:58:11 2019 -0700

    IMPALA-8806: Add metrics to improve observability of executor groups
    
    This patch adds 3 metrics under a new metric group called
    "cluster-membership" that track the number of executor groups that
    have at least one live executor, the number of executor groups that
    are in a healthy state, and the number of backends registered with the
    statestore.
    
    Testing:
    Modified tests to use these metrics for verification.
    
    Change-Id: I7745ea1c7c6778d3fb5e59adbc873697beb0f3b9
    Reviewed-on: http://gerrit.cloudera.org:8080/13979
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                       |  2 +-
 be/src/scheduling/cluster-membership-mgr-test.cc | 12 +++--
 be/src/scheduling/cluster-membership-mgr.cc      | 38 +++++++++++--
 be/src/scheduling/cluster-membership-mgr.h       | 13 ++++-
 be/src/scheduling/scheduler-test-util.cc         |  8 +--
 be/src/service/impala-server.cc                  |  8 ++-
 common/thrift/metrics.json                       | 29 ++++++++++
 tests/custom_cluster/test_auto_scaling.py        | 68 ++++++++++++++----------
 tests/custom_cluster/test_executor_groups.py     | 47 +++++++++++++---
 9 files changed, 177 insertions(+), 48 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 44ca63a..53a231f 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -217,7 +217,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   }
 
   cluster_membership_mgr_.reset(new ClusterMembershipMgr(
-      statestore_subscriber_->id(), statestore_subscriber_.get()));
+      statestore_subscriber_->id(), statestore_subscriber_.get(), metrics_.get()));
 
   admission_controller_.reset(
       new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
index f196cc3..e3e24d1 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -26,6 +26,7 @@
 #include "service/impala-server.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "util/metrics.h"
 
 using std::mt19937;
 using std::uniform_int_distribution;
@@ -69,6 +70,7 @@ class ClusterMembershipMgrTest : public testing::Test {
   /// A struct to hold information related to a simulated backend during the test.
   struct Backend {
     string backend_id;
+    std::unique_ptr<MetricGroup> metric_group;
     std::unique_ptr<ClusterMembershipMgr> cmm;
     std::shared_ptr<TBackendDescriptor> desc;
   };
@@ -175,7 +177,9 @@ class ClusterMembershipMgrTest : public testing::Test {
   /// this method.
   void CreateCMM(Backend* be) {
     ASSERT_TRUE(IsInVector(be, offline_));
-    be->cmm = make_unique<ClusterMembershipMgr>(be->backend_id, nullptr);
+    be->metric_group = make_unique<MetricGroup>("test");
+    be->cmm = make_unique<ClusterMembershipMgr>(
+        be->backend_id, nullptr, be->metric_group.get());
     RemoveFromVector(be, &offline_);
     starting_.push_back(be);
   }
@@ -268,8 +272,10 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   auto b1 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(1));
   auto b2 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(2));
 
-  ClusterMembershipMgr cmm1(b1->address.hostname, nullptr);
-  ClusterMembershipMgr cmm2(b2->address.hostname, nullptr);
+  MetricGroup tmp_metrics1("test-metrics1");
+  MetricGroup tmp_metrics2("test-metrics2");
+  ClusterMembershipMgr cmm1(b1->address.hostname, nullptr, &tmp_metrics1);
+  ClusterMembershipMgr cmm2(b2->address.hostname, nullptr, &tmp_metrics2);
 
   const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index fe3f98c..e81905e 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -19,6 +19,7 @@
 
 #include "common/logging.h"
 #include "common/names.h"
+#include "util/metrics.h"
 #include "util/test-info.h"
 
 namespace {
@@ -44,12 +45,22 @@ ExecutorGroup* FindOrInsertExecutorGroup(const TExecutorGroupDesc& group,
 
 namespace impala {
 
-ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
-    StatestoreSubscriber* subscriber) :
-    current_membership_(std::make_shared<const Snapshot>()),
+static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total");
+static const string HEALTHY_EXEC_GROUP_KEY(
+    "cluster-membership.executor-groups.total-healthy");
+static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
+
+ClusterMembershipMgr::ClusterMembershipMgr(
+    string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
+  : current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
     thrift_serializer_(/* compact= */ false),
     local_backend_id_(move(local_backend_id)) {
+  DCHECK(metrics != nullptr);
+  MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
+  total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
+  total_healthy_executor_groups_ = metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
+  total_backends_ = metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
 }
 
 Status ClusterMembershipMgr::Init() {
@@ -311,6 +322,8 @@ void ClusterMembershipMgr::UpdateMembership(
     DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
   }
 
+  UpdateMetrics(*new_backend_map, *new_executor_groups);
+
   // Don't send updates or update the current membership if the statestore is in its
   // post-recovery grace period.
   if (ss_is_recovering) {
@@ -527,4 +540,23 @@ bool ClusterMembershipMgr::CheckConsistency(const BackendIdMap& current_backends
   return true;
 }
 
+void ClusterMembershipMgr::UpdateMetrics(
+    const BackendIdMap& current_backends, const ExecutorGroups& executor_groups) {
+  int total_live_executor_groups = 0;
+  int total_healthy_executor_groups = 0;
+  for (const auto& group_it : executor_groups) {
+    const ExecutorGroup& group = group_it.second;
+    if (group.IsHealthy()) {
+      total_live_executor_groups++;
+      total_healthy_executor_groups++;
+    } else if (group.NumHosts() > 0) {
+      total_live_executor_groups++;
+    }
+  }
+  DCHECK_GE(total_live_executor_groups, total_healthy_executor_groups);
+  total_live_executor_groups_->SetValue(total_live_executor_groups);
+  total_healthy_executor_groups_->SetValue(total_healthy_executor_groups);
+  total_backends_->SetValue(current_backends.size());
+}
+
 } // end namespace impala
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index ec11af3..8a09df0 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -32,6 +32,7 @@
 #include "scheduling/executor-group.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/container-util.h"
+#include "util/metrics-fwd.h"
 
 namespace impala {
 
@@ -127,7 +128,8 @@ class ClusterMembershipMgr {
   /// locks are held when calling this callback.
   typedef std::function<Status(const TUpdateExecutorMembershipRequest&)> UpdateFrontendFn;
 
-  ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber);
+  ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber,
+      MetricGroup* metrics);
 
   /// Initializes instances of this class. This only sets up the statestore subscription.
   /// Callbacks to the local ImpalaServer and Frontend must be registered in separate
@@ -205,11 +207,20 @@ class ClusterMembershipMgr {
   bool CheckConsistency(const BackendIdMap& current_backends,
       const ExecutorGroups& executor_groups, const ExecutorBlacklist& executor_blacklist);
 
+  /// Updates the membership metrics.
+  void UpdateMetrics(const BackendIdMap& current_backends,
+      const ExecutorGroups& executor_groups);
+
   /// Ensures that only one thread is processing a membership update at a time, either
   /// from a statestore update or a blacklisting decision. Must be taken before any other
   /// locks in this class.
   boost::mutex update_membership_lock_;
 
+  /// Membership metrics
+  IntCounter* total_live_executor_groups_ = nullptr;
+  IntCounter* total_healthy_executor_groups_ = nullptr;
+  IntCounter* total_backends_ = nullptr;
+
   /// The snapshot of the current cluster membership. When receiving changes to the
   /// executors configuration from the statestore we will make a copy of the stored
   /// object, apply the updates to the copy and atomically swap the contents of this
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 09a87fb..e0cd7f3 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -694,10 +694,10 @@ void SchedulerWrapper::InitializeScheduler() {
                                            << "hosts.";
   const Host& scheduler_host = plan_.cluster().hosts()[0];
   string scheduler_backend_id = scheduler_host.ip;
-  cluster_membership_mgr_.reset(new ClusterMembershipMgr(scheduler_backend_id, nullptr));
-  cluster_membership_mgr_->SetLocalBeDescFn([scheduler_host]() {
-      return BuildBackendDescriptor(scheduler_host);
-  });
+  cluster_membership_mgr_.reset(
+      new ClusterMembershipMgr(scheduler_backend_id, nullptr, &metrics_));
+  cluster_membership_mgr_->SetLocalBeDescFn(
+      [scheduler_host]() { return BuildBackendDescriptor(scheduler_host); });
   Status status = cluster_membership_mgr_->Init();
   DCHECK(status.ok()) << "Cluster membership manager init failed in test";
   scheduler_.reset(new Scheduler(&metrics_, nullptr));
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d2f44d4..511e3c5 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -242,8 +242,12 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
     "queries from clients. If false, it will refuse client connections.");
 DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
     "fragments.");
-DEFINE_string(executor_groups, "", "List of executor groups, separated by comma. "
-    "Currently only a single group may be specified.");
+DEFINE_string(executor_groups, "",
+    "List of executor groups, separated by comma. Each executor group specification can "
+    "optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
+    "default-pool-1:3. Default minimum size is 1. Only when the cluster membership "
+    "contains at least that number of executors for the group will it be considered "
+    "healthy for admission. Currently only a single group may be specified.");
 
 // TODO: can we automatically choose a startup grace period based on the max admission
 // control queue timeout + some margin for error?
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 2a0ee30..bd30674 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2441,5 +2441,34 @@
     "units": "NONE",
     "kind": "GAUGE",
     "key": "events-processor.events-received-15min-rate"
+  },
+  {
+    "description": "Total number of executor groups that have at least one executor",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups that have at least one executor",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.executor-groups.total"
+  },
+  {
+    "description": "Total number of executor groups that are in a healthy state, that is, have at least the configured minimum number of executors to be considered for admission",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups that are in a healthy state",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.executor-groups.total-healthy"
+  },{
+    "description": "Total number of backends registered with the statestore",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of backends registered with the statestore",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.backends.total"
   }
 ]
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 3c5ecca..2bc4600 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -42,8 +42,8 @@ class TestAutoScaling(CustomClusterTestSuite):
   def _get_total_admitted_queries(self):
     return self.impalad_test_service.get_total_admitted_queries("default-pool")
 
-  def _get_num_executors(self):
-    return self.impalad_test_service.get_num_known_live_backends(only_executors=True)
+  def _get_num_backends(self):
+    return self.impalad_test_service.get_metric_value("cluster-membership.backends.total")
 
   def _get_num_running_queries(self):
     return self.impalad_test_service.get_num_running_queries("default-pool")
@@ -67,9 +67,12 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.start()
 
       # Wait for workers to spin up
-      assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
+      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
+      assert any(self._get_num_backends() >= cluster_size or sleep(1)
                  for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
           "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total-healthy") >= 1
 
       # Wait until we admitted at least 10 queries
       assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -77,26 +80,30 @@ class TestAutoScaling(CustomClusterTestSuite):
           "Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S
 
       # Wait for second executor group to start
-      num_expected = 2 * GROUP_SIZE
-      assert any(self._get_num_executors() == num_expected or sleep(1)
+      cluster_size = (2 * GROUP_SIZE) + 1
+      assert any(self._get_num_backends() >= cluster_size or sleep(1)
                  for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
                      "Number of backends did not reach %s within %s s" % (
-                     num_expected, self.STATE_CHANGE_TIMEOUT_S)
+                     cluster_size, self.STATE_CHANGE_TIMEOUT_S)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total-healthy") >= 2
 
       # Wait for query rate to surpass the maximum for a single executor group plus 20%
       min_query_rate = 1.2 * EXECUTOR_SLOTS
       assert any(workload.get_query_rate() > min_query_rate or sleep(1)
                  for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
                      "Query rate did not surpass %s within %s s" % (
-                     num_expected, self.STATE_CHANGE_TIMEOUT_S)
+                     min_query_rate, self.STATE_CHANGE_TIMEOUT_S)
 
       LOG.info("Stopping workload")
       workload.stop()
 
       # Wait for workers to spin down
-      assert any(self._get_num_executors() == 0 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total") == 0
 
     finally:
       if workload:
@@ -122,9 +129,10 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.start()
 
       # Wait for workers to spin up
-      assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", cluster_size,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
 
       # Wait until we admitted at least 10 queries
       assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -144,15 +152,18 @@ class TestAutoScaling(CustomClusterTestSuite):
           "Unexpected number of running queries: %s" % num_running
 
       # Check that only a single group started
-      assert self._get_num_executors() == GROUP_SIZE
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total-healthy") == 1
 
       LOG.info("Stopping workload")
       workload.stop()
 
       # Wait for workers to spin down
-      assert any(self._get_num_executors() == 0 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total") == 0
 
     finally:
       if workload:
@@ -179,22 +190,23 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.start()
 
       # Wait for first executor to start up
-      assert any(self._get_num_executors() >= 1 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.executor-groups.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
 
       # Wait for remaining executors to start up and make sure that no queries are
       # admitted during startup
       end_time = time() + self.STATE_CHANGE_TIMEOUT_S
       startup_complete = False
+      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
       while time() < end_time:
         num_admitted = self._get_total_admitted_queries()
-        num_backends = self._get_num_executors()
-        if num_backends < GROUP_SIZE:
+        num_backends = self._get_num_backends()
+        if num_backends < cluster_size:
           assert num_admitted == 0, "%s/%s backends started but %s queries have " \
-              "already been admitted." % (num_backends, GROUP_SIZE, num_admitted)
+              "already been admitted." % (num_backends, cluster_size, num_admitted)
         if num_admitted > 0:
-          assert num_backends == GROUP_SIZE
+          assert num_backends == cluster_size
           startup_complete = True
           break
         sleep(1)
@@ -205,9 +217,11 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.stop()
 
       # Wait for workers to spin down
-      assert any(self._get_num_executors() == 0 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total") == 0
 
     finally:
       if workload:
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 5a5a4b1..9c0315b 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -42,6 +42,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self.num_groups = 1
     self.num_executors = 1
     super(TestExecutorGroups, self).setup_method(method)
+    self.coordinator = self.cluster.impalads[0]
 
   def _group_name(self, name):
     # By convention, group names must start with their associated resource pool name
@@ -88,6 +89,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     """Tests that a query submitted to a coordinator with no executor group times out."""
     result = self.execute_query_expect_failure(self.client, "select sleep(2)")
     assert "Admission for query exceeded timeout" in str(result)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
 
   @pytest.mark.execute_serially
   def test_single_group(self):
@@ -95,6 +98,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     QUERY = "select count(*) from functional.alltypestiny"
     self._add_executor_group("group1", 2)
     self.execute_query_expect_success(self.client, QUERY)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
 
   @pytest.mark.execute_serially
   def test_executor_group_starts_while_qeueud(self):
@@ -105,8 +110,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
     handle = client.execute_async(QUERY)
     profile = client.get_runtime_profile(handle)
     assert "No healthy executor groups found for pool" in profile
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     self._add_executor_group("group1", 2)
     client.wait_for_finished_timeout(handle, 20)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
 
   @pytest.mark.execute_serially
   def test_executor_group_health(self):
@@ -114,14 +123,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
     QUERY = "select count(*) from functional.alltypestiny"
     # Start cluster and group
     self._add_executor_group("group1", 2)
+    self.coordinator.service.wait_for_metric_value(
+      "cluster-membership.executor-groups.total-healthy", 1)
     client = self.client
     # Run query to validate
     self.execute_query_expect_success(client, QUERY)
     # Kill an executor
-    coordinator = self.cluster.impalads[0]
     executor = self.cluster.impalads[1]
     executor.kill()
-    coordinator.service.wait_for_num_known_live_backends(2)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
+                                                   timeout=20)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     # Run query and observe timeout
     handle = client.execute_async(QUERY)
     profile = client.get_runtime_profile(handle)
@@ -132,6 +145,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.wait_for_finished_timeout(handle, 20)
     # Run query and observe success
     self.execute_query_expect_success(client, QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "cluster-membership.executor-groups.total-healthy", 1)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests=1")
@@ -146,19 +161,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
     profile = client.get_runtime_profile(q2)
     assert "Initial admission queue reason: number of running queries" in profile, profile
     # Kill an executor
-    coordinator = self.cluster.impalads[0]
     executor = self.cluster.impalads[1]
     executor.kill()
-    coordinator.service.wait_for_num_known_live_backends(2)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
     # Wait for q1 to finish (sleep runs on the coordinator)
     client.wait_for_finished_timeout(q1, 20)
     # Check that q2 still hasn't run
     profile = client.get_runtime_profile(q2)
     assert "Admission result: Queued" in profile, profile
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     # Restore executor group health
     executor.start()
     # Query should now finish
     client.wait_for_finished_timeout(q2, 20)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
 
   @pytest.mark.execute_serially
   def test_max_concurrent_queries(self):
@@ -184,6 +202,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
              where month < 3 and id + random() < sleep(500);"
     self._add_executor_group("group1", 2, max_concurrent_queries=1)
     self._add_executor_group("group2", 2, max_concurrent_queries=1)
+    self.coordinator.service.wait_for_metric_value(
+      "cluster-membership.executor-groups.total-healthy", 2)
     client = self.client
     q1 = client.execute_async(QUERY)
     client.wait_for_admission_control(q1)
@@ -257,6 +277,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
     QUERY = "select sleep(4)"
     # Start first executor
     self._add_executor_group("group1", 3, num_executors=1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     # Run query and observe that it gets queued
     client = self.client
     handle = client.execute_async(QUERY)
@@ -266,10 +291,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
     initial_state = client.get_state(handle)
     # Start another executor and observe that the query stays queued
     self._add_executor_group("group1", 3, num_executors=1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 3)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     profile = client.get_runtime_profile(handle)
     assert client.get_state(handle) == initial_state
     # Start the remaining executor and observe that the query finishes
     self._add_executor_group("group1", 3, num_executors=1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 4)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
     client.wait_for_finished_timeout(handle, 20)
 
   @pytest.mark.execute_serially
@@ -283,16 +314,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Run query to make sure things work
     QUERY = "select count(*) from functional.alltypestiny"
     self.execute_query_expect_success(self.client, QUERY)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
     # Kill executors to make group empty
     impalads = self.cluster.impalads
     impalads[1].kill()
     impalads[2].kill()
-    impalads[0].service.wait_for_num_known_live_backends(1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
     # Run query to make sure it times out
     result = self.execute_query_expect_failure(self.client, QUERY)
-    print str(result)
     expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
                      "pool default-pool. Queued reason: No healthy executor groups " \
                      "found for pool default-pool."
     assert expected_error in str(result)
-
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0