You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/05/30 17:29:56 UTC

[impala] 04/04: IMPALA-8538: HS2 + HTTP(S) + BASIC/LDAP based thrift server endpoint

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cce709a6e37a9e615a093232f9b0dded8c8b5828
Author: Bharath Vissapragada <bh...@cloudera.com>
AuthorDate: Tue Apr 16 16:02:20 2019 -0700

    IMPALA-8538: HS2 + HTTP(S) + BASIC/LDAP based thrift server endpoint
    
    This patch adds an additional hiveserver2 endpoint for clients to
    connect to that uses HTTP. The endpoint can be disabled by setting
    --hs2_http_port=0. HTTP(S) also works when external TLS is
    enabled using --ssl_server_certificate.
    
    Thrift's http transport is modified to support BASIC authentication
    via ldap. For convenience of developing and reviewing, this patch
    is based on another that copied THttpServer and THttpTransport into
    Impala's codebase. Kerberos authentication is not supported, so the
    http endpoint is turned off if Kerberos is enabled and LDAP isn't.
    
    TODO
    =====
    - Fuzz test the http endpoint
    - Add tests for LDAP + HTTPS
    
    Testing
    =======
    - Parameterized JdbcTest and LdapJdbcTest to work for HS2 + HTTP mode
    - Added LdapHS2Test, which directly calls into the Hiveserver2
      interface using a thrift http client.
    
    Manually tested with the Beeline client (from Apache Hive), which has
    built-in support for connecting to HTTP(S)-based HS2-compatible endpoints.
    
    Example
    ========
    
    -- HTTP mode:
    > start-impala-cluster.py
    > JDBC_URL="jdbc:hive2://localhost:<port>/default;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    -- HTTPS mode:
    > cd $IMPALA_HOME
    > SSL_ARGS="--ssl_client_ca_certificate=./be/src/testutil/server-cert.pem \
        --ssl_server_certificate=./be/src/testutil/server-cert.pem \
        --ssl_private_key=./be/src/testutil/server-key.pem --hostname=localhost"
    > start-impala-cluster.py --impalad_args="$SSL_ARGS" \
        --catalogd_args="$SSL_ARGS" --state_store_args="$SSL_ARGS"
    - Create a local trust store using 'keytool' and import the certificate
    from server-cert.pem (./clientkeystore in the example).
    > JDBC_URL="jdbc:hive2://localhost:<port>/default;ssl=true;sslTrustStore= \
        ./clientkeystore;trustStorePassword=password;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    -- BASIC Auth with LDAP:
    > LDAP_ARGS="--enable_ldap_auth --ldap_uri='ldap://...' \
        --ldap_bind_pattern='...' --ldap_passwords_in_clear_ok"
    > start-impala-cluster.py --impalad_args="$LDAP_ARGS"
    > JDBC_URL="jdbc:hive2://localhost:28000/default;user=...;password=\
        ...;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    -- HTTPS mode with LDAP:
    > start-impala-cluster.py --impalad_args="$LDAP_ARGS $SSL_ARGS" \
        --catalogd_args="$SSL_ARGS" --state_store_args="$SSL_ARGS"
    > JDBC_URL="jdbc:hive2://localhost:28000/default;user=...;password=\
        ...;ssl=true;sslTrustStore=./clientkeystore;trustStorePassword=\
        password;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    Change-Id: Ic5569ac62ef3af2868b5d0581f5029dac736b2ff
    Reviewed-on: http://gerrit.cloudera.org:8080/13299
    Reviewed-by: Thomas Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/auth-provider.h                         |  54 +++++--
 be/src/rpc/authentication-test.cc                  |  30 ++--
 be/src/rpc/authentication.cc                       | 143 ++++++++++++++----
 be/src/rpc/thrift-server.cc                        |  26 ++--
 be/src/rpc/thrift-server.h                         |  43 ++++--
 be/src/service/impala-server.cc                    |  50 ++++++-
 be/src/service/impala-server.h                     |  13 +-
 be/src/service/impalad-main.cc                     |   5 +-
 be/src/testutil/in-process-servers.cc              |  23 +--
 be/src/testutil/in-process-servers.h               |   4 +-
 be/src/transport/THttpServer.cpp                   |  39 ++++-
 be/src/transport/THttpServer.h                     |  45 +++++-
 be/src/transport/THttpTransport.cpp                |   6 +-
 be/src/transport/THttpTransport.h                  |  13 +-
 bin/start-impala-cluster.py                        |   5 +-
 common/thrift/generate_error_codes.py              |   3 +
 common/thrift/metrics.json                         |  40 ++++++
 .../apache/impala/customcluster/LdapHS2Test.java   | 159 +++++++++++++++++++++
 .../apache/impala/customcluster/LdapJdbcTest.java  |  28 ++--
 .../java/org/apache/impala/service/JdbcTest.java   |  13 +-
 .../org/apache/impala/service/JdbcTestBase.java    |  34 +++--
 .../apache/impala/testutil/ImpalaJdbcClient.java   |  41 ++++--
 fe/src/test/resources/users.ldif                   |  12 +-
 tests/common/impala_cluster.py                     |   1 +
 24 files changed, 683 insertions(+), 147 deletions(-)

diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index f3180d5..d575aef 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -23,6 +23,7 @@
 #include <sasl/sasl.h>
 
 #include "common/status.h"
+#include "rpc/thrift-server.h"
 #include "util/promise.h"
 
 namespace sasl { class TSasl; }
@@ -39,33 +40,45 @@ class AuthProvider {
   virtual Status Start() WARN_UNUSED_RESULT = 0;
 
   /// Creates a new Thrift transport factory in the out parameter that performs
-  /// authorisation per this provider's protocol.
+  /// authorisation per this provider's protocol. The top-level transport returned by
+  /// 'factory' must always be a TBufferedTransport, but depending on the AuthProvider
+  /// implementation and the value of 'underlying_transport_type', that may be wrapped
+  /// around another transport type, eg. a TSaslServerTransport.
   virtual Status GetServerTransportFactory(
+      ThriftServer::TransportType underlying_transport_type,
       boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory)
       WARN_UNUSED_RESULT = 0;
 
   /// Called by Thrift clients to wrap a raw transport with any intermediate transport
   /// that an auth protocol requires.
+  /// TODO: Return the correct clients for HTTP base transport. At this point, no clients
+  /// for HTTP endpoints are internal to the Impala service, so it should be OK.
   virtual Status WrapClientTransport(const std::string& hostname,
       boost::shared_ptr<apache::thrift::transport::TTransport> raw_transport,
       const std::string& service_name,
       boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport)
       WARN_UNUSED_RESULT = 0;
 
+  /// Set up 'connection_ptr' to get its username from 'underlying_transport'.
+  virtual void SetupConnectionContext(
+      const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+      ThriftServer::TransportType underlying_transport_type,
+      apache::thrift::transport::TTransport* underlying_transport) = 0;
+
   /// Returns true if this provider uses Sasl at the transport layer.
-  virtual bool is_sasl() = 0;
+  virtual bool is_secure() = 0;
 
   virtual ~AuthProvider() { }
 };
 
-/// If either (or both) Kerberos and LDAP auth are desired, we use Sasl for the
-/// communication.  This "wraps" the underlying communication, in thrift-speak.
-/// This is used for both client and server contexts; there is one for internal
-/// and one for external communication.
-class SaslAuthProvider : public AuthProvider {
+/// Used if either (or both) Kerberos and LDAP auth are desired. For BINARY connections we
+/// use Sasl for the communication, and for HTTP connections we use BASIC auth.  This
+/// "wraps" the underlying communication, in thrift-speak. This is used for both client
+/// and server contexts; there is one for internal and one for external communication.
+class SecureAuthProvider : public AuthProvider {
  public:
-  SaslAuthProvider(bool is_internal) : has_ldap_(false), is_internal_(is_internal),
-      needs_kinit_(false) {}
+  SecureAuthProvider(bool is_internal)
+    : has_ldap_(false), is_internal_(is_internal), needs_kinit_(false) {}
 
   /// Performs initialization of external state. Kinit if configured to use kerberos.
   /// If we're using ldap, set up appropriate certificate usage.
@@ -86,9 +99,19 @@ class SaslAuthProvider : public AuthProvider {
   /// Then presto! You've got authentication for the connection.
   /// This is only applicable to Thrift connections and not KRPC connections.
   virtual Status GetServerTransportFactory(
+      ThriftServer::TransportType underlying_transport_type,
       boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory);
 
-  virtual bool is_sasl() { return true; }
+  /// If sasl was used, the username will be available from the handshake, and we set it
+  /// here. If HTTP BASIC auth was used, the username won't be available until the first
+  /// packet is received, so we register a callback with the transport that will set the
+  /// username then.
+  virtual void SetupConnectionContext(
+      const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+      ThriftServer::TransportType underlying_transport_type,
+      apache::thrift::transport::TTransport* underlying_transport);
+
+  virtual bool is_secure() { return true; }
 
   /// Initializes kerberos items and checks for sanity.  Failures can occur on a
   /// malformed principal or when setting some environment variables.  Called
@@ -152,6 +175,7 @@ class NoAuthProvider : public AuthProvider {
   virtual Status Start() { return Status::OK(); }
 
   virtual Status GetServerTransportFactory(
+      ThriftServer::TransportType underlying_transport_type,
       boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory);
 
   virtual Status WrapClientTransport(const std::string& hostname,
@@ -159,7 +183,15 @@ class NoAuthProvider : public AuthProvider {
       const std::string& service_name,
       boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport);
 
-  virtual bool is_sasl() { return false; }
+  /// If there is no auth, then we don't have a username available.
+  virtual void SetupConnectionContext(
+      const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+      ThriftServer::TransportType underlying_transport_type,
+      apache::thrift::transport::TTransport* underlying_transport) {
+    connection_ptr->username = "";
+  }
+
+  virtual bool is_secure() { return false; }
 };
 
 /// The first entry point to the authentication subsystem.  Performs initialization
diff --git a/be/src/rpc/authentication-test.cc b/be/src/rpc/authentication-test.cc
index 0dacdee..7b5198e 100644
--- a/be/src/rpc/authentication-test.cc
+++ b/be/src/rpc/authentication-test.cc
@@ -54,7 +54,7 @@ int SaslAuthorizeInternal(sasl_conn_t* conn, void* context,
 TEST(Auth, PrincipalSubstitution) {
   string hostname;
   ASSERT_OK(GetHostname(&hostname));
-  SaslAuthProvider sa(false);  // false means it's external
+  SecureAuthProvider sa(false); // false means it's external
 
   FLAGS_principal = "service_name/_HOST@some.realm";
   string principal;
@@ -70,20 +70,20 @@ TEST(Auth, PrincipalSubstitution) {
   FLAGS_principal.clear();
 }
 
-void AuthOk(const string& name, SaslAuthProvider* sa) {
+void AuthOk(const string& name, SecureAuthProvider* sa) {
   EXPECT_EQ(SASL_OK,
       SaslAuthorizeInternal(NULL, (void*)sa, name.c_str(), name.size(), NULL, 0, NULL, 0,
           NULL));
 }
 
-void AuthFails(const string& name, SaslAuthProvider* sa) {
+void AuthFails(const string& name, SecureAuthProvider* sa) {
   EXPECT_EQ(SASL_BADAUTH,
       SaslAuthorizeInternal(NULL, (void*)sa, name.c_str(), name.size(), NULL, 0, NULL, 0,
           NULL));
 }
 
 TEST(Auth, AuthorizeInternalPrincipals) {
-  SaslAuthProvider sa(true);  // false means it's external
+  SecureAuthProvider sa(true); // true means it's internal
   ASSERT_OK(sa.InitKerberos("service_name/localhost@some.realm", "/etc/hosts"));
 
   AuthOk("service_name/localhost@some.realm", &sa);
@@ -116,7 +116,7 @@ TEST(Auth, ValidAuthProviders) {
 // Set up ldap flags and ensure we make the appropriate auth providers
 TEST(Auth, LdapAuth) {
   AuthProvider* ap = NULL;
-  SaslAuthProvider* sa = NULL;
+  SecureAuthProvider* sa = NULL;
 
   FLAGS_enable_ldap_auth = true;
   FLAGS_ldap_uri = "ldaps://bogus.com";
@@ -126,20 +126,20 @@ TEST(Auth, LdapAuth) {
 
   // External auth provider is sasl, ldap, but not kerberos
   ap = AuthManager::GetInstance()->GetExternalAuthProvider();
-  ASSERT_TRUE(ap->is_sasl());
-  sa = dynamic_cast<SaslAuthProvider*>(ap);
+  ASSERT_TRUE(ap->is_secure());
+  sa = dynamic_cast<SecureAuthProvider*>(ap);
   ASSERT_TRUE(sa->has_ldap());
   ASSERT_EQ("", sa->principal());
 
   // Internal auth provider isn't sasl.
   ap = AuthManager::GetInstance()->GetInternalAuthProvider();
-  ASSERT_FALSE(ap->is_sasl());
+  ASSERT_FALSE(ap->is_secure());
 }
 
 // Set up ldap and kerberos flags and ensure we make the appropriate auth providers
 TEST(Auth, LdapKerbAuth) {
   AuthProvider* ap = NULL;
-  SaslAuthProvider* sa = NULL;
+  SecureAuthProvider* sa = NULL;
 
   if ((env_keytab == NULL) || (env_princ == NULL)) {
     return;     // In a non-kerberized environment
@@ -154,15 +154,15 @@ TEST(Auth, LdapKerbAuth) {
 
   // External auth provider is sasl, ldap, and kerberos
   ap = AuthManager::GetInstance()->GetExternalAuthProvider();
-  ASSERT_TRUE(ap->is_sasl());
-  sa = dynamic_cast<SaslAuthProvider*>(ap);
+  ASSERT_TRUE(ap->is_secure());
+  sa = dynamic_cast<SecureAuthProvider*>(ap);
   ASSERT_TRUE(sa->has_ldap());
   ASSERT_EQ(FLAGS_principal, sa->principal());
 
   // Internal auth provider is sasl and kerberos
   ap = AuthManager::GetInstance()->GetInternalAuthProvider();
-  ASSERT_TRUE(ap->is_sasl());
-  sa = dynamic_cast<SaslAuthProvider*>(ap);
+  ASSERT_TRUE(ap->is_secure());
+  sa = dynamic_cast<SecureAuthProvider*>(ap);
   ASSERT_FALSE(sa->has_ldap());
   ASSERT_EQ(FLAGS_principal, sa->principal());
 }
@@ -176,10 +176,10 @@ TEST(Auth, KerbAndSslEnabled) {
   FLAGS_ssl_server_certificate = "some_path";
   FLAGS_ssl_private_key = "some_path";
   ASSERT_TRUE(IsInternalTlsConfigured());
-  SaslAuthProvider sa_internal(true);
+  SecureAuthProvider sa_internal(true);
   ASSERT_OK(
       sa_internal.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts"));
-  SaslAuthProvider sa_external(false);
+  SecureAuthProvider sa_external(false);
   ASSERT_OK(
       sa_external.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts"));
 }
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 072ccdf..addedd9 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -24,6 +24,8 @@
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
 #include <boost/filesystem.hpp>
+#include <gutil/casts.h>
+#include <gutil/strings/escaping.h>
 #include <gutil/strings/substitute.h>
 #include <random>
 #include <string>
@@ -40,6 +42,7 @@
 #include "kudu/security/init.h"
 #include "rpc/auth-provider.h"
 #include "rpc/thrift-server.h"
+#include "transport/THttpServer.h"
 #include "transport/TSaslClientTransport.h"
 #include "util/auth-util.h"
 #include "util/debug-util.h"
@@ -62,6 +65,7 @@ using boost::algorithm::trim;
 using boost::mt19937;
 using boost::uniform_int;
 using namespace apache::thrift;
+using namespace apache::thrift::transport;
 using namespace boost::filesystem;   // for is_regular(), is_absolute()
 using namespace strings;
 
@@ -192,18 +196,14 @@ static int SaslLogCallback(void* context, int level, const char* message) {
 // Use --ldap_ca_certificate to specify the location of the certificate used to confirm
 // the authenticity of the LDAP server certificate.
 //
-// conn: The Sasl connection struct, which we ignore
-// context: Ignored; always NULL
 // user: The username to authenticate
 // pass: The password to use
 // passlen: The length of pass
-// propctx: Ignored - properties requested
-// Return: SASL_OK on success, SASL_FAIL otherwise
-int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
-    const char* pass, unsigned passlen, struct propctx* propctx) {
+// Return: true on success, false otherwise
+static bool LdapCheckPass(const char* user, const char* pass, unsigned passlen) {
   if (passlen == 0 && !FLAGS_ldap_allow_anonymous_binds) {
     // Disable anonymous binds.
-    return SASL_FAIL;
+    return false;
   }
 
   LDAP* ld;
@@ -211,7 +211,7 @@ int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
   if (rc != LDAP_SUCCESS) {
     LOG(WARNING) << "Could not initialize connection with LDAP server ("
                  << FLAGS_ldap_uri << "). Error: " << ldap_err2string(rc);
-    return SASL_FAIL;
+    return false;
   }
 
   // Force the LDAP version to 3 to make sure TLS is supported.
@@ -227,7 +227,7 @@ int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
       LOG(WARNING) << "Could not start TLS secure connection to LDAP server ("
                    << FLAGS_ldap_uri << "). Error: " << ldap_err2string(tls_rc);
       ldap_unbind_ext(ld, NULL, NULL);
-      return SASL_FAIL;
+      return false;
     }
     VLOG(2) << "Started TLS connection with LDAP server: " << FLAGS_ldap_uri;
   }
@@ -260,12 +260,27 @@ int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
   if (rc != LDAP_SUCCESS) {
     LOG(WARNING) << "LDAP authentication failure for " << user_str
                  << " : " << ldap_err2string(rc);
-    return SASL_FAIL;
+    return false;
   }
 
   VLOG_QUERY << "LDAP bind successful";
 
-  return SASL_OK;
+  return true;
+}
+
+// Wrapper around the function we use to check passwords with LDAP which converts the
+// return value to something appropriate for SASL.
+//
+// conn: The Sasl connection struct, which we ignore
+// context: Ignored; always NULL
+// user: The username to authenticate
+// pass: The password to use
+// passlen: The length of pass
+// propctx: Ignored - properties requested
+// Return: SASL_OK on success, SASL_FAIL otherwise
+int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
+    const char* pass, unsigned passlen, struct propctx* propctx) {
+  return LdapCheckPass(user, pass, passlen) ? SASL_OK : SASL_FAIL;
 }
 
 // Sasl wants a way to ask us about some options, this function provides
@@ -409,14 +424,14 @@ int SaslAuthorizeInternal(sasl_conn_t* conn, void* context,
               << "<service>/<hostname>@<realm> - got: " << requested_user;
     return SASL_BADAUTH;
   }
-  SaslAuthProvider* internal_auth_provider;
+  SecureAuthProvider* internal_auth_provider;
   if (context == NULL) {
-    internal_auth_provider = static_cast<SaslAuthProvider*>(
+    internal_auth_provider = static_cast<SecureAuthProvider*>(
         AuthManager::GetInstance()->GetInternalAuthProvider());
   } else {
     // Branch should only be taken for testing, where context is used to inject an auth
     // provider.
-    internal_auth_provider = static_cast<SaslAuthProvider*>(context);
+    internal_auth_provider = static_cast<SecureAuthProvider*>(context);
   }
 
   vector<string> whitelist;
@@ -481,6 +496,30 @@ static int SaslGetPath(void* context, const char** path) {
   return SASL_OK;
 }
 
+bool BasicAuth(ThriftServer::ConnectionContext* connection_context, const char* base64) {
+  int64_t in_len = strlen(base64);
+  string decoded;
+  if (!Base64Unescape(base64, in_len, &decoded)) {
+    LOG(ERROR) << "Failed to decode base64 auth string from: "
+               << TNetworkAddressToString(connection_context->network_address);
+    return false;
+  }
+  std::size_t colon = decoded.find(':');
+  if (colon == std::string::npos) {
+    LOG(ERROR) << "Auth string must be in the form '<username>:<password>' from: "
+               << TNetworkAddressToString(connection_context->network_address);
+    return false;
+  }
+  string username = decoded.substr(0, colon);
+  string password = decoded.substr(colon + 1);
+  bool ret = LdapCheckPass(username.c_str(), password.c_str(), password.length());
+  if (ret) {
+    // Authentication was successful, so set the username on the connection.
+    connection_context->username = username;
+  }
+  return ret;
+}
+
 namespace {
 
 // SASL requires mutexes for thread safety, but doesn't implement
@@ -658,9 +697,8 @@ Status CheckReplayCacheDirPermissions() {
   return Status::OK();
 }
 
-Status SaslAuthProvider::InitKerberos(const string& principal,
-    const string& keytab_file) {
-
+Status SecureAuthProvider::InitKerberos(
+    const string& principal, const string& keytab_file) {
   principal_ = principal;
   keytab_file_ = keytab_file;
   // The logic here is that needs_kinit_ is false unless we are the internal
@@ -782,7 +820,7 @@ Status AuthManager::InitKerberosEnv() {
   return Status::OK();
 }
 
-Status SaslAuthProvider::Start() {
+Status SecureAuthProvider::Start() {
   // True for kerberos internal use
   if (needs_kinit_) {
     DCHECK(is_internal_);
@@ -825,10 +863,26 @@ Status SaslAuthProvider::Start() {
   return Status::OK();
 }
 
-Status SaslAuthProvider::GetServerTransportFactory(
+Status SecureAuthProvider::GetServerTransportFactory(
+    ThriftServer::TransportType underlying_transport_type,
     boost::shared_ptr<TTransportFactory>* factory) {
   DCHECK(!principal_.empty() || has_ldap_);
 
+  if (underlying_transport_type == ThriftServer::HTTP) {
+    // TODO: add support for SPNEGO style of HTTP auth
+    if (!principal_.empty() && !has_ldap_) {
+      const string err = "Kerberos not yet supported with HTTP transport.";
+      LOG(ERROR) << err;
+      return Status(err);
+    }
+
+    factory->reset(new ThriftServer::BufferedTransportFactory(
+        ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
+        new THttpServerTransportFactory(/* requireBasicAuth */ true)));
+    return Status::OK();
+  }
+
+  DCHECK(underlying_transport_type == ThriftServer::BINARY);
   // This is the heart of the link between this file and thrift.  Here we
   // associate a Sasl mechanism with our callbacks.
   try {
@@ -863,10 +917,9 @@ Status SaslAuthProvider::GetServerTransportFactory(
   return Status::OK();
 }
 
-Status SaslAuthProvider::WrapClientTransport(const string& hostname,
+Status SecureAuthProvider::WrapClientTransport(const string& hostname,
     boost::shared_ptr<TTransport> raw_transport, const string& service_name,
     boost::shared_ptr<TTransport>* wrapped_transport) {
-
   boost::shared_ptr<sasl::TSasl> sasl_client;
   const map<string, string> props; // Empty; unused by thrift
   const string auth_id; // Empty; unused by thrift
@@ -894,10 +947,46 @@ Status SaslAuthProvider::WrapClientTransport(const string& hostname,
   return Status::OK();
 }
 
+void SecureAuthProvider::SetupConnectionContext(
+    const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+    ThriftServer::TransportType underlying_transport_type,
+    TTransport* underlying_transport) {
+  switch (underlying_transport_type) {
+    case ThriftServer::BINARY: {
+      TSaslServerTransport* sasl_transport =
+          down_cast<TSaslServerTransport*>(underlying_transport);
+
+      // Get the username from the transport.
+      connection_ptr->username = sasl_transport->getUsername();
+      break;
+    }
+    case ThriftServer::HTTP: {
+      THttpServer* http_transport = down_cast<THttpServer*>(underlying_transport);
+      http_transport->setAuthFn(
+          std::bind(BasicAuth, connection_ptr.get(), std::placeholders::_1));
+      break;
+    }
+    default:
+      LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
+  }
+}
+
 Status NoAuthProvider::GetServerTransportFactory(
+    ThriftServer::TransportType underlying_transport_type,
     boost::shared_ptr<TTransportFactory>* factory) {
   // No Sasl - yawn.  Here, have a regular old buffered transport.
-  factory->reset(new ThriftServer::BufferedTransportFactory());
+  switch (underlying_transport_type) {
+    case ThriftServer::BINARY:
+      factory->reset(new ThriftServer::BufferedTransportFactory());
+      break;
+    case ThriftServer::HTTP:
+      factory->reset(new ThriftServer::BufferedTransportFactory(
+          ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
+          new THttpServerTransportFactory()));
+      break;
+    default:
+      LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
+  }
   return Status::OK();
 }
 
@@ -1006,8 +1095,8 @@ Status AuthManager::Init() {
   // the client side, this is just a check for the "back end" kerberos
   // principal.
   if (use_kerberos) {
-    SaslAuthProvider* sap = NULL;
-    internal_auth_provider_.reset(sap = new SaslAuthProvider(true));
+    SecureAuthProvider* sap = NULL;
+    internal_auth_provider_.reset(sap = new SecureAuthProvider(true));
     RETURN_IF_ERROR(sap->InitKerberos(kerberos_internal_principal,
         FLAGS_keytab_file));
     LOG(INFO) << "Internal communication is authenticated with Kerberos";
@@ -1018,11 +1107,11 @@ Status AuthManager::Init() {
   RETURN_IF_ERROR(internal_auth_provider_->Start());
 
   // Set up the external auth provider as per above.  Either a "front end"
-  // principal or ldap tells us to use a SaslAuthProvider, and we fill in
+  // principal or ldap tells us to use a SecureAuthProvider, and we fill in
   // details from there.
   if (use_ldap || use_kerberos) {
-    SaslAuthProvider* sap = NULL;
-    external_auth_provider_.reset(sap = new SaslAuthProvider(false));
+    SecureAuthProvider* sap = NULL;
+    external_auth_provider_.reset(sap = new SecureAuthProvider(false));
     if (use_kerberos) {
       RETURN_IF_ERROR(sap->InitKerberos(kerberos_external_principal,
           FLAGS_keytab_file));
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ccf1aff..aa9a088 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -38,6 +38,7 @@
 #include "rpc/authentication.h"
 #include "rpc/thrift-server.h"
 #include "rpc/thrift-thread.h"
+#include "transport/THttpServer.h"
 #include "util/condition-variable.h"
 #include "util/debug-util.h"
 #include "util/metrics.h"
@@ -259,15 +260,19 @@ void* ThriftServer::ThriftServerEventProcessor::createContext(
       boost::shared_ptr<ConnectionContext>(new ConnectionContext);
   TTransport* underlying_transport =
       (static_cast<TBufferedTransport*>(transport))->getUnderlyingTransport().get();
-  if (!thrift_server_->auth_provider_->is_sasl()) {
-    socket = static_cast<TSocket*>(underlying_transport);
-  } else {
+
+  thrift_server_->auth_provider_->SetupConnectionContext(
+      connection_ptr, thrift_server_->transport_type_, underlying_transport);
+  if (thrift_server_->auth_provider_->is_secure()
+      && thrift_server_->transport_type_ == ThriftServer::BINARY) {
     TSaslServerTransport* sasl_transport = static_cast<TSaslServerTransport*>(
         underlying_transport);
-
-    // Get the username from the transport.
-    connection_ptr->username = sasl_transport->getUsername();
     socket = static_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
+  } else if (thrift_server_->transport_type_ == ThriftServer::HTTP) {
+    THttpServer* http_transport = static_cast<THttpServer*>(underlying_transport);
+    socket = static_cast<TSocket*>(http_transport->getUnderlyingTransport().get());
+  } else {
+    socket = static_cast<TSocket*>(underlying_transport);
   }
 
   {
@@ -324,7 +329,8 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
-    MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms)
+    MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms,
+    TransportType transport_type)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
@@ -335,7 +341,8 @@ ThriftServer::ThriftServer(const string& name,
     processor_(processor),
     connection_handler_(NULL),
     metrics_(NULL),
-    auth_provider_(auth_provider) {
+    auth_provider_(auth_provider),
+    transport_type_(transport_type) {
   if (auth_provider_ == NULL) {
     auth_provider_ = AuthManager::GetInstance()->GetInternalAuthProvider();
   }
@@ -483,7 +490,8 @@ Status ThriftServer::Start() {
   boost::shared_ptr<TServerSocket> server_socket;
   boost::shared_ptr<TTransportFactory> transport_factory;
   RETURN_IF_ERROR(CreateSocket(&server_socket));
-  RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(&transport_factory));
+  RETURN_IF_ERROR(
+      auth_provider_->GetServerTransportFactory(transport_type_, &transport_factory));
 
   server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
       protocol_factory, thread_factory, name_, max_concurrent_connections_,
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 4afa040..764fbf2 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -44,24 +44,36 @@ class AuthProvider;
 /// TODO: shutdown is buggy (which only harms tests)
 class ThriftServer {
  public:
-  /// Transport factory that procuess buffered transports with a customisable
-  /// buffer-size. A larger buffer is usually more efficient, as it allows the underlying
-  /// transports to perform fewer system calls.
+  /// Transport factory that wraps transports in a buffered transport with a customisable
+  /// buffer-size and optionally in another transport from a provided factory. A larger
+  /// buffer is usually more efficient, as it allows the underlying transports to perform
+  /// fewer system calls.
   class BufferedTransportFactory :
       public apache::thrift::transport::TBufferedTransportFactory {
    public:
     static const int DEFAULT_BUFFER_SIZE_BYTES = 128 * 1024;
 
-    BufferedTransportFactory(uint32_t buffer_size = DEFAULT_BUFFER_SIZE_BYTES) :
-        buffer_size_(buffer_size) { }
+    BufferedTransportFactory(uint32_t buffer_size = DEFAULT_BUFFER_SIZE_BYTES,
+        apache::thrift::transport::TTransportFactory* wrapped_factory =
+            new apache::thrift::transport::TTransportFactory())
+      : buffer_size_(buffer_size), wrapped_factory_(wrapped_factory) {}
 
     virtual boost::shared_ptr<apache::thrift::transport::TTransport> getTransport(
         boost::shared_ptr<apache::thrift::transport::TTransport> trans) {
+      boost::shared_ptr<apache::thrift::transport::TTransport> wrapped =
+          wrapped_factory_->getTransport(trans);
       return boost::shared_ptr<apache::thrift::transport::TTransport>(
-          new apache::thrift::transport::TBufferedTransport(trans, buffer_size_));
+          new apache::thrift::transport::TBufferedTransport(wrapped, buffer_size_));
     }
    private:
     uint32_t buffer_size_;
+    std::unique_ptr<apache::thrift::transport::TTransportFactory> wrapped_factory_;
+  };
+
+  /// Transport implementation used by the thrift server.
+  enum TransportType {
+    BINARY, // Thrift bytes over default transport.
+    HTTP, // Thrift bytes over HTTP transport.
   };
 
   /// Username.
@@ -151,7 +163,8 @@ class ThriftServer {
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
-      int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0);
+      int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
+      TransportType server_transport = TransportType::BINARY);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
   /// arguments are the minimum SSL/TLS version, and paths to certificate and private key
@@ -244,6 +257,9 @@ class ThriftServer {
   /// Not owned by us, owned by the AuthManager
   AuthProvider* auth_provider_;
 
+  /// Underlying transport type used by this thrift server.
+  TransportType transport_type_;
+
   /// Helper class which monitors starting servers. Needs access to internal members, and
   /// is not used outside of this class.
   class ThriftServerEventProcessor;
@@ -311,13 +327,20 @@ class ThriftServerBuilder {
     return *this;
   }
 
+  /// Sets the underlying transport type for the thrift server.
+  ThriftServerBuilder& transport_type(ThriftServer::TransportType transport_type) {
+    server_transport_type_ = transport_type;
+    return *this;
+  }
+
   /// Constructs a new ThriftServer and puts it in 'server', if construction was
   /// successful, returns an error otherwise. In the error case, 'server' will not have
   /// been set and will not need to be freed, otherwise the caller assumes ownership of
   /// '*server'.
   Status Build(ThriftServer** server) {
-    std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_, port_,
-        auth_provider_, metrics_, max_concurrent_connections_, queue_timeout_ms_));
+    std::unique_ptr<ThriftServer> ptr(
+        new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
+            max_concurrent_connections_, queue_timeout_ms_, server_transport_type_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -332,6 +355,8 @@ class ThriftServerBuilder {
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
   int port_ = 0;
+  ThriftServer::TransportType server_transport_type_ =
+      ThriftServer::TransportType::BINARY;
 
   AuthProvider* auth_provider_ = nullptr;
   MetricGroup* metrics_ = nullptr;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 09fa5c7..80416ab 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -67,6 +67,7 @@
 #include "service/impala-internal-service.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
+#include "util/auth-util.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
@@ -124,12 +125,15 @@ DECLARE_string(authorized_proxy_group_config);
 DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
+DECLARE_bool(enable_ldap_auth);
 DECLARE_bool(use_local_catalog);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
     "If 0 or less, the Beeswax server is not started.");
 DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served."
     "If 0 or less, the HiveServer2 server is not started.");
+DEFINE_int32(hs2_http_port, 28000, "port on which HiveServer2 HTTP(s) client "
+    "requests are served. If 0 or less, the HiveServer2 http server is not started.");
 
 DEFINE_int32(fe_service_threads, 64,
     "number of threads available to serve client requests");
@@ -288,6 +292,7 @@ const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
 
 const string BEESWAX_SERVER_NAME = "beeswax-frontend";
 const string HS2_SERVER_NAME = "hiveserver2-frontend";
+const string HS2_HTTP_SERVER_NAME = "hiveserver2-http-frontend";
 
 const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
@@ -2106,7 +2111,8 @@ void ImpalaServer::ConnectionEnd(
       }
     }
   } else {
-    DCHECK_EQ(connection_context.server_name, HS2_SERVER_NAME);
+    DCHECK(connection_context.server_name ==  HS2_SERVER_NAME
+        || connection_context.server_name == HS2_HTTP_SERVER_NAME);
     for (const TUniqueId& session_id : disconnected_sessions) {
       shared_ptr<SessionState> state;
       Status status = GetSessionState(session_id, &state);
@@ -2414,8 +2420,8 @@ void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
   crs->set_expired();
 }
 
-Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
-   int32_t hs2_port) {
+Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
+    int32_t hs2_http_port) {
   exec_env_->SetImpalaServer(this);
 
   // We must register the HTTP handlers after registering the ImpalaServer with the
@@ -2524,6 +2530,39 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
       hs2_server_.reset(server);
       hs2_server_->SetConnectionHandler(this);
     }
+
+    // We can't currently secure the http server with Kerberos, only with LDAP, so if
+    // Kerberos is enabled and LDAP isn't we automatically disable the http server.
+    if ((hs2_http_port > 0 && (!IsKerberosEnabled() || FLAGS_enable_ldap_auth))
+        || (TestInfo::is_test() && hs2_http_port == 0)) {
+      boost::shared_ptr<TProcessor> hs2_http_processor(
+          new ImpalaHiveServer2ServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("hs2_http", exec_env_->metrics()));
+      hs2_http_processor->setEventHandler(event_handler);
+
+      ThriftServer* http_server;
+      ThriftServerBuilder http_builder(
+          HS2_HTTP_SERVER_NAME, hs2_http_processor, hs2_http_port);
+      if (IsExternalTlsConfigured()) {
+        LOG(INFO) << "Enabling SSL for HiveServer2 HTTP endpoint.";
+        http_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+            .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+            .ssl_version(ssl_version)
+            .cipher_list(FLAGS_ssl_cipher_list);
+      }
+
+      RETURN_IF_ERROR(
+          http_builder
+              .auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
+              .transport_type(ThriftServer::TransportType::HTTP)
+              .metrics(exec_env_->metrics())
+              .max_concurrent_connections(FLAGS_fe_service_threads)
+              .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+              .Build(&http_server));
+      hs2_http_server_.reset(http_server);
+      hs2_http_server_->SetConnectionHandler(this);
+    }
   }
   LOG(INFO) << "Initialized coordinator/executor Impala server on "
       << TNetworkAddressToString(GetThriftBackendAddress());
@@ -2538,6 +2577,11 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
     RETURN_IF_ERROR(hs2_server_->Start());
     LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
   }
+  if (hs2_http_server_.get()) {
+    RETURN_IF_ERROR(hs2_http_server_->Start());
+    LOG(INFO) << "Impala HiveServer2 Service (HTTP) listening on "
+              << hs2_http_server_->port();
+  }
   if (beeswax_server_.get()) {
     RETURN_IF_ERROR(beeswax_server_->Start());
     LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7d99a6e..8be9cb6 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -179,7 +179,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// ephemeral port in tests and to not start the service in a daemon. A port < 0
   /// always means to not start the service. The port values can be obtained after
   /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
-  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
+  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
+      int32_t hs2_http_port);
 
   /// Blocks until the server shuts down.
   void Join();
@@ -1032,6 +1033,15 @@ class ImpalaServer : public ImpalaServiceIf,
       // We won't have a connection context in the case of ChildQuery, which calls into
       // hiveserver2 functions directly without going through the Thrift stack.
       if (ThriftServer::HasThreadConnectionContext()) {
+        // Check that the session user matches the user authenticated on the connection.
+        const ThriftServer::Username& connection_username =
+            ThriftServer::GetThreadConnectionContext()->username;
+        if (!connection_username.empty()
+            && session_->connected_user != connection_username) {
+          return Status::Expected(TErrorCode::UNAUTHORIZED_SESSION_USER,
+              connection_username, session_->connected_user);
+        }
+
         // Try adding the session id to the connection's set of sessions in case this is
         // the first time this session has been used on this connection.
         impala_->AddSessionToConnection(session_id, session_.get());
@@ -1258,6 +1268,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// explicitly.
   boost::scoped_ptr<ThriftServer> beeswax_server_;
   boost::scoped_ptr<ThriftServer> hs2_server_;
+  boost::scoped_ptr<ThriftServer> hs2_http_server_;
   boost::scoped_ptr<ThriftServer> thrift_be_server_;
 
   /// Flag that records if backend and/or client services have been started. The flag is
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 7650126..9d44773 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -53,6 +53,7 @@ using namespace impala;
 
 DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
+DECLARE_int32(hs2_http_port);
 DECLARE_int32(be_port);
 DECLARE_bool(is_coordinator);
 
@@ -84,8 +85,8 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  Status status =
-      impala_server->Start(FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port);
+  Status status = impala_server->Start(
+      FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
         << status.GetDetail();
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index c09ef0b..0a58211 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -57,33 +57,36 @@ Status InProcessImpalaServer::StartWithEphemeralPorts(const string& statestore_h
       statestore_host, statestore_port);
   // Start the daemon and check if it works, if not delete the current server object and
   // pick a new set of ports
-  return (*server)->StartWithClientServers(0, 0);
+  return (*server)->StartWithClientServers(0, 0, 0);
 }
 
 InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port,
     int krpc_port, int subscriber_port, int webserver_port, const string& statestore_host,
     int statestore_port)
-    : backend_port_(backend_port),
-      beeswax_port_(0),
-      hs2_port_(0),
-      impala_server_(NULL),
-      exec_env_(new ExecEnv(backend_port, krpc_port, subscriber_port,
-          webserver_port, statestore_host, statestore_port)) {
-}
+  : backend_port_(backend_port),
+    beeswax_port_(0),
+    hs2_port_(0),
+    hs2_http_port_(0),
+    impala_server_(NULL),
+    exec_env_(new ExecEnv(backend_port, krpc_port, subscriber_port, webserver_port,
+        statestore_host, statestore_port)) {}
 
 void InProcessImpalaServer::SetCatalogIsReady() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
   exec_env_->frontend()->SetCatalogIsReady();
 }
 
-Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
+Status InProcessImpalaServer::StartWithClientServers(
+    int beeswax_port, int hs2_port, int hs2_http_port) {
   RETURN_IF_ERROR(exec_env_->Init());
   beeswax_port_ = beeswax_port;
   hs2_port_ = hs2_port;
+  hs2_http_port_ = hs2_http_port;
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
   SetCatalogIsReady();
-  RETURN_IF_ERROR(impala_server_->Start(backend_port_, beeswax_port, hs2_port));
+  RETURN_IF_ERROR(
+      impala_server_->Start(backend_port_, beeswax_port, hs2_port, hs2_http_port_));
   exec_env_->scheduler()->UpdateLocalBackendAddrForBeTest();
 
   // This flag is read directly in several places to find the address of the local
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index f863650..3902520 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -59,7 +59,7 @@ class InProcessImpalaServer {
 
   /// Starts all servers, including the beeswax and hs2 client
   /// servers.
-  Status StartWithClientServers(int beeswax_port, int hs2_port);
+  Status StartWithClientServers(int beeswax_port, int hs2_port, int hs2_http_port);
 
   /// Blocks until the backend server exits. Returns Status::OK unless
   /// there was an error joining.
@@ -86,6 +86,8 @@ class InProcessImpalaServer {
 
   uint32_t hs2_port_;
 
+  uint32_t hs2_http_port_;
+
   /// The ImpalaServer that handles client and backend requests.
   boost::shared_ptr<ImpalaServer> impala_server_;
 
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index a20d612..6e77b6c 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -21,7 +21,10 @@
 #include <sstream>
 #include <iostream>
 
-#include <thrift/transport/THttpServer.h>
+#include <gutil/strings/strip.h>
+#include <gutil/strings/util.h>
+
+#include "transport/THttpServer.h"
 #include <thrift/transport/TSocket.h>
 #ifdef _MSC_VER
 #include <Shlwapi.h>
@@ -33,7 +36,8 @@ namespace transport {
 
 using namespace std;
 
-THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : THttpTransport(transport) {
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport, bool requireBasicAuth)
+  : THttpTransport(transport), requireBasicAuth_(requireBasicAuth) {
 }
 
 THttpServer::~THttpServer() {
@@ -64,6 +68,8 @@ void THttpServer::parseHeader(char* header) {
     contentLength_ = atoi(value);
   } else if (strncmp(header, "X-Forwarded-For", sz) == 0) {
     origin_ = value;
+  } else if (requireBasicAuth_ && THRIFT_strncasecmp(header, "Authorization", sz) == 0) {
+    authValue_ = string(value);
   }
 }
 
@@ -115,6 +121,26 @@ bool THttpServer::parseStatusLine(char* status) {
   throw TTransportException(string("Bad Status (unsupported method): ") + status);
 }
 
+void THttpServer::headersDone() {
+  // Nothing to verify unless this transport was created to require BASIC auth.
+  if (!requireBasicAuth_) return;
+
+  // Only 'Authorization: Basic <base64>' is accepted; 'authFn_' validates the payload.
+  bool authorized = false;
+  if (!authValue_.empty()) {
+    StripWhiteSpace(&authValue_);
+    string base64;
+    authorized =
+        TryStripPrefixString(authValue_, "Basic ", &base64) && authFn_(base64.c_str());
+  }
+  if (!authorized) {
+    returnUnauthorized();
+    throw TTransportException("HTTP Basic auth failed.");
+  }
+  // Authenticated: reset so the value cannot be reused for a later set of headers.
+  authValue_.clear();
+}
+
 void THttpServer::flush() {
   // Fetch the contents of the write buffer
   uint8_t* buf;
@@ -159,6 +185,15 @@ std::string THttpServer::getTimeRFC1123() {
           broken_t->tm_sec);
   return std::string(buff);
 }
+
+void THttpServer::returnUnauthorized() {
+  std::ostringstream h;
+  h << "HTTP/1.1 401 Unauthorized" << CRLF << "Date: " << getTimeRFC1123() << CRLF
+    << "WWW-Authenticate: Basic" << CRLF << CRLF;
+  string header = h.str();
+  transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+  transport_->flush();
+}
 }
 }
 } // apache::thrift::transport
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index a7ab944..85413c5 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -17,28 +17,54 @@
  * under the License.
  */
 
-#ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
-#define _THRIFT_TRANSPORT_THTTPSERVER_H_ 1
+#ifndef IMPALA_TRANSPORT_THTTPSERVER_H
+#define IMPALA_TRANSPORT_THTTPSERVER_H
 
-#include <thrift/transport/THttpTransport.h>
+#include "transport/THttpTransport.h"
 
 namespace apache {
 namespace thrift {
 namespace transport {
 
+/*
+ * Implements server side work for http connections, including support for BASIC auth.
+ */
 class THttpServer : public THttpTransport {
 public:
-  THttpServer(boost::shared_ptr<TTransport> transport);
+
+  // Function that takes a base64 encoded string of the form 'username:password' and
+  // returns true if authentication is successful.
+  typedef std::function<bool(const char*)> BasicAuthFn;
+
+  THttpServer(boost::shared_ptr<TTransport> transport, bool requireBasicAuth = false);
 
   virtual ~THttpServer();
 
   virtual void flush();
 
+  void setAuthFn(const BasicAuthFn& fn) { authFn_ = fn; }
+
 protected:
   void readHeaders();
   virtual void parseHeader(char* header);
   virtual bool parseStatusLine(char* status);
+  virtual void headersDone();
   std::string getTimeRFC1123();
+  // Returns a '401 - Unauthorized' to the client.
+  void returnUnauthorized();
+
+ private:
+  static bool dummyAuthFn(const char*) { return false; }
+
+  // If true, a '401' will be returned and a TTransportException thrown unless each set
+  // of headers contains a valid 'Authorization: Basic...'.
+  bool requireBasicAuth_ = false;
+
+  // Called with the base64 encoded authorization from an 'Authorization: Basic' header.
+  BasicAuthFn authFn_ = &dummyAuthFn;
+
+  // The value from the 'Authorization' header.
+  std::string authValue_ = "";
 };
 
 /**
@@ -46,7 +72,9 @@ protected:
  */
 class THttpServerTransportFactory : public TTransportFactory {
 public:
-  THttpServerTransportFactory() {}
+
+  explicit THttpServerTransportFactory(bool requireBasicAuth = false)
+    : requireBasicAuth_(requireBasicAuth) {}
 
   virtual ~THttpServerTransportFactory() {}
 
@@ -54,11 +82,14 @@ public:
    * Wraps the transport into a buffered one.
    */
   virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
-    return boost::shared_ptr<TTransport>(new THttpServer(trans));
+    return boost::shared_ptr<TTransport>(new THttpServer(trans, requireBasicAuth_));
   }
+
+ private:
+  bool requireBasicAuth_ = false;
 };
 }
 }
 } // apache::thrift::transport
 
-#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
+#endif // #ifndef IMPALA_TRANSPORT_THTTPSERVER_H
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
index a466ff6..339e02e 100644
--- a/be/src/transport/THttpTransport.cpp
+++ b/be/src/transport/THttpTransport.cpp
@@ -19,7 +19,7 @@
 
 #include <sstream>
 
-#include <thrift/transport/THttpTransport.h>
+#include "transport/THttpTransport.h"
 
 namespace apache {
 namespace thrift {
@@ -234,7 +234,7 @@ void THttpTransport::readHeaders() {
     if (strlen(line) == 0) {
       if (finished) {
         readHeaders_ = false;
-        return;
+        break;
       } else {
         // Must have been an HTTP 100, keep going for another status line
         statusLine = true;
@@ -248,6 +248,8 @@ void THttpTransport::readHeaders() {
       }
     }
   }
+
+  headersDone();
 }
 
 void THttpTransport::write(const uint8_t* buf, uint32_t len) {
diff --git a/be/src/transport/THttpTransport.h b/be/src/transport/THttpTransport.h
index a9f564c..03b2a71 100644
--- a/be/src/transport/THttpTransport.h
+++ b/be/src/transport/THttpTransport.h
@@ -17,8 +17,8 @@
  * under the License.
  */
 
-#ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_
-#define _THRIFT_TRANSPORT_THTTPTRANSPORT_H_ 1
+#ifndef IMPALA_TRANSPORT_THTTPTRANSPORT_H
+#define IMPALA_TRANSPORT_THTTPTRANSPORT_H
 
 #include <thrift/transport/TBufferTransports.h>
 #include <thrift/transport/TVirtualTransport.h>
@@ -58,6 +58,8 @@ public:
 
   virtual const std::string getOrigin();
 
+  boost::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
+
 protected:
   boost::shared_ptr<TTransport> transport_;
   std::string origin_;
@@ -76,7 +78,7 @@ protected:
   uint32_t httpBufLen_;
   uint32_t httpBufSize_;
 
-  virtual void init();
+  void init();
 
   uint32_t readMoreData();
   char* readLine();
@@ -84,6 +86,9 @@ protected:
   void readHeaders();
   virtual void parseHeader(char* header) = 0;
   virtual bool parseStatusLine(char* status) = 0;
+  // Called each time we finish reading a set of headers. Allows subclasses to do
+  // verification, eg. of authorization, before proceeding.
+  virtual void headersDone() {}
 
   uint32_t readChunked();
   void readChunkedFooters();
@@ -101,4 +106,4 @@ protected:
 }
 } // apache::thrift::transport
 
-#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#endif // #ifndef IMPALA_TRANSPORT_THTTPTRANSPORT_H
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 5849ed6..455efcb 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -36,7 +36,7 @@ from subprocess import call, check_call
 from testdata.common import cgroups
 from tests.common.environ import build_flavor_timeout
 from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT,
-    DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT,
+    DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT,
     DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
     DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
     DEFAULT_CATALOGD_JVM_DEBUG_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
@@ -201,6 +201,7 @@ def choose_impalad_ports(instance_num):
   from the argument name to the port number."""
   return {'beeswax_port': DEFAULT_BEESWAX_PORT + instance_num,
           'hs2_port': DEFAULT_HS2_PORT + instance_num,
+          'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num,
           'be_port': DEFAULT_BE_PORT + instance_num,
           'krpc_port': DEFAULT_KRPC_PORT + instance_num,
           'state_store_subscriber_port':
@@ -212,6 +213,7 @@ def build_impalad_port_args(instance_num):
   IMPALAD_PORTS = (
       "-beeswax_port={beeswax_port} "
       "-hs2_port={hs2_port} "
+      "-hs2_http_port={hs2_http_port} "
       "-be_port={be_port} "
       "-krpc_port={krpc_port} "
       "-state_store_subscriber_port={state_store_subscriber_port} "
@@ -511,6 +513,7 @@ class DockerMiniClusterOperations(object):
       chosen_ports = choose_impalad_ports(i)
       port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'],
                   DEFAULT_HS2_PORT: chosen_ports['hs2_port'],
+                  DEFAULT_HS2_HTTP_PORT: chosen_ports['hs2_http_port'],
                   DEFAULT_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port']}
       self.__run_container__("impalad_coord_exec", impalad_arg_lists[i], port_map, i,
           mem_limit=mem_limit)
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index cc1a38f..a21d195 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -411,6 +411,9 @@ error_codes = (
 
   ("DISCONNECTED_SESSION_CLOSED", 135,
    "Session closed because it has no active connections"),
+
+  ("UNAUTHORIZED_SESSION_USER", 136,
+   "The user authorized on the connection '$0' does not match the session username '$1'"),
 )
 
 import sys
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 4437325..5fac28e 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -902,6 +902,46 @@
     "key": "impala.thrift-server.hiveserver2-frontend.timedout-cnxn-requests"
   },
   {
+    "description": "The number of active HiveServer2 HTTP API connections to this Impala Daemon.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Active Connections",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.connections-in-use"
+  },
+  {
+    "description": "The total number of HiveServer2 HTTP API connections made to this Impala Daemon over its lifetime.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Total Connections",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-connections"
+  },
+  {
+    "description": "The number of HiveServer2 HTTP API connections to this Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.connection-setup-queue-size"
+  },
+  {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.timedout-cnxn-requests"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
new file mode 100644
index 0000000..cbc7ab5
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.directory.server.core.annotations.CreateDS;
+import org.apache.directory.server.core.annotations.CreatePartition;
+import org.apache.directory.server.annotations.CreateLdapServer;
+import org.apache.directory.server.annotations.CreateTransport;
+import org.apache.directory.server.core.annotations.ApplyLdifFiles;
+import org.apache.directory.server.core.integ.CreateLdapServerRule;
+import org.apache.hive.service.rpc.thrift.*;
+import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+@CreateDS(name = "myDS",
+    partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
+@CreateLdapServer(
+    transports = { @CreateTransport(protocol = "LDAP", address = "localhost") })
+@ApplyLdifFiles({"users.ldif"})
+/**
+ * Tests that hiveserver2 operations over the http interface work as expected when
+ * ldap authentication is being used.
+ */
+public class LdapHS2Test {
+  @ClassRule
+  public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
+
+  @Before
+  public void setUp() throws Exception {
+    String uri =
+        String.format("ldap://localhost:%s", serverRule.getLdapServer().getPort());
+    String dn = "cn=#UID,ou=Users,dc=myorg,dc=com";
+    String ldapArgs = String.format("--enable_ldap_auth --ldap_uri='%s' "
+        + "--ldap_bind_pattern='%s' --ldap_passwords_in_clear_ok", uri, dn);
+    int ret = CustomClusterRunner.StartImpalaCluster(ldapArgs);
+    assertEquals(0, ret);
+  }
+
+  // Throws if 'status' is not one of the success codes.
+  static void verifySuccess(TStatus status) throws Exception {
+    if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
+        || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
+      return;
+    }
+    throw new Exception(status.toString());
+  }
+
+  /**
+   * Executes 'query' and fetches the results. Expects there to be exactly one string
+   * returned, which must be equal to 'expectedResult'.
+   */
+  static TOperationHandle execAndFetch(TCLIService.Iface client,
+      TSessionHandle sessionHandle, String query, String expectedResult)
+      throws Exception {
+    TExecuteStatementReq execReq = new TExecuteStatementReq(sessionHandle, query);
+    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
+    verifySuccess(execResp.getStatus());
+
+    TFetchResultsReq fetchReq = new TFetchResultsReq(
+        execResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
+    TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+    verifySuccess(fetchResp.getStatus());
+    List<TColumn> columns = fetchResp.getResults().getColumns();
+    assertEquals(1, columns.size());
+    assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+
+    return execResp.getOperationHandle();
+  }
+
+  /**
+   * Tests LDAP authentication to the HTTP hiveserver2 endpoint. Also checks that a
+   * session is unusable by another user and that bad 'Authorization' headers get 401s.
+   */
+  @Test
+  public void testHiveserver2() throws Exception {
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+    // Authenticate as 'Test1Ldap' with password '12345'
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'Test1Ldap'.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "Test1Ldap");
+
+    // Authenticate as 'Test2Ldap' with password 'abcde'
+    headers.put("Authorization", "Basic VGVzdDJMZGFwOmFiY2Rl");
+    transport.setCustomHeaders(headers);
+    String expectedError = "The user authorized on the connection 'Test2Ldap' does not "
+        + "match the session username 'Test1Ldap'\n";
+    try {
+      // Run a query which should fail.
+      execAndFetch(client, openResp.getSessionHandle(), "select 1", "1");
+      fail("Expected error: " + expectedError);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(expectedError));
+    }
+
+    // Try to cancel the first query, which should fail.
+    TCancelOperationReq cancelReq = new TCancelOperationReq(operationHandle);
+    TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
+    assertEquals(TStatusCode.ERROR_STATUS, cancelResp.getStatus().getStatusCode());
+    assertEquals(expectedError, cancelResp.getStatus().getErrorMessage());
+
+    // Open another session which will get username 'Test2Ldap'.
+    TOpenSessionResp openResp2 = client.OpenSession(new TOpenSessionReq());
+    // Running a query with the new session should succeed.
+    execAndFetch(
+        client, openResp2.getSessionHandle(), "select logged_in_user()", "Test2Ldap");
+
+    // Attempt to authenticate with some bad headers:
+    // - invalid username/password combination
+    // - invalid base64 encoded value
+    // - Invalid mechanism
+    for (String authStr : new String[] {"Basic VGVzdDJMZGFwOjEyMzQ1",
+             "Basic invalid-base64", "Negotiate VGVzdDFMZGFwOjEyMzQ1"}) {
+      // The request should be rejected with a 401 for every one of these headers.
+      headers.put("Authorization", authStr);
+      transport.setCustomHeaders(headers);
+      try {
+        client.OpenSession(new TOpenSessionReq());
+        fail("Expected exception.");
+      } catch (Exception e) {
+        assertEquals("HTTP Response code: 401", e.getMessage());
+      }
+    }
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java
index 8c77cc4..c841b0b 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java
@@ -33,8 +33,8 @@ import org.apache.directory.server.annotations.CreateTransport;
 import org.apache.directory.server.core.annotations.ApplyLdifFiles;
 import org.apache.directory.server.core.integ.CreateLdapServerRule;
 import org.apache.impala.testutil.ImpalaJdbcClient;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
@@ -56,8 +56,10 @@ public class LdapJdbcTest extends JdbcTestBase {
   private static final String testUser_ = "Test1Ldap";
   private static final String testPassword_ = "12345";
 
-  @BeforeClass
-  public static void setUp() throws Exception {
+  public LdapJdbcTest(String connectionType) { super(connectionType); }
+
+  @Before
+  public void setUp() throws Exception {
     String uri =
         String.format("ldap://localhost:%s", serverRule.getLdapServer().getPort());
     String dn = "cn=#UID,ou=Users,dc=myorg,dc=com";
@@ -66,13 +68,13 @@ public class LdapJdbcTest extends JdbcTestBase {
     int ret = CustomClusterRunner.StartImpalaCluster(ldapArgs);
     assertEquals(ret, 0);
 
-    con_ =
-        createConnection(ImpalaJdbcClient.getLdapConnectionStr(testUser_, testPassword_));
+    con_ = createConnection(
+        ImpalaJdbcClient.getLdapConnectionStr(connectionType_, testUser_, testPassword_));
   }
 
-  @AfterClass
-  public static void cleanUp() throws Exception {
-    JdbcTestBase.cleanUp();
+  @After
+  public void cleanUp() throws Exception {
+    super.cleanUp();
     CustomClusterRunner.StartImpalaCluster();
   }
 
@@ -87,16 +89,16 @@ public class LdapJdbcTest extends JdbcTestBase {
   @Test
   public void testFailedConnection() throws Exception {
     try {
-      Connection con = createConnection(
-          ImpalaJdbcClient.getLdapConnectionStr(testUser_, "invalid-password"));
+      Connection con = createConnection(ImpalaJdbcClient.getLdapConnectionStr(
+          connectionType_, testUser_, "invalid-password"));
       fail("Connecting with an invalid password should throw an error.");
     } catch (SQLException e) {
       assertTrue(e.getMessage().contains("Could not open client transport"));
     }
 
     try {
-      Connection con = createConnection(
-          ImpalaJdbcClient.getLdapConnectionStr("invalid-user", testPassword_));
+      Connection con = createConnection(ImpalaJdbcClient.getLdapConnectionStr(
+          connectionType_, "invalid-user", testPassword_));
       fail("Connecting with an invalid user name should throw an error.");
     } catch (SQLException e) {
       assertTrue(e.getMessage().contains("Could not open client transport"));
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTest.java b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
index 1fbc0d0..2f4d1db 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
@@ -37,7 +37,7 @@ import java.util.Map;
 import org.apache.impala.testutil.ImpalaJdbcClient;
 import org.apache.impala.util.Metrics;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -48,9 +48,11 @@ import org.junit.Test;
  *
  */
 public class JdbcTest extends JdbcTestBase {
-  @BeforeClass
-  public static void setUp() throws Exception {
-    con_ = createConnection(ImpalaJdbcClient.getNoAuthConnectionStr());
+  public JdbcTest(String connectionType) { super(connectionType); }
+
+  @Before
+  public void setUp() throws Exception {
+    con_ = createConnection(ImpalaJdbcClient.getNoAuthConnectionStr(connectionType_));
   }
 
   @Test
@@ -554,7 +556,8 @@ public class JdbcTest extends JdbcTestBase {
     List<Long> lastTimeSessionActive = new ArrayList<>();
 
     for (int timeout : timeoutPeriods) {
-      connections.add(createConnection(ImpalaJdbcClient.getNoAuthConnectionStr()));
+      connections.add(
+          createConnection(ImpalaJdbcClient.getNoAuthConnectionStr(connectionType_)));
     }
 
     Long numOpenSessions = (Long)metrics.getMetric(
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java b/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java
index 74e309a..91e529f 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java
@@ -31,25 +31,40 @@ import org.apache.impala.analysis.Parser;
 import org.apache.impala.analysis.StatementBase;
 import org.apache.impala.testutil.ImpalaJdbcClient;
 import org.junit.After;
-import org.junit.AfterClass;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runner.RunWith;
 
 import com.google.common.collect.Lists;
 
 /**
  * Base class providing utility functions for tests that need a Jdbc connection.
  */
+@RunWith(Parameterized.class)
 public abstract class JdbcTestBase {
-  protected static Connection con_;
+  protected String connectionType_;
+  protected Connection con_;
 
   // Test-local list of test tables. These are cleaned up in @After.
   protected final List<String> testTableNames_ = Lists.newArrayList();
 
+  public JdbcTestBase(String connectionType) { connectionType_ = connectionType; }
+
+  // Runs every test once per HS2 transport; the value is passed to the constructor.
+  // 'name' makes the transport show up in the reported test name.
+  @Parameters(name = "{0}")
+  public static String[] createConnections() { return new String[] {"binary", "http"}; }
+
   /**
-   * Closes 'con_'. Any subclasses that specify their own 'AfterClass' will need to call
-   * this function there.
+   * Closes 'con_'. Any subclasses that specify their own 'After' will need to call this
+   * function there.
    */
-  @AfterClass
-  public static void cleanUp() throws Exception {
+  @After
+  public void cleanUp() throws Exception {
+    for (String tableName : testTableNames_) {
+      dropTestTable(tableName);
+    }
+
     con_.close();
     assertTrue("Connection should be closed", con_.isClosed());
 
@@ -112,11 +127,4 @@ public abstract class JdbcTestBase {
       stmt.close();
     }
   }
-
-  @After
-  public void testCleanUp() throws SQLException {
-    for (String tableName : testTableNames_) {
-      dropTestTable(tableName);
-    }
-  }
 }
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
index 32b3e77..01a4b2f 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
@@ -60,10 +60,17 @@ public class ImpalaJdbcClient {
 
   private final static String LDAP_AUTH_SPEC = ";user=%s;password=%s";
 
-  // The default connection string connects to localhost at the default hs2_port without
-  // Sasl.
-  private final static String DEFAULT_CONNECTION_STRING =
-      "jdbc:hive2://localhost:21050/default";
+  // Connects with HTTP as the transport.
+  private final static String HTTP_TRANSPORT_SPEC = ";transportMode=http";
+
+  // HiveServer2 compatible ports on coordinator for BINARY and HTTP based transports.
+  private final static int HS2_BINARY_PORT = 21050;
+  private final static int HS2_HTTP_PORT = 28000;
+
+  // The default connection string template to connect to localhost on a given port
+  // number.
+  private final static String DEFAULT_CONNECTION_TEMPLATE =
+      "jdbc:hive2://localhost:%d/default";
 
   private final String driverName_;
   private final String connString_;
@@ -139,23 +146,35 @@ public class ImpalaJdbcClient {
   }
 
   public static ImpalaJdbcClient createClientUsingHiveJdbcDriver() {
-    return new ImpalaJdbcClient(HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr());
+    return new ImpalaJdbcClient(
+        HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr("binary"));
   }
 
   public static ImpalaJdbcClient createClientUsingHiveJdbcDriver(String connString) {
     return new ImpalaJdbcClient(HIVE_SERVER2_DRIVER_NAME, connString);
   }
 
-  public static String getNoAuthConnectionStr() {
-    return getConnectionStr(NOSASL_AUTH_SPEC);
+  public static ImpalaJdbcClient createHttpClientUsingHiveJdbcDriver() {
+    return new ImpalaJdbcClient(HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr("http"));
+  }
+
+  public static String getNoAuthConnectionStr(String connType) {
+    return getConnectionStr(connType, NOSASL_AUTH_SPEC);
   }
 
-  public static String getLdapConnectionStr(String username, String password) {
-    return getConnectionStr(String.format(LDAP_AUTH_SPEC, username, password));
+  public static String getLdapConnectionStr(
+      String connType, String username, String password) {
+    return getConnectionStr(connType, String.format(LDAP_AUTH_SPEC, username, password));
   }
 
-  private static String getConnectionStr(String authStr) {
-    return DEFAULT_CONNECTION_STRING + authStr;
+  // Builds the JDBC connection string for 'connType' ("binary" or "http") and 'authStr'.
+  private static String getConnectionStr(String connType, String authStr) {
+    String connString = DEFAULT_CONNECTION_TEMPLATE + authStr;
+    // Compare by value: '==' on Strings only tests reference identity.
+    if ("binary".equals(connType)) return String.format(connString, HS2_BINARY_PORT);
+    Preconditions.checkState(
+        "http".equals(connType), "Unknown connection type: %s", connType);
+    return String.format(connString + HTTP_TRANSPORT_SPEC, HS2_HTTP_PORT);
   }
 
   /**
diff --git a/fe/src/test/resources/users.ldif b/fe/src/test/resources/users.ldif
index 93ced04..0415efb 100644
--- a/fe/src/test/resources/users.ldif
+++ b/fe/src/test/resources/users.ldif
@@ -22,4 +22,14 @@ objectClass: top
 cn: Test1Ldap
 sn: Ldap
 uid: ldaptest1
-userPassword: 12345
\ No newline at end of file
+userPassword: 12345
+
+dn: cn=Test2Ldap,ou=Users,dc=myorg,dc=com
+objectClass: inetOrgPerson
+objectClass: organizationalPerson
+objectClass: person
+objectClass: top
+cn: Test2Ldap
+sn: Ldap
+uid: ldaptest2
+userPassword: abcde
\ No newline at end of file
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 4b45979..967b66d 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -50,6 +50,7 @@ START_DAEMON_PATH = os.path.join(IMPALA_HOME, 'bin/start-daemon.sh')
 
 DEFAULT_BEESWAX_PORT = 21000
 DEFAULT_HS2_PORT = 21050
+DEFAULT_HS2_HTTP_PORT = 28000
 DEFAULT_BE_PORT = 22000
 DEFAULT_KRPC_PORT = 27000
 DEFAULT_CATALOG_SERVICE_PORT = 26000