You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/05/30 17:29:52 UTC

[impala] branch master updated (a7b8c1e -> cce709a)

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from a7b8c1e  IMPALA-8049: [DOCS] Ranger authz support in impala
     new 9e13fd7  IMPALA-8538 (part 1) Copied THttp(Server|Transport) from thrift-0.9.3
     new b1cb879  IMPALA-1653: Don't close hiveserver2 session when connection is closed
     new 487547e  IMPALA-6042: Allow Impala shell to use a global impalarc config
     new cce709a  IMPALA-8538: HS2 + HTTP(S) + BASIC/LDAP based thrift server endpoint

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/auth-provider.h                         |  54 ++++-
 be/src/rpc/authentication-test.cc                  |  30 +--
 be/src/rpc/authentication.cc                       | 143 ++++++++---
 be/src/rpc/thrift-server.cc                        |  29 ++-
 be/src/rpc/thrift-server.h                         |  46 +++-
 be/src/service/impala-beeswax-server.cc            |  36 +--
 be/src/service/impala-hs2-server.cc                |  29 ++-
 be/src/service/impala-server.cc                    | 175 +++++++++++---
 be/src/service/impala-server.h                     |  67 ++++-
 be/src/service/impalad-main.cc                     |   5 +-
 be/src/testutil/in-process-servers.cc              |  23 +-
 be/src/testutil/in-process-servers.h               |   4 +-
 be/src/transport/CMakeLists.txt                    |   4 +-
 be/src/transport/THttpServer.cpp                   | 199 +++++++++++++++
 be/src/transport/THttpServer.h                     |  95 ++++++++
 be/src/transport/THttpTransport.cpp                | 269 +++++++++++++++++++++
 be/src/transport/THttpTransport.h                  | 109 +++++++++
 bin/rat_exclude_files.txt                          |   1 +
 bin/start-impala-cluster.py                        |   5 +-
 common/thrift/generate_error_codes.py              |   6 +
 common/thrift/metrics.json                         |  40 +++
 .../apache/impala/customcluster/LdapHS2Test.java   | 159 ++++++++++++
 .../apache/impala/customcluster/LdapJdbcTest.java  |  28 ++-
 .../java/org/apache/impala/service/JdbcTest.java   |  13 +-
 .../org/apache/impala/service/JdbcTestBase.java    |  34 ++-
 .../apache/impala/testutil/ImpalaJdbcClient.java   |  41 +++-
 fe/src/test/resources/users.ldif                   |  12 +-
 shell/impala_shell.py                              |  66 +++--
 shell/impala_shell_config_defaults.py              |   5 +-
 shell/option_parser.py                             |   6 +-
 tests/common/impala_cluster.py                     |   1 +
 tests/common/impala_connection.py                  |   6 +
 tests/custom_cluster/test_hs2.py                   | 103 ++++++++
 tests/hs2/test_hs2.py                              |  57 +++--
 tests/shell/good_impalarc2                         |   7 +
 tests/shell/impalarc_with_query_options            |   3 +
 tests/shell/test_shell_commandline.py              |  44 +++-
 tests/shell/test_shell_interactive.py              |   2 +
 tests/shell/util.py                                |  10 +-
 39 files changed, 1710 insertions(+), 256 deletions(-)
 create mode 100644 be/src/transport/THttpServer.cpp
 create mode 100644 be/src/transport/THttpServer.h
 create mode 100644 be/src/transport/THttpTransport.cpp
 create mode 100644 be/src/transport/THttpTransport.h
 create mode 100644 fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
 create mode 100644 tests/custom_cluster/test_hs2.py
 create mode 100644 tests/shell/good_impalarc2


[impala] 04/04: IMPALA-8538: HS2 + HTTP(S) + BASIC/LDAP based thrift server endpoint

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cce709a6e37a9e615a093232f9b0dded8c8b5828
Author: Bharath Vissapragada <bh...@cloudera.com>
AuthorDate: Tue Apr 16 16:02:20 2019 -0700

    IMPALA-8538: HS2 + HTTP(S) + BASIC/LDAP based thrift server endpoint
    
    This patch adds an additional hiveserver2 endpoint for clients to
    connect to that uses HTTP. The endpoint can be disabled by setting
    --hs2_http_port=0. HTTP(S) also works when external TLS is
    enabled using --ssl_server_certificate.
    
    Thrift's http transport is modified to support BASIC authentication
    via ldap. For convenience of developing and reviewing, this patch
    is based on another that copied THttpServer and THttpTransport into
    Impala's codebase. Kerberos authentication is not supported, so the
    http endpoint is turned off if Kerberos is enabled and LDAP isn't.
    
    TODO
    =====
    - Fuzz test the http endpoint
    - Add tests for LDAP + HTTPS
    
    Testing
    =======
    - Parameterized JdbcTest and LdapJdbcTest to work for HS2 + HTTP mode
    - Added LdapHS2Test, which directly calls into the Hiveserver2
      interface using a thrift http client.
    
    Manual testing with Beeline client (from Apache Hive), which has
    builtin support to connect to HTTP(S) based HS2 compatible endpoints.
    
    Example
    ========
    
    -- HTTP mode:
    > start-impala-cluster.py
    > JDBC_URL="jdbc:hive2://localhost:<port>/default;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    -- HTTPS mode:
    > cd $IMPALA_HOME
    > SSL_ARGS="--ssl_client_ca_certificate=./be/src/testutil/server-cert.pem \
        --ssl_server_certificate=./be/src/testutil/server-cert.pem \
        --ssl_private_key=./be/src/testutil/server-key.pem --hostname=localhost"
    > start-impala-cluster.py --impalad_args="$SSL_ARGS" \
        --catalogd_args="$SSL_ARGS" --state_store_args="$SSL_ARGS"
    - Create a local trust store using 'keytool' and import the certificate
    from server-cert.pem (./clientkeystore in the example).
    > JDBC_URL="jdbc:hive2://localhost:<port>/default;ssl=true;sslTrustStore= \
        ./clientkeystore;trustStorePassword=password;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    -- BASIC Auth with LDAP:
    > LDAP_ARGS="--enable_ldap_auth --ldap_uri='ldap://...' \
        --ldap_bind_pattern='...' --ldap_passwords_in_clear_ok"
    > start-impala-cluster.py --impalad_args="$LDAP_ARGS"
    > JDBC_URL="jdbc:hive2://localhost:28000/default;user=...;password=\
        ...;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    -- HTTPS mode with LDAP:
    > start-impala-cluster.py --impalad_args="$LDAP_ARGS $SSL_ARGS" \
        --catalogd_args="$SSL_ARGS" --state_store_args="$SSL_ARGS"
    > JDBC_URL="jdbc:hive2://localhost:28000/default;user=...;password=\
        ...;ssl=true;sslTrustStore=./clientkeystore;trustStorePassword=\
        password;transportMode=http"
    > beeline -u "$JDBC_URL"
    
    Change-Id: Ic5569ac62ef3af2868b5d0581f5029dac736b2ff
    Reviewed-on: http://gerrit.cloudera.org:8080/13299
    Reviewed-by: Thomas Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/auth-provider.h                         |  54 +++++--
 be/src/rpc/authentication-test.cc                  |  30 ++--
 be/src/rpc/authentication.cc                       | 143 ++++++++++++++----
 be/src/rpc/thrift-server.cc                        |  26 ++--
 be/src/rpc/thrift-server.h                         |  43 ++++--
 be/src/service/impala-server.cc                    |  50 ++++++-
 be/src/service/impala-server.h                     |  13 +-
 be/src/service/impalad-main.cc                     |   5 +-
 be/src/testutil/in-process-servers.cc              |  23 +--
 be/src/testutil/in-process-servers.h               |   4 +-
 be/src/transport/THttpServer.cpp                   |  39 ++++-
 be/src/transport/THttpServer.h                     |  45 +++++-
 be/src/transport/THttpTransport.cpp                |   6 +-
 be/src/transport/THttpTransport.h                  |  13 +-
 bin/start-impala-cluster.py                        |   5 +-
 common/thrift/generate_error_codes.py              |   3 +
 common/thrift/metrics.json                         |  40 ++++++
 .../apache/impala/customcluster/LdapHS2Test.java   | 159 +++++++++++++++++++++
 .../apache/impala/customcluster/LdapJdbcTest.java  |  28 ++--
 .../java/org/apache/impala/service/JdbcTest.java   |  13 +-
 .../org/apache/impala/service/JdbcTestBase.java    |  34 +++--
 .../apache/impala/testutil/ImpalaJdbcClient.java   |  41 ++++--
 fe/src/test/resources/users.ldif                   |  12 +-
 tests/common/impala_cluster.py                     |   1 +
 24 files changed, 683 insertions(+), 147 deletions(-)

diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index f3180d5..d575aef 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -23,6 +23,7 @@
 #include <sasl/sasl.h>
 
 #include "common/status.h"
+#include "rpc/thrift-server.h"
 #include "util/promise.h"
 
 namespace sasl { class TSasl; }
@@ -39,33 +40,45 @@ class AuthProvider {
   virtual Status Start() WARN_UNUSED_RESULT = 0;
 
   /// Creates a new Thrift transport factory in the out parameter that performs
-  /// authorisation per this provider's protocol.
+  /// authorisation per this provider's protocol. The top-level transport returned by
+  /// 'factory' must always be a TBufferedTransport, but depending on the AuthProvider
+  /// implementation and the value of 'underlying_transport_type', that may be wrapped
+  /// around another transport type, eg. a TSaslServerTransport.
   virtual Status GetServerTransportFactory(
+      ThriftServer::TransportType underlying_transport_type,
       boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory)
       WARN_UNUSED_RESULT = 0;
 
   /// Called by Thrift clients to wrap a raw transport with any intermediate transport
   /// that an auth protocol requires.
+  /// TODO: Return the correct clients for HTTP base transport. At this point, no clients
+  /// for HTTP endpoints are internal to the Impala service, so it should be OK.
   virtual Status WrapClientTransport(const std::string& hostname,
       boost::shared_ptr<apache::thrift::transport::TTransport> raw_transport,
       const std::string& service_name,
       boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport)
       WARN_UNUSED_RESULT = 0;
 
+  /// Setup 'connection_ptr' to get its username from 'underlying_transport'.
+  virtual void SetupConnectionContext(
+      const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+      ThriftServer::TransportType underlying_transport_type,
+      apache::thrift::transport::TTransport* underlying_transport) = 0;
+
   /// Returns true if this provider uses Sasl at the transport layer.
-  virtual bool is_sasl() = 0;
+  virtual bool is_secure() = 0;
 
   virtual ~AuthProvider() { }
 };
 
-/// If either (or both) Kerberos and LDAP auth are desired, we use Sasl for the
-/// communication.  This "wraps" the underlying communication, in thrift-speak.
-/// This is used for both client and server contexts; there is one for internal
-/// and one for external communication.
-class SaslAuthProvider : public AuthProvider {
+/// Used if either (or both) Kerberos and LDAP auth are desired. For BINARY connections we
+/// use Sasl for the communication, and for HTTP connections we use BASIC auth.  This
+/// "wraps" the underlying communication, in thrift-speak. This is used for both client
+/// and server contexts; there is one for internal and one for external communication.
+class SecureAuthProvider : public AuthProvider {
  public:
-  SaslAuthProvider(bool is_internal) : has_ldap_(false), is_internal_(is_internal),
-      needs_kinit_(false) {}
+  SecureAuthProvider(bool is_internal)
+    : has_ldap_(false), is_internal_(is_internal), needs_kinit_(false) {}
 
   /// Performs initialization of external state. Kinit if configured to use kerberos.
   /// If we're using ldap, set up appropriate certificate usage.
@@ -86,9 +99,19 @@ class SaslAuthProvider : public AuthProvider {
   /// Then presto! You've got authentication for the connection.
   /// This is only applicable to Thrift connections and not KRPC connections.
   virtual Status GetServerTransportFactory(
+      ThriftServer::TransportType underlying_transport_type,
       boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory);
 
-  virtual bool is_sasl() { return true; }
+  /// If Sasl was used, the username will be available from the handshake, and we set it
+  /// here. If HTTP BASIC auth was used, the username won't be available until the first
+  /// packet is received, so we register a callback with the transport that will set the
+  /// username then.
+  virtual void SetupConnectionContext(
+      const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+      ThriftServer::TransportType underlying_transport_type,
+      apache::thrift::transport::TTransport* underlying_transport);
+
+  virtual bool is_secure() { return true; }
 
   /// Initializes kerberos items and checks for sanity.  Failures can occur on a
   /// malformed principal or when setting some environment variables.  Called
@@ -152,6 +175,7 @@ class NoAuthProvider : public AuthProvider {
   virtual Status Start() { return Status::OK(); }
 
   virtual Status GetServerTransportFactory(
+      ThriftServer::TransportType underlying_transport_type,
       boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory);
 
   virtual Status WrapClientTransport(const std::string& hostname,
@@ -159,7 +183,15 @@ class NoAuthProvider : public AuthProvider {
       const std::string& service_name,
       boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport);
 
-  virtual bool is_sasl() { return false; }
+  /// If there is no auth, then we don't have a username available.
+  virtual void SetupConnectionContext(
+      const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+      ThriftServer::TransportType underlying_transport_type,
+      apache::thrift::transport::TTransport* underlying_transport) {
+    connection_ptr->username = "";
+  }
+
+  virtual bool is_secure() { return false; }
 };
 
 /// The first entry point to the authentication subsystem.  Performs initialization
diff --git a/be/src/rpc/authentication-test.cc b/be/src/rpc/authentication-test.cc
index 0dacdee..7b5198e 100644
--- a/be/src/rpc/authentication-test.cc
+++ b/be/src/rpc/authentication-test.cc
@@ -54,7 +54,7 @@ int SaslAuthorizeInternal(sasl_conn_t* conn, void* context,
 TEST(Auth, PrincipalSubstitution) {
   string hostname;
   ASSERT_OK(GetHostname(&hostname));
-  SaslAuthProvider sa(false);  // false means it's external
+  SecureAuthProvider sa(false); // false means it's external
 
   FLAGS_principal = "service_name/_HOST@some.realm";
   string principal;
@@ -70,20 +70,20 @@ TEST(Auth, PrincipalSubstitution) {
   FLAGS_principal.clear();
 }
 
-void AuthOk(const string& name, SaslAuthProvider* sa) {
+void AuthOk(const string& name, SecureAuthProvider* sa) {
   EXPECT_EQ(SASL_OK,
       SaslAuthorizeInternal(NULL, (void*)sa, name.c_str(), name.size(), NULL, 0, NULL, 0,
           NULL));
 }
 
-void AuthFails(const string& name, SaslAuthProvider* sa) {
+void AuthFails(const string& name, SecureAuthProvider* sa) {
   EXPECT_EQ(SASL_BADAUTH,
       SaslAuthorizeInternal(NULL, (void*)sa, name.c_str(), name.size(), NULL, 0, NULL, 0,
           NULL));
 }
 
 TEST(Auth, AuthorizeInternalPrincipals) {
-  SaslAuthProvider sa(true);  // false means it's external
+  SecureAuthProvider sa(true); // true means it's internal
   ASSERT_OK(sa.InitKerberos("service_name/localhost@some.realm", "/etc/hosts"));
 
   AuthOk("service_name/localhost@some.realm", &sa);
@@ -116,7 +116,7 @@ TEST(Auth, ValidAuthProviders) {
 // Set up ldap flags and ensure we make the appropriate auth providers
 TEST(Auth, LdapAuth) {
   AuthProvider* ap = NULL;
-  SaslAuthProvider* sa = NULL;
+  SecureAuthProvider* sa = NULL;
 
   FLAGS_enable_ldap_auth = true;
   FLAGS_ldap_uri = "ldaps://bogus.com";
@@ -126,20 +126,20 @@ TEST(Auth, LdapAuth) {
 
   // External auth provider is sasl, ldap, but not kerberos
   ap = AuthManager::GetInstance()->GetExternalAuthProvider();
-  ASSERT_TRUE(ap->is_sasl());
-  sa = dynamic_cast<SaslAuthProvider*>(ap);
+  ASSERT_TRUE(ap->is_secure());
+  sa = dynamic_cast<SecureAuthProvider*>(ap);
   ASSERT_TRUE(sa->has_ldap());
   ASSERT_EQ("", sa->principal());
 
   // Internal auth provider isn't sasl.
   ap = AuthManager::GetInstance()->GetInternalAuthProvider();
-  ASSERT_FALSE(ap->is_sasl());
+  ASSERT_FALSE(ap->is_secure());
 }
 
 // Set up ldap and kerberos flags and ensure we make the appropriate auth providers
 TEST(Auth, LdapKerbAuth) {
   AuthProvider* ap = NULL;
-  SaslAuthProvider* sa = NULL;
+  SecureAuthProvider* sa = NULL;
 
   if ((env_keytab == NULL) || (env_princ == NULL)) {
     return;     // In a non-kerberized environment
@@ -154,15 +154,15 @@ TEST(Auth, LdapKerbAuth) {
 
   // External auth provider is sasl, ldap, and kerberos
   ap = AuthManager::GetInstance()->GetExternalAuthProvider();
-  ASSERT_TRUE(ap->is_sasl());
-  sa = dynamic_cast<SaslAuthProvider*>(ap);
+  ASSERT_TRUE(ap->is_secure());
+  sa = dynamic_cast<SecureAuthProvider*>(ap);
   ASSERT_TRUE(sa->has_ldap());
   ASSERT_EQ(FLAGS_principal, sa->principal());
 
   // Internal auth provider is sasl and kerberos
   ap = AuthManager::GetInstance()->GetInternalAuthProvider();
-  ASSERT_TRUE(ap->is_sasl());
-  sa = dynamic_cast<SaslAuthProvider*>(ap);
+  ASSERT_TRUE(ap->is_secure());
+  sa = dynamic_cast<SecureAuthProvider*>(ap);
   ASSERT_FALSE(sa->has_ldap());
   ASSERT_EQ(FLAGS_principal, sa->principal());
 }
@@ -176,10 +176,10 @@ TEST(Auth, KerbAndSslEnabled) {
   FLAGS_ssl_server_certificate = "some_path";
   FLAGS_ssl_private_key = "some_path";
   ASSERT_TRUE(IsInternalTlsConfigured());
-  SaslAuthProvider sa_internal(true);
+  SecureAuthProvider sa_internal(true);
   ASSERT_OK(
       sa_internal.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts"));
-  SaslAuthProvider sa_external(false);
+  SecureAuthProvider sa_external(false);
   ASSERT_OK(
       sa_external.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts"));
 }
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 072ccdf..addedd9 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -24,6 +24,8 @@
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
 #include <boost/filesystem.hpp>
+#include <gutil/casts.h>
+#include <gutil/strings/escaping.h>
 #include <gutil/strings/substitute.h>
 #include <random>
 #include <string>
@@ -40,6 +42,7 @@
 #include "kudu/security/init.h"
 #include "rpc/auth-provider.h"
 #include "rpc/thrift-server.h"
+#include "transport/THttpServer.h"
 #include "transport/TSaslClientTransport.h"
 #include "util/auth-util.h"
 #include "util/debug-util.h"
@@ -62,6 +65,7 @@ using boost::algorithm::trim;
 using boost::mt19937;
 using boost::uniform_int;
 using namespace apache::thrift;
+using namespace apache::thrift::transport;
 using namespace boost::filesystem;   // for is_regular(), is_absolute()
 using namespace strings;
 
@@ -192,18 +196,14 @@ static int SaslLogCallback(void* context, int level, const char* message) {
 // Use --ldap_ca_certificate to specify the location of the certificate used to confirm
 // the authenticity of the LDAP server certificate.
 //
-// conn: The Sasl connection struct, which we ignore
-// context: Ignored; always NULL
 // user: The username to authenticate
 // pass: The password to use
 // passlen: The length of pass
-// propctx: Ignored - properties requested
-// Return: SASL_OK on success, SASL_FAIL otherwise
-int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
-    const char* pass, unsigned passlen, struct propctx* propctx) {
+// Return: true on success, false otherwise
+static bool LdapCheckPass(const char* user, const char* pass, unsigned passlen) {
   if (passlen == 0 && !FLAGS_ldap_allow_anonymous_binds) {
     // Disable anonymous binds.
-    return SASL_FAIL;
+    return false;
   }
 
   LDAP* ld;
@@ -211,7 +211,7 @@ int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
   if (rc != LDAP_SUCCESS) {
     LOG(WARNING) << "Could not initialize connection with LDAP server ("
                  << FLAGS_ldap_uri << "). Error: " << ldap_err2string(rc);
-    return SASL_FAIL;
+    return false;
   }
 
   // Force the LDAP version to 3 to make sure TLS is supported.
@@ -227,7 +227,7 @@ int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
       LOG(WARNING) << "Could not start TLS secure connection to LDAP server ("
                    << FLAGS_ldap_uri << "). Error: " << ldap_err2string(tls_rc);
       ldap_unbind_ext(ld, NULL, NULL);
-      return SASL_FAIL;
+      return false;
     }
     VLOG(2) << "Started TLS connection with LDAP server: " << FLAGS_ldap_uri;
   }
@@ -260,12 +260,27 @@ int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
   if (rc != LDAP_SUCCESS) {
     LOG(WARNING) << "LDAP authentication failure for " << user_str
                  << " : " << ldap_err2string(rc);
-    return SASL_FAIL;
+    return false;
   }
 
   VLOG_QUERY << "LDAP bind successful";
 
-  return SASL_OK;
+  return true;
+}
+
+// Wrapper around the function we use to check passwords with LDAP which converts the
+// return value to something appropriate for SASL.
+//
+// conn: The Sasl connection struct, which we ignore
+// context: Ignored; always NULL
+// user: The username to authenticate
+// pass: The password to use
+// passlen: The length of pass
+// propctx: Ignored - properties requested
+// Return: SASL_OK on success, SASL_FAIL otherwise
+int SaslLdapCheckPass(sasl_conn_t* conn, void* context, const char* user,
+    const char* pass, unsigned passlen, struct propctx* propctx) {
+  return LdapCheckPass(user, pass, passlen) ? SASL_OK : SASL_FAIL;
 }
 
 // Sasl wants a way to ask us about some options, this function provides
@@ -409,14 +424,14 @@ int SaslAuthorizeInternal(sasl_conn_t* conn, void* context,
               << "<service>/<hostname>@<realm> - got: " << requested_user;
     return SASL_BADAUTH;
   }
-  SaslAuthProvider* internal_auth_provider;
+  SecureAuthProvider* internal_auth_provider;
   if (context == NULL) {
-    internal_auth_provider = static_cast<SaslAuthProvider*>(
+    internal_auth_provider = static_cast<SecureAuthProvider*>(
         AuthManager::GetInstance()->GetInternalAuthProvider());
   } else {
     // Branch should only be taken for testing, where context is used to inject an auth
     // provider.
-    internal_auth_provider = static_cast<SaslAuthProvider*>(context);
+    internal_auth_provider = static_cast<SecureAuthProvider*>(context);
   }
 
   vector<string> whitelist;
@@ -481,6 +496,30 @@ static int SaslGetPath(void* context, const char** path) {
   return SASL_OK;
 }
 
+bool BasicAuth(ThriftServer::ConnectionContext* connection_context, const char* base64) {
+  int64_t in_len = strlen(base64);
+  string decoded;
+  if (!Base64Unescape(base64, in_len, &decoded)) {
+    LOG(ERROR) << "Failed to decode base64 auth string from: "
+               << TNetworkAddressToString(connection_context->network_address);
+    return false;
+  }
+  std::size_t colon = decoded.find(':');
+  if (colon == std::string::npos) {
+    LOG(ERROR) << "Auth string must be in the form '<username>:<password>' from: "
+               << TNetworkAddressToString(connection_context->network_address);
+    return false;
+  }
+  string username = decoded.substr(0, colon);
+  string password = decoded.substr(colon + 1);
+  bool ret = LdapCheckPass(username.c_str(), password.c_str(), password.length());
+  if (ret) {
+    // Authentication was successful, so set the username on the connection.
+    connection_context->username = username;
+  }
+  return ret;
+}
+
 namespace {
 
 // SASL requires mutexes for thread safety, but doesn't implement
@@ -658,9 +697,8 @@ Status CheckReplayCacheDirPermissions() {
   return Status::OK();
 }
 
-Status SaslAuthProvider::InitKerberos(const string& principal,
-    const string& keytab_file) {
-
+Status SecureAuthProvider::InitKerberos(
+    const string& principal, const string& keytab_file) {
   principal_ = principal;
   keytab_file_ = keytab_file;
   // The logic here is that needs_kinit_ is false unless we are the internal
@@ -782,7 +820,7 @@ Status AuthManager::InitKerberosEnv() {
   return Status::OK();
 }
 
-Status SaslAuthProvider::Start() {
+Status SecureAuthProvider::Start() {
   // True for kerberos internal use
   if (needs_kinit_) {
     DCHECK(is_internal_);
@@ -825,10 +863,26 @@ Status SaslAuthProvider::Start() {
   return Status::OK();
 }
 
-Status SaslAuthProvider::GetServerTransportFactory(
+Status SecureAuthProvider::GetServerTransportFactory(
+    ThriftServer::TransportType underlying_transport_type,
     boost::shared_ptr<TTransportFactory>* factory) {
   DCHECK(!principal_.empty() || has_ldap_);
 
+  if (underlying_transport_type == ThriftServer::HTTP) {
+    // TODO: add support for SPNEGO style of HTTP auth
+    if (!principal_.empty() && !has_ldap_) {
+      const string err = "Kerberos not yet supported with HTTP transport.";
+      LOG(ERROR) << err;
+      return Status(err);
+    }
+
+    factory->reset(new ThriftServer::BufferedTransportFactory(
+        ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
+        new THttpServerTransportFactory(/* requireBasicAuth */ true)));
+    return Status::OK();
+  }
+
+  DCHECK(underlying_transport_type == ThriftServer::BINARY);
   // This is the heart of the link between this file and thrift.  Here we
   // associate a Sasl mechanism with our callbacks.
   try {
@@ -863,10 +917,9 @@ Status SaslAuthProvider::GetServerTransportFactory(
   return Status::OK();
 }
 
-Status SaslAuthProvider::WrapClientTransport(const string& hostname,
+Status SecureAuthProvider::WrapClientTransport(const string& hostname,
     boost::shared_ptr<TTransport> raw_transport, const string& service_name,
     boost::shared_ptr<TTransport>* wrapped_transport) {
-
   boost::shared_ptr<sasl::TSasl> sasl_client;
   const map<string, string> props; // Empty; unused by thrift
   const string auth_id; // Empty; unused by thrift
@@ -894,10 +947,46 @@ Status SaslAuthProvider::WrapClientTransport(const string& hostname,
   return Status::OK();
 }
 
+void SecureAuthProvider::SetupConnectionContext(
+    const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
+    ThriftServer::TransportType underlying_transport_type,
+    TTransport* underlying_transport) {
+  switch (underlying_transport_type) {
+    case ThriftServer::BINARY: {
+      TSaslServerTransport* sasl_transport =
+          down_cast<TSaslServerTransport*>(underlying_transport);
+
+      // Get the username from the transport.
+      connection_ptr->username = sasl_transport->getUsername();
+      break;
+    }
+    case ThriftServer::HTTP: {
+      THttpServer* http_transport = down_cast<THttpServer*>(underlying_transport);
+      http_transport->setAuthFn(
+          std::bind(BasicAuth, connection_ptr.get(), std::placeholders::_1));
+      break;
+    }
+    default:
+      LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
+  }
+}
+
 Status NoAuthProvider::GetServerTransportFactory(
+    ThriftServer::TransportType underlying_transport_type,
     boost::shared_ptr<TTransportFactory>* factory) {
   // No Sasl - yawn.  Here, have a regular old buffered transport.
-  factory->reset(new ThriftServer::BufferedTransportFactory());
+  switch (underlying_transport_type) {
+    case ThriftServer::BINARY:
+      factory->reset(new ThriftServer::BufferedTransportFactory());
+      break;
+    case ThriftServer::HTTP:
+      factory->reset(new ThriftServer::BufferedTransportFactory(
+          ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
+          new THttpServerTransportFactory()));
+      break;
+    default:
+      LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
+  }
   return Status::OK();
 }
 
@@ -1006,8 +1095,8 @@ Status AuthManager::Init() {
   // the client side, this is just a check for the "back end" kerberos
   // principal.
   if (use_kerberos) {
-    SaslAuthProvider* sap = NULL;
-    internal_auth_provider_.reset(sap = new SaslAuthProvider(true));
+    SecureAuthProvider* sap = NULL;
+    internal_auth_provider_.reset(sap = new SecureAuthProvider(true));
     RETURN_IF_ERROR(sap->InitKerberos(kerberos_internal_principal,
         FLAGS_keytab_file));
     LOG(INFO) << "Internal communication is authenticated with Kerberos";
@@ -1018,11 +1107,11 @@ Status AuthManager::Init() {
   RETURN_IF_ERROR(internal_auth_provider_->Start());
 
   // Set up the external auth provider as per above.  Either a "front end"
-  // principal or ldap tells us to use a SaslAuthProvider, and we fill in
+  // principal or ldap tells us to use a SecureAuthProvider, and we fill in
   // details from there.
   if (use_ldap || use_kerberos) {
-    SaslAuthProvider* sap = NULL;
-    external_auth_provider_.reset(sap = new SaslAuthProvider(false));
+    SecureAuthProvider* sap = NULL;
+    external_auth_provider_.reset(sap = new SecureAuthProvider(false));
     if (use_kerberos) {
       RETURN_IF_ERROR(sap->InitKerberos(kerberos_external_principal,
           FLAGS_keytab_file));
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ccf1aff..aa9a088 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -38,6 +38,7 @@
 #include "rpc/authentication.h"
 #include "rpc/thrift-server.h"
 #include "rpc/thrift-thread.h"
+#include "transport/THttpServer.h"
 #include "util/condition-variable.h"
 #include "util/debug-util.h"
 #include "util/metrics.h"
@@ -259,15 +260,19 @@ void* ThriftServer::ThriftServerEventProcessor::createContext(
       boost::shared_ptr<ConnectionContext>(new ConnectionContext);
   TTransport* underlying_transport =
       (static_cast<TBufferedTransport*>(transport))->getUnderlyingTransport().get();
-  if (!thrift_server_->auth_provider_->is_sasl()) {
-    socket = static_cast<TSocket*>(underlying_transport);
-  } else {
+
+  thrift_server_->auth_provider_->SetupConnectionContext(
+      connection_ptr, thrift_server_->transport_type_, underlying_transport);
+  if (thrift_server_->auth_provider_->is_secure()
+      && thrift_server_->transport_type_ == ThriftServer::BINARY) {
     TSaslServerTransport* sasl_transport = static_cast<TSaslServerTransport*>(
         underlying_transport);
-
-    // Get the username from the transport.
-    connection_ptr->username = sasl_transport->getUsername();
     socket = static_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
+  } else if (thrift_server_->transport_type_ == ThriftServer::HTTP) {
+    THttpServer* http_transport = static_cast<THttpServer*>(underlying_transport);
+    socket = static_cast<TSocket*>(http_transport->getUnderlyingTransport().get());
+  } else {
+    socket = static_cast<TSocket*>(underlying_transport);
   }
 
   {
@@ -324,7 +329,8 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
-    MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms)
+    MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms,
+    TransportType transport_type)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
@@ -335,7 +341,8 @@ ThriftServer::ThriftServer(const string& name,
     processor_(processor),
     connection_handler_(NULL),
     metrics_(NULL),
-    auth_provider_(auth_provider) {
+    auth_provider_(auth_provider),
+    transport_type_(transport_type) {
   if (auth_provider_ == NULL) {
     auth_provider_ = AuthManager::GetInstance()->GetInternalAuthProvider();
   }
@@ -483,7 +490,8 @@ Status ThriftServer::Start() {
   boost::shared_ptr<TServerSocket> server_socket;
   boost::shared_ptr<TTransportFactory> transport_factory;
   RETURN_IF_ERROR(CreateSocket(&server_socket));
-  RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(&transport_factory));
+  RETURN_IF_ERROR(
+      auth_provider_->GetServerTransportFactory(transport_type_, &transport_factory));
 
   server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
       protocol_factory, thread_factory, name_, max_concurrent_connections_,
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 4afa040..764fbf2 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -44,24 +44,36 @@ class AuthProvider;
 /// TODO: shutdown is buggy (which only harms tests)
 class ThriftServer {
  public:
-  /// Transport factory that procuess buffered transports with a customisable
-  /// buffer-size. A larger buffer is usually more efficient, as it allows the underlying
-  /// transports to perform fewer system calls.
+  /// Transport factory that wraps transports in a buffered transport with a customisable
+  /// buffer-size and optionally in another transport from a provided factory. A larger
+  /// buffer is usually more efficient, as it allows the underlying transports to perform
+  /// fewer system calls.
   class BufferedTransportFactory :
       public apache::thrift::transport::TBufferedTransportFactory {
    public:
     static const int DEFAULT_BUFFER_SIZE_BYTES = 128 * 1024;
 
-    BufferedTransportFactory(uint32_t buffer_size = DEFAULT_BUFFER_SIZE_BYTES) :
-        buffer_size_(buffer_size) { }
+    BufferedTransportFactory(uint32_t buffer_size = DEFAULT_BUFFER_SIZE_BYTES,
+        apache::thrift::transport::TTransportFactory* wrapped_factory =
+            new apache::thrift::transport::TTransportFactory())
+      : buffer_size_(buffer_size), wrapped_factory_(wrapped_factory) {}
 
     virtual boost::shared_ptr<apache::thrift::transport::TTransport> getTransport(
         boost::shared_ptr<apache::thrift::transport::TTransport> trans) {
+      boost::shared_ptr<apache::thrift::transport::TTransport> wrapped =
+          wrapped_factory_->getTransport(trans);
       return boost::shared_ptr<apache::thrift::transport::TTransport>(
-          new apache::thrift::transport::TBufferedTransport(trans, buffer_size_));
+          new apache::thrift::transport::TBufferedTransport(wrapped, buffer_size_));
     }
    private:
     uint32_t buffer_size_;
+    std::unique_ptr<apache::thrift::transport::TTransportFactory> wrapped_factory_;
+  };
+
+  /// Transport implementation used by the thrift server.
+  enum TransportType {
+    BINARY, // Thrift bytes over default transport.
+    HTTP, // Thrift bytes over HTTP transport.
   };
 
   /// Username.
@@ -151,7 +163,8 @@ class ThriftServer {
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
-      int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0);
+      int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
+      TransportType server_transport = TransportType::BINARY);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
   /// arguments are the minimum SSL/TLS version, and paths to certificate and private key
@@ -244,6 +257,9 @@ class ThriftServer {
   /// Not owned by us, owned by the AuthManager
   AuthProvider* auth_provider_;
 
+  /// Underlying transport type used by this thrift server.
+  TransportType transport_type_;
+
   /// Helper class which monitors starting servers. Needs access to internal members, and
   /// is not used outside of this class.
   class ThriftServerEventProcessor;
@@ -311,13 +327,20 @@ class ThriftServerBuilder {
     return *this;
   }
 
+  /// Sets the underlying transport type for the thrift server.
+  ThriftServerBuilder& transport_type(ThriftServer::TransportType transport_type) {
+    server_transport_type_ = transport_type;
+    return *this;
+  }
+
   /// Constructs a new ThriftServer and puts it in 'server', if construction was
   /// successful, returns an error otherwise. In the error case, 'server' will not have
   /// been set and will not need to be freed, otherwise the caller assumes ownership of
   /// '*server'.
   Status Build(ThriftServer** server) {
-    std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_, port_,
-        auth_provider_, metrics_, max_concurrent_connections_, queue_timeout_ms_));
+    std::unique_ptr<ThriftServer> ptr(
+        new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
+            max_concurrent_connections_, queue_timeout_ms_, server_transport_type_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -332,6 +355,8 @@ class ThriftServerBuilder {
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
   int port_ = 0;
+  ThriftServer::TransportType server_transport_type_ =
+      ThriftServer::TransportType::BINARY;
 
   AuthProvider* auth_provider_ = nullptr;
   MetricGroup* metrics_ = nullptr;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 09fa5c7..80416ab 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -67,6 +67,7 @@
 #include "service/impala-internal-service.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
+#include "util/auth-util.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
@@ -124,12 +125,15 @@ DECLARE_string(authorized_proxy_group_config);
 DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
+DECLARE_bool(enable_ldap_auth);
 DECLARE_bool(use_local_catalog);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
     "If 0 or less, the Beeswax server is not started.");
 DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served."
     "If 0 or less, the HiveServer2 server is not started.");
+DEFINE_int32(hs2_http_port, 28000, "port on which HiveServer2 HTTP(s) client "
+    "requests are served. If 0 or less, the HiveServer2 http server is not started.");
 
 DEFINE_int32(fe_service_threads, 64,
     "number of threads available to serve client requests");
@@ -288,6 +292,7 @@ const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
 
 const string BEESWAX_SERVER_NAME = "beeswax-frontend";
 const string HS2_SERVER_NAME = "hiveserver2-frontend";
+const string HS2_HTTP_SERVER_NAME = "hiveserver2-http-frontend";
 
 const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
@@ -2106,7 +2111,8 @@ void ImpalaServer::ConnectionEnd(
       }
     }
   } else {
-    DCHECK_EQ(connection_context.server_name, HS2_SERVER_NAME);
+    DCHECK(connection_context.server_name ==  HS2_SERVER_NAME
+        || connection_context.server_name == HS2_HTTP_SERVER_NAME);
     for (const TUniqueId& session_id : disconnected_sessions) {
       shared_ptr<SessionState> state;
       Status status = GetSessionState(session_id, &state);
@@ -2414,8 +2420,8 @@ void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
   crs->set_expired();
 }
 
-Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
-   int32_t hs2_port) {
+Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
+    int32_t hs2_http_port) {
   exec_env_->SetImpalaServer(this);
 
   // We must register the HTTP handlers after registering the ImpalaServer with the
@@ -2524,6 +2530,39 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
       hs2_server_.reset(server);
       hs2_server_->SetConnectionHandler(this);
     }
+
+    // We can't currently secure the http server with Kerberos, only with LDAP, so if
+    // Kerberos is enabled and LDAP isn't we automatically disable the http server.
+    if ((hs2_http_port > 0 && (!IsKerberosEnabled() || FLAGS_enable_ldap_auth))
+        || (TestInfo::is_test() && hs2_http_port == 0)) {
+      boost::shared_ptr<TProcessor> hs2_http_processor(
+          new ImpalaHiveServer2ServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("hs2_http", exec_env_->metrics()));
+      hs2_http_processor->setEventHandler(event_handler);
+
+      ThriftServer* http_server;
+      ThriftServerBuilder http_builder(
+          HS2_HTTP_SERVER_NAME, hs2_http_processor, hs2_http_port);
+      if (IsExternalTlsConfigured()) {
+        LOG(INFO) << "Enabling SSL for HiveServer2 HTTP endpoint.";
+        http_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
+            .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
+            .ssl_version(ssl_version)
+            .cipher_list(FLAGS_ssl_cipher_list);
+      }
+
+      RETURN_IF_ERROR(
+          http_builder
+              .auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
+              .transport_type(ThriftServer::TransportType::HTTP)
+              .metrics(exec_env_->metrics())
+              .max_concurrent_connections(FLAGS_fe_service_threads)
+              .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+              .Build(&http_server));
+      hs2_http_server_.reset(http_server);
+      hs2_http_server_->SetConnectionHandler(this);
+    }
   }
   LOG(INFO) << "Initialized coordinator/executor Impala server on "
       << TNetworkAddressToString(GetThriftBackendAddress());
@@ -2538,6 +2577,11 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
     RETURN_IF_ERROR(hs2_server_->Start());
     LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
   }
+  if (hs2_http_server_.get()) {
+    RETURN_IF_ERROR(hs2_http_server_->Start());
+    LOG(INFO) << "Impala HiveServer2 Service (HTTP) listening on "
+              << hs2_http_server_->port();
+  }
   if (beeswax_server_.get()) {
     RETURN_IF_ERROR(beeswax_server_->Start());
     LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7d99a6e..8be9cb6 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -179,7 +179,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// ephemeral port in tests and to not start the service in a daemon. A port < 0
   /// always means to not start the service. The port values can be obtained after
   /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
-  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
+  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
+      int32_t hs2_http_port);
 
   /// Blocks until the server shuts down.
   void Join();
@@ -1032,6 +1033,15 @@ class ImpalaServer : public ImpalaServiceIf,
       // We won't have a connection context in the case of ChildQuery, which calls into
       // hiveserver2 functions directly without going through the Thrift stack.
       if (ThriftServer::HasThreadConnectionContext()) {
+        // Check that the session user matches the user authenticated on the connection.
+        const ThriftServer::Username& connection_username =
+            ThriftServer::GetThreadConnectionContext()->username;
+        if (!connection_username.empty()
+            && session_->connected_user != connection_username) {
+          return Status::Expected(TErrorCode::UNAUTHORIZED_SESSION_USER,
+              connection_username, session_->connected_user);
+        }
+
         // Try adding the session id to the connection's set of sessions in case this is
         // the first time this session has been used on this connection.
         impala_->AddSessionToConnection(session_id, session_.get());
@@ -1258,6 +1268,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// explicitly.
   boost::scoped_ptr<ThriftServer> beeswax_server_;
   boost::scoped_ptr<ThriftServer> hs2_server_;
+  boost::scoped_ptr<ThriftServer> hs2_http_server_;
   boost::scoped_ptr<ThriftServer> thrift_be_server_;
 
   /// Flag that records if backend and/or client services have been started. The flag is
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 7650126..9d44773 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -53,6 +53,7 @@ using namespace impala;
 
 DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
+DECLARE_int32(hs2_http_port);
 DECLARE_int32(be_port);
 DECLARE_bool(is_coordinator);
 
@@ -84,8 +85,8 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  Status status =
-      impala_server->Start(FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port);
+  Status status = impala_server->Start(
+      FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
         << status.GetDetail();
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index c09ef0b..0a58211 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -57,33 +57,36 @@ Status InProcessImpalaServer::StartWithEphemeralPorts(const string& statestore_h
       statestore_host, statestore_port);
   // Start the daemon and check if it works, if not delete the current server object and
   // pick a new set of ports
-  return (*server)->StartWithClientServers(0, 0);
+  return (*server)->StartWithClientServers(0, 0, 0);
 }
 
 InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port,
     int krpc_port, int subscriber_port, int webserver_port, const string& statestore_host,
     int statestore_port)
-    : backend_port_(backend_port),
-      beeswax_port_(0),
-      hs2_port_(0),
-      impala_server_(NULL),
-      exec_env_(new ExecEnv(backend_port, krpc_port, subscriber_port,
-          webserver_port, statestore_host, statestore_port)) {
-}
+  : backend_port_(backend_port),
+    beeswax_port_(0),
+    hs2_port_(0),
+    hs2_http_port_(0),
+    impala_server_(NULL),
+    exec_env_(new ExecEnv(backend_port, krpc_port, subscriber_port, webserver_port,
+        statestore_host, statestore_port)) {}
 
 void InProcessImpalaServer::SetCatalogIsReady() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
   exec_env_->frontend()->SetCatalogIsReady();
 }
 
-Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
+Status InProcessImpalaServer::StartWithClientServers(
+    int beeswax_port, int hs2_port, int hs2_http_port) {
   RETURN_IF_ERROR(exec_env_->Init());
   beeswax_port_ = beeswax_port;
   hs2_port_ = hs2_port;
+  hs2_http_port_ = hs2_http_port;
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
   SetCatalogIsReady();
-  RETURN_IF_ERROR(impala_server_->Start(backend_port_, beeswax_port, hs2_port));
+  RETURN_IF_ERROR(
+      impala_server_->Start(backend_port_, beeswax_port, hs2_port, hs2_http_port_));
   exec_env_->scheduler()->UpdateLocalBackendAddrForBeTest();
 
   // This flag is read directly in several places to find the address of the local
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index f863650..3902520 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -59,7 +59,7 @@ class InProcessImpalaServer {
 
   /// Starts all servers, including the beeswax and hs2 client
   /// servers.
-  Status StartWithClientServers(int beeswax_port, int hs2_port);
+  Status StartWithClientServers(int beeswax_port, int hs2_port, int hs2_http_port);
 
   /// Blocks until the backend server exits. Returns Status::OK unless
   /// there was an error joining.
@@ -86,6 +86,8 @@ class InProcessImpalaServer {
 
   uint32_t hs2_port_;
 
+  uint32_t hs2_http_port_;
+
   /// The ImpalaServer that handles client and backend requests.
   boost::shared_ptr<ImpalaServer> impala_server_;
 
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index a20d612..6e77b6c 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -21,7 +21,10 @@
 #include <sstream>
 #include <iostream>
 
-#include <thrift/transport/THttpServer.h>
+#include <gutil/strings/strip.h>
+#include <gutil/strings/util.h>
+
+#include "transport/THttpServer.h"
 #include <thrift/transport/TSocket.h>
 #ifdef _MSC_VER
 #include <Shlwapi.h>
@@ -33,7 +36,8 @@ namespace transport {
 
 using namespace std;
 
-THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : THttpTransport(transport) {
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport, bool requireBasicAuth)
+  : THttpTransport(transport), requireBasicAuth_(requireBasicAuth) {
 }
 
 THttpServer::~THttpServer() {
@@ -64,6 +68,8 @@ void THttpServer::parseHeader(char* header) {
     contentLength_ = atoi(value);
   } else if (strncmp(header, "X-Forwarded-For", sz) == 0) {
     origin_ = value;
+  } else if (requireBasicAuth_ && THRIFT_strncasecmp(header, "Authorization", sz) == 0) {
+    authValue_ = string(value);
   }
 }
 
@@ -115,6 +121,26 @@ bool THttpServer::parseStatusLine(char* status) {
   throw TTransportException(string("Bad Status (unsupported method): ") + status);
 }
 
+void THttpServer::headersDone() {
+  if (requireBasicAuth_) {
+    bool authorized = false;
+    if (authValue_ != "") {
+      StripWhiteSpace(&authValue_);
+      string base64;
+      if (TryStripPrefixString(authValue_, "Basic ", &base64)) {
+        if (authFn_(base64.c_str())) {
+          authorized = true;
+        }
+      }
+    }
+    if (!authorized) {
+      returnUnauthorized();
+      throw TTransportException("HTTP Basic auth failed.");
+    }
+    authValue_ = "";
+  }
+}
+
 void THttpServer::flush() {
   // Fetch the contents of the write buffer
   uint8_t* buf;
@@ -159,6 +185,15 @@ std::string THttpServer::getTimeRFC1123() {
           broken_t->tm_sec);
   return std::string(buff);
 }
+
+void THttpServer::returnUnauthorized() {
+  std::ostringstream h;
+  h << "HTTP/1.1 401 Unauthorized" << CRLF << "Date: " << getTimeRFC1123() << CRLF
+    << "WWW-Authenticate: Basic" << CRLF << CRLF;
+  string header = h.str();
+  transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+  transport_->flush();
+}
 }
 }
 } // apache::thrift::transport
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index a7ab944..85413c5 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -17,28 +17,54 @@
  * under the License.
  */
 
-#ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
-#define _THRIFT_TRANSPORT_THTTPSERVER_H_ 1
+#ifndef IMPALA_TRANSPORT_THTTPSERVER_H
+#define IMPALA_TRANSPORT_THTTPSERVER_H
 
-#include <thrift/transport/THttpTransport.h>
+#include "transport/THttpTransport.h"
 
 namespace apache {
 namespace thrift {
 namespace transport {
 
+/*
+ * Implements server side work for http connections, including support for BASIC auth.
+ */
 class THttpServer : public THttpTransport {
 public:
-  THttpServer(boost::shared_ptr<TTransport> transport);
+
+  // Function that takes a base64 encoded string of the form 'username:password' and
+  // returns true if authentication is successful.
+  typedef std::function<bool(const char*)> BasicAuthFn;
+
+  THttpServer(boost::shared_ptr<TTransport> transport, bool requireBasicAuth = false);
 
   virtual ~THttpServer();
 
   virtual void flush();
 
+  void setAuthFn(const BasicAuthFn& fn) { authFn_ = fn; }
+
 protected:
   void readHeaders();
   virtual void parseHeader(char* header);
   virtual bool parseStatusLine(char* status);
+  virtual void headersDone();
   std::string getTimeRFC1123();
+  // Returns a '401 - Unauthorized' to the client.
+  void returnUnauthorized();
+
+ private:
+  static bool dummyAuthFn(const char*) { return false; }
+
+  // If true, a '401' will be returned and a TTransportException thrown unless each set
+  // of headers contains a valid 'Authorization: Basic...'.
+  bool requireBasicAuth_ = false;
+
+  // Called with the base64 encoded authorization from an 'Authorization: Basic' header.
+  BasicAuthFn authFn_ = &dummyAuthFn;
+
+  // The value from the 'Authorization' header.
+  std::string authValue_ = "";
 };
 
 /**
@@ -46,7 +72,9 @@ protected:
  */
 class THttpServerTransportFactory : public TTransportFactory {
 public:
-  THttpServerTransportFactory() {}
+
+  explicit THttpServerTransportFactory(bool requireBasicAuth = false)
+    : requireBasicAuth_(requireBasicAuth) {}
 
   virtual ~THttpServerTransportFactory() {}
 
@@ -54,11 +82,14 @@ public:
    * Wraps the transport into a buffered one.
    */
   virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
-    return boost::shared_ptr<TTransport>(new THttpServer(trans));
+    return boost::shared_ptr<TTransport>(new THttpServer(trans, requireBasicAuth_));
   }
+
+ private:
+  bool requireBasicAuth_ = false;
 };
 }
 }
 } // apache::thrift::transport
 
-#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
+#endif // #ifndef IMPALA_TRANSPORT_THTTPSERVER_H
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
index a466ff6..339e02e 100644
--- a/be/src/transport/THttpTransport.cpp
+++ b/be/src/transport/THttpTransport.cpp
@@ -19,7 +19,7 @@
 
 #include <sstream>
 
-#include <thrift/transport/THttpTransport.h>
+#include "transport/THttpTransport.h"
 
 namespace apache {
 namespace thrift {
@@ -234,7 +234,7 @@ void THttpTransport::readHeaders() {
     if (strlen(line) == 0) {
       if (finished) {
         readHeaders_ = false;
-        return;
+        break;
       } else {
         // Must have been an HTTP 100, keep going for another status line
         statusLine = true;
@@ -248,6 +248,8 @@ void THttpTransport::readHeaders() {
       }
     }
   }
+
+  headersDone();
 }
 
 void THttpTransport::write(const uint8_t* buf, uint32_t len) {
diff --git a/be/src/transport/THttpTransport.h b/be/src/transport/THttpTransport.h
index a9f564c..03b2a71 100644
--- a/be/src/transport/THttpTransport.h
+++ b/be/src/transport/THttpTransport.h
@@ -17,8 +17,8 @@
  * under the License.
  */
 
-#ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_
-#define _THRIFT_TRANSPORT_THTTPTRANSPORT_H_ 1
+#ifndef IMPALA_TRANSPORT_THTTPTRANSPORT_H
+#define IMPALA_TRANSPORT_THTTPTRANSPORT_H
 
 #include <thrift/transport/TBufferTransports.h>
 #include <thrift/transport/TVirtualTransport.h>
@@ -58,6 +58,8 @@ public:
 
   virtual const std::string getOrigin();
 
+  boost::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
+
 protected:
   boost::shared_ptr<TTransport> transport_;
   std::string origin_;
@@ -76,7 +78,7 @@ protected:
   uint32_t httpBufLen_;
   uint32_t httpBufSize_;
 
-  virtual void init();
+  void init();
 
   uint32_t readMoreData();
   char* readLine();
@@ -84,6 +86,9 @@ protected:
   void readHeaders();
   virtual void parseHeader(char* header) = 0;
   virtual bool parseStatusLine(char* status) = 0;
+  // Called each time we finish reading a set of headers. Allows subclasses to do
+  // verification, e.g. of authorization, before proceeding.
+  virtual void headersDone() {}
 
   uint32_t readChunked();
   void readChunkedFooters();
@@ -101,4 +106,4 @@ protected:
 }
 } // apache::thrift::transport
 
-#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_
+#endif // #ifndef IMPALA_TRANSPORT_THTTPTRANSPORT_H
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 5849ed6..455efcb 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -36,7 +36,7 @@ from subprocess import call, check_call
 from testdata.common import cgroups
 from tests.common.environ import build_flavor_timeout
 from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT,
-    DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT,
+    DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT,
     DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
     DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
     DEFAULT_CATALOGD_JVM_DEBUG_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
@@ -201,6 +201,7 @@ def choose_impalad_ports(instance_num):
   from the argument name to the port number."""
   return {'beeswax_port': DEFAULT_BEESWAX_PORT + instance_num,
           'hs2_port': DEFAULT_HS2_PORT + instance_num,
+          'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num,
           'be_port': DEFAULT_BE_PORT + instance_num,
           'krpc_port': DEFAULT_KRPC_PORT + instance_num,
           'state_store_subscriber_port':
@@ -212,6 +213,7 @@ def build_impalad_port_args(instance_num):
   IMPALAD_PORTS = (
       "-beeswax_port={beeswax_port} "
       "-hs2_port={hs2_port} "
+      "-hs2_http_port={hs2_http_port} "
       "-be_port={be_port} "
       "-krpc_port={krpc_port} "
       "-state_store_subscriber_port={state_store_subscriber_port} "
@@ -511,6 +513,7 @@ class DockerMiniClusterOperations(object):
       chosen_ports = choose_impalad_ports(i)
       port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'],
                   DEFAULT_HS2_PORT: chosen_ports['hs2_port'],
+                  DEFAULT_HS2_HTTP_PORT: chosen_ports['hs2_http_port'],
                   DEFAULT_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port']}
       self.__run_container__("impalad_coord_exec", impalad_arg_lists[i], port_map, i,
           mem_limit=mem_limit)
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index cc1a38f..a21d195 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -411,6 +411,9 @@ error_codes = (
 
   ("DISCONNECTED_SESSION_CLOSED", 135,
    "Session closed because it has no active connections"),
+
+  ("UNAUTHORIZED_SESSION_USER", 136,
+   "The user authorized on the connection '$0' does not match the session username '$1'"),
 )
 
 import sys
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 4437325..5fac28e 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -902,6 +902,46 @@
     "key": "impala.thrift-server.hiveserver2-frontend.timedout-cnxn-requests"
   },
   {
+    "description": "The number of active HiveServer2 HTTP API connections to this Impala Daemon.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Active Connections",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.connections-in-use"
+  },
+  {
+    "description": "The total number of HiveServer2 HTTP API connections made to this Impala Daemon over its lifetime.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Total Connections",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-connections"
+  },
+  {
+    "description": "The number of HiveServer2 HTTP API connections to this Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.connection-setup-queue-size"
+  },
+  {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.timedout-cnxn-requests"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
new file mode 100644
index 0000000..cbc7ab5
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.directory.server.core.annotations.CreateDS;
+import org.apache.directory.server.core.annotations.CreatePartition;
+import org.apache.directory.server.annotations.CreateLdapServer;
+import org.apache.directory.server.annotations.CreateTransport;
+import org.apache.directory.server.core.annotations.ApplyLdifFiles;
+import org.apache.directory.server.core.integ.CreateLdapServerRule;
+import org.apache.hive.service.rpc.thrift.*;
+import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+@CreateDS(name = "myDS",
+    partitions = { @CreatePartition(name = "test", suffix = "dc=myorg,dc=com") })
+@CreateLdapServer(
+    transports = { @CreateTransport(protocol = "LDAP", address = "localhost") })
+@ApplyLdifFiles({"users.ldif"})
+/**
+ * Tests that hiveserver2 operations over the http interface work as expected when
+ * ldap authentication is being used.
+ */
+public class LdapHS2Test {
+  @ClassRule
+  public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
+
+  @Before
+  public void setUp() throws Exception {
+    String uri =
+        String.format("ldap://localhost:%s", serverRule.getLdapServer().getPort());
+    String dn = "cn=#UID,ou=Users,dc=myorg,dc=com";
+    String ldapArgs = String.format("--enable_ldap_auth --ldap_uri='%s' "
+        + "--ldap_bind_pattern='%s' --ldap_passwords_in_clear_ok", uri, dn);
+    int ret = CustomClusterRunner.StartImpalaCluster(ldapArgs);
+    assertEquals(ret, 0);
+  }
+
+  static void verifySuccess(TStatus status) throws Exception {
+    if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
+        || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
+      return;
+    }
+    throw new Exception(status.toString());
+  }
+
+  /**
+   * Executes 'query' and fetches the results. Expects there to be exactly one string
+   * returned, which be be equal to 'expectedResult'.
+   */
+  static TOperationHandle execAndFetch(TCLIService.Iface client,
+      TSessionHandle sessionHandle, String query, String expectedResult)
+      throws Exception {
+    TExecuteStatementReq execReq = new TExecuteStatementReq(sessionHandle, query);
+    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
+    verifySuccess(execResp.getStatus());
+
+    TFetchResultsReq fetchReq = new TFetchResultsReq(
+        execResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
+    TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+    verifySuccess(fetchResp.getStatus());
+    List<TColumn> columns = fetchResp.getResults().getColumns();
+    assertEquals(columns.size(), 1);
+    assertEquals(columns.get(0).getStringVal().getValues().get(0), expectedResult);
+
+    return execResp.getOperationHandle();
+  }
+
+  /**
+   * Tests LDAP authentication to the HTTP hiveserver2 endpoint.
+   */
+  @Test
+  public void testHiveserver2() throws Exception {
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+    // Authenticate as 'Test1Ldap' with password '12345'
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'Test1Ldap'.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "Test1Ldap");
+
+    // Authenticate as 'Test2Ldap' with password 'abcde'
+    headers.put("Authorization", "Basic VGVzdDJMZGFwOmFiY2Rl");
+    transport.setCustomHeaders(headers);
+    String expectedError = "The user authorized on the connection 'Test2Ldap' does not "
+        + "match the session username 'Test1Ldap'\n";
+    try {
+      // Run a query which should fail.
+      execAndFetch(client, openResp.getSessionHandle(), "select 1", "1");
+      fail("Expected error: " + expectedError);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(expectedError));
+    }
+
+    // Try to cancel the first query, which should fail.
+    TCancelOperationReq cancelReq = new TCancelOperationReq(operationHandle);
+    TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
+    assertEquals(cancelResp.getStatus().getStatusCode(), TStatusCode.ERROR_STATUS);
+    assertEquals(cancelResp.getStatus().getErrorMessage(), expectedError);
+
+    // Open another session which will get username 'Test2Ldap'.
+    TOpenSessionReq openReq2 = new TOpenSessionReq();
+    TOpenSessionResp openResp2 = client.OpenSession(openReq);
+    // Running a query with the new session should succeed.
+    execAndFetch(
+        client, openResp2.getSessionHandle(), "select logged_in_user()", "Test2Ldap");
+
+    // Attempt to authenticate with some bad headers:
+    // - invalid username/password combination
+    // - invalid base64 encoded value
+    // - Invalid mechanism
+    for (String authStr : new String[] {"Basic VGVzdDJMZGFwOjEyMzQ1",
+             "Basic invalid-base64", "Negotiate VGVzdDFMZGFwOjEyMzQ1"}) {
+      // Attempt to authenticate with an invalid password.
+      headers.put("Authorization", authStr);
+      transport.setCustomHeaders(headers);
+      try {
+        TOpenSessionReq openReq3 = new TOpenSessionReq();
+        TOpenSessionResp openResp3 = client.OpenSession(openReq);
+        fail("Exception exception.");
+      } catch (Exception e) {
+        assertEquals(e.getMessage(), "HTTP Response code: 401");
+      }
+    }
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java
index 8c77cc4..c841b0b 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapJdbcTest.java
@@ -33,8 +33,8 @@ import org.apache.directory.server.annotations.CreateTransport;
 import org.apache.directory.server.core.annotations.ApplyLdifFiles;
 import org.apache.directory.server.core.integ.CreateLdapServerRule;
 import org.apache.impala.testutil.ImpalaJdbcClient;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
@@ -56,8 +56,10 @@ public class LdapJdbcTest extends JdbcTestBase {
   private static final String testUser_ = "Test1Ldap";
   private static final String testPassword_ = "12345";
 
-  @BeforeClass
-  public static void setUp() throws Exception {
+  public LdapJdbcTest(String connectionType) { super(connectionType); }
+
+  @Before
+  public void setUp() throws Exception {
     String uri =
         String.format("ldap://localhost:%s", serverRule.getLdapServer().getPort());
     String dn = "cn=#UID,ou=Users,dc=myorg,dc=com";
@@ -66,13 +68,13 @@ public class LdapJdbcTest extends JdbcTestBase {
     int ret = CustomClusterRunner.StartImpalaCluster(ldapArgs);
     assertEquals(ret, 0);
 
-    con_ =
-        createConnection(ImpalaJdbcClient.getLdapConnectionStr(testUser_, testPassword_));
+    con_ = createConnection(
+        ImpalaJdbcClient.getLdapConnectionStr(connectionType_, testUser_, testPassword_));
   }
 
-  @AfterClass
-  public static void cleanUp() throws Exception {
-    JdbcTestBase.cleanUp();
+  @After
+  public void cleanUp() throws Exception {
+    super.cleanUp();
     CustomClusterRunner.StartImpalaCluster();
   }
 
@@ -87,16 +89,16 @@ public class LdapJdbcTest extends JdbcTestBase {
   @Test
   public void testFailedConnection() throws Exception {
     try {
-      Connection con = createConnection(
-          ImpalaJdbcClient.getLdapConnectionStr(testUser_, "invalid-password"));
+      Connection con = createConnection(ImpalaJdbcClient.getLdapConnectionStr(
+          connectionType_, testUser_, "invalid-password"));
       fail("Connecting with an invalid password should throw an error.");
     } catch (SQLException e) {
       assertTrue(e.getMessage().contains("Could not open client transport"));
     }
 
     try {
-      Connection con = createConnection(
-          ImpalaJdbcClient.getLdapConnectionStr("invalid-user", testPassword_));
+      Connection con = createConnection(ImpalaJdbcClient.getLdapConnectionStr(
+          connectionType_, "invalid-user", testPassword_));
       fail("Connecting with an invalid user name should throw an error.");
     } catch (SQLException e) {
       assertTrue(e.getMessage().contains("Could not open client transport"));
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTest.java b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
index 1fbc0d0..2f4d1db 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
@@ -37,7 +37,7 @@ import java.util.Map;
 import org.apache.impala.testutil.ImpalaJdbcClient;
 import org.apache.impala.util.Metrics;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -48,9 +48,11 @@ import org.junit.Test;
  *
  */
 public class JdbcTest extends JdbcTestBase {
-  @BeforeClass
-  public static void setUp() throws Exception {
-    con_ = createConnection(ImpalaJdbcClient.getNoAuthConnectionStr());
+  public JdbcTest(String connectionType) { super(connectionType); }
+
+  @Before
+  public void setUp() throws Exception {
+    con_ = createConnection(ImpalaJdbcClient.getNoAuthConnectionStr(connectionType_));
   }
 
   @Test
@@ -554,7 +556,8 @@ public class JdbcTest extends JdbcTestBase {
     List<Long> lastTimeSessionActive = new ArrayList<>();
 
     for (int timeout : timeoutPeriods) {
-      connections.add(createConnection(ImpalaJdbcClient.getNoAuthConnectionStr()));
+      connections.add(
+          createConnection(ImpalaJdbcClient.getNoAuthConnectionStr(connectionType_)));
     }
 
     Long numOpenSessions = (Long)metrics.getMetric(
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java b/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java
index 74e309a..91e529f 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTestBase.java
@@ -31,25 +31,40 @@ import org.apache.impala.analysis.Parser;
 import org.apache.impala.analysis.StatementBase;
 import org.apache.impala.testutil.ImpalaJdbcClient;
 import org.junit.After;
-import org.junit.AfterClass;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runner.RunWith;
 
 import com.google.common.collect.Lists;
 
 /**
  * Base class providing utility functions for tests that need a Jdbc connection.
  */
+@RunWith(Parameterized.class)
 public abstract class JdbcTestBase {
-  protected static Connection con_;
+  protected String connectionType_;
+  protected Connection con_;
 
   // Test-local list of test tables. These are cleaned up in @After.
   protected final List<String> testTableNames_ = Lists.newArrayList();
 
+  public JdbcTestBase(String connectionType) { connectionType_ = connectionType; }
+
+  @Parameterized.Parameters
+  public static String[] createConnections() {
+    return new String[] {"binary", "http"};
+  }
+
   /**
-   * Closes 'con_'. Any subclasses that specify their own 'AfterClass' will need to call
-   * this function there.
+   * Closes 'con_'. Any subclasses that specify their own 'After' will need to call this
+   * function there.
    */
-  @AfterClass
-  public static void cleanUp() throws Exception {
+  @After
+  public void cleanUp() throws Exception {
+    for (String tableName : testTableNames_) {
+      dropTestTable(tableName);
+    }
+
     con_.close();
     assertTrue("Connection should be closed", con_.isClosed());
 
@@ -112,11 +127,4 @@ public abstract class JdbcTestBase {
       stmt.close();
     }
   }
-
-  @After
-  public void testCleanUp() throws SQLException {
-    for (String tableName : testTableNames_) {
-      dropTestTable(tableName);
-    }
-  }
 }
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
index 32b3e77..01a4b2f 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
@@ -60,10 +60,17 @@ public class ImpalaJdbcClient {
 
   private final static String LDAP_AUTH_SPEC = ";user=%s;password=%s";
 
-  // The default connection string connects to localhost at the default hs2_port without
-  // Sasl.
-  private final static String DEFAULT_CONNECTION_STRING =
-      "jdbc:hive2://localhost:21050/default";
+  // Connects with HTTP as the transport.
+  private final static String HTTP_TRANSPORT_SPEC = ";transportMode=http";
+
+  // HiveServer2 compatible ports on coordinator for BINARY and HTTP based transports.
+  private final static int HS2_BINARY_PORT = 21050;
+  private final static int HS2_HTTP_PORT = 28000;
+
+  // The default connection string template to connect to localhost on a given port
+  // number.
+  private final static String DEFAULT_CONNECTION_TEMPLATE =
+      "jdbc:hive2://localhost:%d/default";
 
   private final String driverName_;
   private final String connString_;
@@ -139,23 +146,35 @@ public class ImpalaJdbcClient {
   }
 
   public static ImpalaJdbcClient createClientUsingHiveJdbcDriver() {
-    return new ImpalaJdbcClient(HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr());
+    return new ImpalaJdbcClient(
+        HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr("binary"));
   }
 
   public static ImpalaJdbcClient createClientUsingHiveJdbcDriver(String connString) {
     return new ImpalaJdbcClient(HIVE_SERVER2_DRIVER_NAME, connString);
   }
 
-  public static String getNoAuthConnectionStr() {
-    return getConnectionStr(NOSASL_AUTH_SPEC);
+  public static ImpalaJdbcClient createHttpClientUsingHiveJdbcDriver() {
+    return new ImpalaJdbcClient(HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr("http"));
+  }
+
+  public static String getNoAuthConnectionStr(String connType) {
+    return getConnectionStr(connType, NOSASL_AUTH_SPEC);
   }
 
-  public static String getLdapConnectionStr(String username, String password) {
-    return getConnectionStr(String.format(LDAP_AUTH_SPEC, username, password));
+  public static String getLdapConnectionStr(
+      String connType, String username, String password) {
+    return getConnectionStr(connType, String.format(LDAP_AUTH_SPEC, username, password));
   }
 
-  private static String getConnectionStr(String authStr) {
-    return DEFAULT_CONNECTION_STRING + authStr;
+  // Returns the JDBC URL for 'connType' ("binary" or "http") with 'authStr' appended.
+  private static String getConnectionStr(String connType, String authStr) {
+    boolean isHttp = "http".equals(connType);  // '==' would compare references.
+    Preconditions.checkState(isHttp || "binary".equals(connType));
+    // Format only the template so a '%' in 'authStr' is never parsed as a specifier.
+    int port = isHttp ? HS2_HTTP_PORT : HS2_BINARY_PORT;
+    String url = String.format(DEFAULT_CONNECTION_TEMPLATE, port) + authStr;
+    return isHttp ? url + HTTP_TRANSPORT_SPEC : url;
   }
 
   /**
diff --git a/fe/src/test/resources/users.ldif b/fe/src/test/resources/users.ldif
index 93ced04..0415efb 100644
--- a/fe/src/test/resources/users.ldif
+++ b/fe/src/test/resources/users.ldif
@@ -22,4 +22,14 @@ objectClass: top
 cn: Test1Ldap
 sn: Ldap
 uid: ldaptest1
-userPassword: 12345
\ No newline at end of file
+userPassword: 12345
+
+dn: cn=Test2Ldap,ou=Users,dc=myorg,dc=com
+objectClass: inetOrgPerson
+objectClass: organizationalPerson
+objectClass: person
+objectClass: top
+cn: Test2Ldap
+sn: Ldap
+uid: ldaptest2
+userPassword: abcde
\ No newline at end of file
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 4b45979..967b66d 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -50,6 +50,7 @@ START_DAEMON_PATH = os.path.join(IMPALA_HOME, 'bin/start-daemon.sh')
 
 DEFAULT_BEESWAX_PORT = 21000
 DEFAULT_HS2_PORT = 21050
+DEFAULT_HS2_HTTP_PORT = 28000
 DEFAULT_BE_PORT = 22000
 DEFAULT_KRPC_PORT = 27000
 DEFAULT_CATALOG_SERVICE_PORT = 26000


[impala] 02/04: IMPALA-1653: Don't close hiveserver2 session when connection is closed

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b1cb879577f1e7ea8b6ea1e72c856eee0a582627
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Fri May 10 09:24:48 2019 -0700

    IMPALA-1653: Don't close hiveserver2 session when connection is closed
    
    Currently, when a client connection is closed, we always close any
    session started over that connection. This is a requirement for
    beeswax, which always ties sessions to connections, but it is not
    required for hiveserver2, which allows sessions to be used across
    connections with a session token.
    
    This patch changes this behavior so that hiveserver2 sessions are no
    longer closed when the corresponding connection is closed.
    
    One downside of this change is that clients may inadvertently leave
    sessions open indefinitely if they close their connection without
    calling CloseSession(), which can waste space on the coordinator.
    We already have a flag --idle_session_timeout, but this flag is off
    by default and sessions that hit this timeout are expired but not
    fully closed.
    
    Rather than changing the default idle session behavior, which could
    affect existing users, this patch mitigates this issue by adding a
    new flag: --disconnected_session_timeout which is set to 15 minutes by
    default. When a session has had no open connections for longer than
    this time, it will be closed and any associated queries will be
    unregistered.
    
    Testing:
    - Added e2e tests.
    
    Change-Id: Ia4555cd9b73db5b4dde92cd4fac4f9bfa3664d78
    Reviewed-on: http://gerrit.cloudera.org:8080/13306
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/thrift-server.cc             |   3 +
 be/src/rpc/thrift-server.h              |   3 +
 be/src/service/impala-beeswax-server.cc |  36 ++++-----
 be/src/service/impala-hs2-server.cc     |  29 ++++++--
 be/src/service/impala-server.cc         | 127 +++++++++++++++++++++++---------
 be/src/service/impala-server.h          |  54 +++++++++++---
 common/thrift/generate_error_codes.py   |   3 +
 tests/common/impala_connection.py       |   6 ++
 tests/custom_cluster/test_hs2.py        | 103 ++++++++++++++++++++++++++
 tests/hs2/test_hs2.py                   |  57 +++++++++-----
 10 files changed, 331 insertions(+), 90 deletions(-)

diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 070e665..ccf1aff 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -239,6 +239,9 @@ void ThriftServer::ThriftServerEventProcessor::preServe() {
 // connection state such as the connection identifier and the username.
 __thread ThriftServer::ConnectionContext* __connection_context__;
 
+bool ThriftServer::HasThreadConnectionContext() {
+  return __connection_context__ != nullptr;  // Per-thread pointer (declared __thread).
+}
 
 const TUniqueId& ThriftServer::GetThreadConnectionId() {
   return __connection_context__->connection_id;
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index e7d861b..4afa040 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -110,6 +110,9 @@ class ThriftServer {
     connection_handler_ = connection;
   }
 
+  /// Returns true if the current thread has a connection context set on it.
+  static bool HasThreadConnectionContext();
+
   /// Returns a unique identifier for the current connection. A connection is
   /// identified with the lifetime of a socket connection to this server.
   /// It is only safe to call this method during a Thrift processor RPC
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 3f0bce0..48bb26c 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -57,7 +57,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
-      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
+      session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId(), &session),
       SQLSTATE_GENERAL_ERROR);
   TQueryCtx query_ctx;
   // raise general error for request conversion error;
@@ -93,7 +93,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
-      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
+      session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId(), &session),
       SQLSTATE_GENERAL_ERROR);
   TQueryCtx query_ctx;
   // raise general error for request conversion error;
@@ -145,7 +145,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
   VLOG_QUERY << "explain(): query=" << query.query;
   RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   TQueryCtx query_ctx;
@@ -162,7 +162,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
 void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle,
     const bool start_over, const int32_t fetch_size) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   if (start_over) {
@@ -188,7 +188,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
 void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
     const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   // Convert QueryHandle to TUniqueId and get the query exec state.
@@ -232,7 +232,7 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
 
 void ImpalaServer::close(const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
@@ -244,7 +244,7 @@ void ImpalaServer::close(const QueryHandle& handle) {
 
 beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
@@ -267,7 +267,7 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
 
 void ImpalaServer::echo(string& echo_string, const string& input_string) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   echo_string = input_string;
 }
@@ -277,7 +277,7 @@ void ImpalaServer::clean(const LogContextId& log_context) {
 
 void ImpalaServer::get_log(string& log, const LogContextId& context) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   // LogContextId is the same as QueryHandle.id
   QueryHandle handle;
@@ -322,7 +322,7 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
 void ImpalaServer::get_default_configuration(vector<ConfigVariable> &configurations,
     const bool include_hadoop) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   configurations.insert(configurations.end(), default_configs_.begin(),
       default_configs_.end());
@@ -330,7 +330,7 @@ void ImpalaServer::get_default_configuration(vector<ConfigVariable> &configurati
 
 void ImpalaServer::dump_config(string& config) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   config = "";
 }
@@ -338,7 +338,7 @@ void ImpalaServer::dump_config(string& config) {
 void ImpalaServer::Cancel(impala::TStatus& tstatus,
     const beeswax::QueryHandle& query_handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   // Convert QueryHandle to TUniqueId and get the query exec state.
   TUniqueId query_id;
@@ -350,7 +350,7 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
 void ImpalaServer::CloseInsert(TInsertResult& insert_result,
     const QueryHandle& query_handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(query_handle, &query_id);
@@ -370,8 +370,8 @@ void ImpalaServer::GetRuntimeProfile(string& profile_output, const QueryHandle&
   const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
   stringstream ss;
   shared_ptr<SessionState> session;
-  RAISE_IF_ERROR(session_handle.WithSession(session_id, &session),
-      SQLSTATE_GENERAL_ERROR);
+  RAISE_IF_ERROR(
+      session_handle.WithBeeswaxSession(session_id, &session), SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     ss << Substitute("Invalid session id: $0", PrintId(session_id));
     RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
@@ -393,8 +393,8 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
   ScopedSessionState session_handle(this);
   const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
   shared_ptr<SessionState> session;
-  RAISE_IF_ERROR(session_handle.WithSession(session_id, &session),
-      SQLSTATE_GENERAL_ERROR);
+  RAISE_IF_ERROR(
+      session_handle.WithBeeswaxSession(session_id, &session), SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     stringstream ss;
     ss << Substitute("Invalid session id: $0", PrintId(session_id));
@@ -409,7 +409,7 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
 
 void ImpalaServer::PingImpalaService(TPingImpalaServiceResp& return_val) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   VLOG_RPC << "PingImpalaService()";
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 4233f22..0ad3750 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -87,6 +87,7 @@ const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION =
 DECLARE_string(hostname);
 DECLARE_int32(webserver_port);
 DECLARE_int32(idle_session_timeout);
+DECLARE_int32(disconnected_session_timeout);
 
 namespace impala {
 
@@ -360,16 +361,17 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
       FLAGS_hostname, FLAGS_webserver_port));
   return_val.configuration.insert(make_pair("http_addr", http_addr));
 
-  // Put the session state in session_state_map_
   {
-    lock_guard<mutex> l(session_state_map_lock_);
-    session_state_map_.insert(make_pair(session_id, state));
+    lock_guard<mutex> l(connection_to_sessions_map_lock_);
+    const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
+    connection_to_sessions_map_[connection_id].insert(session_id);
+    state->connections.insert(connection_id);
   }
 
+  // Put the session state in session_state_map_
   {
-    lock_guard<mutex> l(connection_to_sessions_map_lock_);
-    const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
-    connection_to_sessions_map_[connection_id].push_back(session_id);
+    lock_guard<mutex> l(session_state_map_lock_);
+    session_state_map_.insert(make_pair(session_id, state));
   }
 
   ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(1);
@@ -940,5 +942,20 @@ void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val,
   return_val.status.__set_errorMessage("Not implemented");
 }
 
+void ImpalaServer::AddSessionToConnection(
+    const TUniqueId& session_id, SessionState* session) {
+  const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
+  {  // Scoped so the map lock is released before 'session->lock' is taken below.
+    boost::lock_guard<boost::mutex> l(connection_to_sessions_map_lock_);
+    connection_to_sessions_map_[connection_id].insert(session_id);
+  }
 
+  boost::lock_guard<boost::mutex> session_lock(session->lock);
+  if (session->connections.empty()) {
+    // This session was previously disconnected but now has an associated
+    // connection. It should no longer be considered for the disconnected timeout.
+    UnregisterSessionTimeout(FLAGS_disconnected_session_timeout);
+  }
+  session->connections.insert(connection_id);  // Record the current thread's connection.
+}
 }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index f8d25b5..09fa5c7 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -209,6 +209,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
     "before it is cancelled. If 0, idle queries are never expired. The query option "
     "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
     " the maximum allowable timeout.");
+DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a "
+    "hiveserver2 session will be maintained after the last connection that it has been "
+    "used over is disconnected.");
 
 DEFINE_int32(status_report_interval_ms, 5000, "(Advanced) Interval between profile "
     "reports in milliseconds. If set to <= 0, periodic reporting is disabled and only "
@@ -412,8 +415,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   // Initialize a session expiry thread which blocks indefinitely until the first session
   // with non-zero timeout value is opened. Note that a session which doesn't specify any
   // idle session timeout value will use the default value FLAGS_idle_session_timeout.
-  ABORT_IF_ERROR(Thread::Create("impala-server", "session-expirer",
-      bind<void>(&ImpalaServer::ExpireSessions, this), &session_timeout_thread_));
+  ABORT_IF_ERROR(Thread::Create("impala-server", "session-maintenance",
+      bind<void>(&ImpalaServer::SessionMaintenance, this), &session_maintenance_thread_));
 
   ABORT_IF_ERROR(Thread::Create("impala-server", "query-expirer",
       bind<void>(&ImpalaServer::ExpireQueries, this), &query_expiration_thread_));
@@ -1382,7 +1385,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     // TODO: deal with an error status
     discard_result(UnregisterQuery(query_id, false, &status));
   }
-  // Reconfigure the poll period of session_timeout_thread_ if necessary.
+  // Reconfigure the poll period of session_maintenance_thread_ if necessary.
   UnregisterSessionTimeout(session_state->session_timeout);
   VLOG_QUERY << "Closed session: " << PrintId(session_id);
   return Status::OK();
@@ -2046,6 +2049,7 @@ void ImpalaServer::ConnectionStart(
     session_state->network_address = connection_context.network_address;
     session_state->server_default_query_options = &default_query_options_;
     session_state->kudu_latest_observed_ts = 0;
+    session_state->connections.insert(connection_context.connection_id);
 
     // If the username was set by a lower-level transport, use it.
     if (!connection_context.username.empty()) {
@@ -2062,7 +2066,7 @@ void ImpalaServer::ConnectionStart(
     }
     {
       lock_guard<mutex> l(connection_to_sessions_map_lock_);
-      connection_to_sessions_map_[connection_context.connection_id].push_back(session_id);
+      connection_to_sessions_map_[connection_context.connection_id].insert(session_id);
     }
     ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(1L);
   }
@@ -2070,8 +2074,7 @@ void ImpalaServer::ConnectionStart(
 
 void ImpalaServer::ConnectionEnd(
     const ThriftServer::ConnectionContext& connection_context) {
-
-  vector<TUniqueId> sessions_to_close;
+  set<TUniqueId> disconnected_sessions;
   {
     unique_lock<mutex> l(connection_to_sessions_map_lock_);
     ConnectionToSessionMap::iterator it =
@@ -2082,20 +2085,39 @@ void ImpalaServer::ConnectionEnd(
 
     // We don't expect a large number of sessions per connection, so we copy it, so that
     // we can drop the map lock early.
-    sessions_to_close = it->second;
+    disconnected_sessions = std::move(it->second);
     connection_to_sessions_map_.erase(it);
   }
 
+  bool close = connection_context.server_name == BEESWAX_SERVER_NAME
+      || FLAGS_disconnected_session_timeout <= 0;
   LOG(INFO) << "Connection from client "
             << TNetworkAddressToString(connection_context.network_address)
-            << " closed, closing " << sessions_to_close.size()
+            << " to server " << connection_context.server_name << " closed."
+            << (close ? " Closing " : " Disconnecting ") << disconnected_sessions.size()
             << " associated session(s)";
 
-  for (const TUniqueId& session_id: sessions_to_close) {
-    Status status = CloseSessionInternal(session_id, true);
-    if (!status.ok()) {
-      LOG(WARNING) << "Error closing session " << PrintId(session_id) << ": "
-                   << status.GetDetail();
+  if (close) {
+    for (const TUniqueId& session_id : disconnected_sessions) {
+      Status status = CloseSessionInternal(session_id, true);
+      if (!status.ok()) {
+        LOG(WARNING) << "Error closing session " << PrintId(session_id) << ": "
+                     << status.GetDetail();
+      }
+    }
+  } else {
+    DCHECK_EQ(connection_context.server_name, HS2_SERVER_NAME);
+    for (const TUniqueId& session_id : disconnected_sessions) {
+      shared_ptr<SessionState> state;
+      Status status = GetSessionState(session_id, &state);
+      // The session may not exist if it was explicitly closed.
+      if (!status.ok()) continue;
+      lock_guard<mutex> state_lock(state->lock);
+      state->connections.erase(connection_context.connection_id);
+      if (state->connections.empty()) {
+        state->disconnected_ms = UnixMillis();
+        RegisterSessionTimeout(FLAGS_disconnected_session_timeout);
+      }
     }
   }
 }
@@ -2118,55 +2140,88 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
-[[noreturn]] void ImpalaServer::ExpireSessions() {
+[[noreturn]] void ImpalaServer::SessionMaintenance() {
   while (true) {
     {
       unique_lock<mutex> timeout_lock(session_timeout_lock_);
       if (session_timeout_set_.empty()) {
         session_timeout_cv_.Wait(timeout_lock);
       } else {
-        // Sleep for a second before checking whether an active session can be expired.
+        // Sleep for a second before doing maintenance.
         session_timeout_cv_.WaitFor(timeout_lock, MICROS_PER_SEC);
       }
     }
 
     int64_t now = UnixMillis();
     int expired_cnt = 0;
-    VLOG(3) << "Session expiration thread waking up";
+    VLOG(3) << "Session maintenance thread waking up";
     {
       // TODO: If holding session_state_map_lock_ for the duration of this loop is too
       // expensive, consider a priority queue.
       lock_guard<mutex> map_lock(session_state_map_lock_);
-      for (SessionStateMap::value_type& session_state: session_state_map_) {
+      vector<TUniqueId> sessions_to_remove;
+      for (SessionStateMap::value_type& map_entry : session_state_map_) {
+        const TUniqueId& session_id = map_entry.first;
+        std::shared_ptr<SessionState> session_state = map_entry.second;
         unordered_set<TUniqueId> inflight_queries;
+        Status query_cancel_status;
         {
-          lock_guard<mutex> state_lock(session_state.second->lock);
-          if (session_state.second->ref_count > 0) continue;
+          lock_guard<mutex> state_lock(session_state->lock);
+          if (session_state->ref_count > 0) continue;
           // A session closed by other means is in the process of being removed, and it's
           // best not to interfere.
-          if (session_state.second->closed || session_state.second->expired) continue;
-          if (session_state.second->session_timeout == 0) continue;
-
-          int64_t last_accessed_ms = session_state.second->last_accessed_ms;
-          int64_t session_timeout_ms = session_state.second->session_timeout * 1000;
-          if (now - last_accessed_ms <= session_timeout_ms) continue;
-          LOG(INFO) << "Expiring session: " << PrintId(session_state.first) << ", user: "
-                    << session_state.second->connected_user << ", last active: "
-                    << ToStringFromUnixMillis(last_accessed_ms);
-          session_state.second->expired = true;
-          ++expired_cnt;
-          ImpaladMetrics::NUM_SESSIONS_EXPIRED->Increment(1L);
-          // Since expired is true, no more queries will be added to the inflight list.
-          inflight_queries.insert(session_state.second->inflight_queries.begin(),
-              session_state.second->inflight_queries.end());
+          if (session_state->closed) continue;
+
+          if (session_state->connections.size() == 0
+              && (now - session_state->disconnected_ms)
+                  >= FLAGS_disconnected_session_timeout * 1000L) {
+            // This session has no active connections and is past the disconnected session
+            // timeout, so close it.
+            DCHECK_ENUM_EQ(session_state->session_type, TSessionType::HIVESERVER2);
+            LOG(INFO) << "Closing session: " << PrintId(session_id)
+                      << ", user: " << session_state->connected_user
+                      << ", because it no longer  has any open connections. The last "
+                      << "connection was closed at: "
+                      << ToStringFromUnixMillis(session_state->disconnected_ms);
+            session_state->closed = true;
+            sessions_to_remove.push_back(session_id);
+            ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L);
+            UnregisterSessionTimeout(FLAGS_disconnected_session_timeout);
+            query_cancel_status =
+                Status::Expected(TErrorCode::DISCONNECTED_SESSION_CLOSED);
+          } else {
+            // Check if the session should be expired.
+            if (session_state->expired || session_state->session_timeout == 0) {
+              continue;
+            }
+
+            int64_t last_accessed_ms = session_state->last_accessed_ms;
+            int64_t session_timeout_ms = session_state->session_timeout * 1000;
+            if (now - last_accessed_ms <= session_timeout_ms) continue;
+            LOG(INFO) << "Expiring session: " << PrintId(session_id)
+                      << ", user: " << session_state->connected_user
+                      << ", last active: " << ToStringFromUnixMillis(last_accessed_ms);
+            session_state->expired = true;
+            ++expired_cnt;
+            ImpaladMetrics::NUM_SESSIONS_EXPIRED->Increment(1L);
+            query_cancel_status = Status::Expected(TErrorCode::INACTIVE_SESSION_EXPIRED);
+          }
+
+          // Since either expired or closed is true no more queries will be added to the
+          // inflight list.
+          inflight_queries.insert(session_state->inflight_queries.begin(),
+              session_state->inflight_queries.end());
         }
         // Unregister all open queries from this session.
-        Status status = Status::Expected(TErrorCode::INACTIVE_SESSION_EXPIRED);
         for (const TUniqueId& query_id : inflight_queries) {
           cancellation_thread_pool_->Offer(
-              CancellationWork::TerminatedByServer(query_id, status, true));
+              CancellationWork::TerminatedByServer(query_id, query_cancel_status, true));
         }
       }
+      // Remove any sessions that were closed from the map.
+      for (const TUniqueId& session_id : sessions_to_remove) {
+        session_state_map_.erase(session_id);
+      }
     }
     LOG_IF(INFO, expired_cnt > 0) << "Expired sessions. Count: " << expired_cnt;
   }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index b23a518..7d99a6e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -512,6 +512,10 @@ class ImpalaServer : public ImpalaServiceIf,
     /// Time the session was last accessed, in ms since epoch (UTC).
     int64_t last_accessed_ms;
 
+    /// If this session has no open connections, this is the time, in ms since epoch
+    /// (UTC), when the last connection was closed.
+    int64_t disconnected_ms;
+
     /// The latest Kudu timestamp observed after DML operations executed within this
     /// session.
     uint64_t kudu_latest_observed_ts;
@@ -527,6 +531,9 @@ class ImpalaServer : public ImpalaServiceIf,
     /// HS2 session, or using the SET command.
     int32_t session_timeout = 0;
 
+    /// The connection ids of any connections that this session has been used over.
+    std::set<TUniqueId> connections;
+
     /// Updates the session timeout based on the query option idle_session_timeout.
     /// It registers/unregisters the session timeout to the Impala server.
     /// The lock must be owned by the caller of this function.
@@ -893,11 +900,15 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Unregister timeout value.
   void UnregisterSessionTimeout(int32_t timeout);
 
-  /// To be run in a thread which wakes up every second. This function checks all
-  /// sessions for their last-idle times. Those that have been idle for longer than
-  /// their configured timeout values are 'expired': they will no longer accept queries
-  /// and any running queries associated with those sessions are unregistered.
-  [[noreturn]] void ExpireSessions();
+  /// To be run in a thread which wakes up every second if there are registered session
+  /// timeouts. This function checks all sessions for:
+  /// - Last-idle times. Those that have been idle for longer than their configured
+  ///   timeout values are 'expired': they will no longer accept queries.
+  /// - Disconnected times. Those that have had no active connections for longer than
+  ///   FLAGS_disconnected_session_timeout are closed: they are removed from the session
+  ///   state map and can no longer be accessed by clients.
+  /// For either case any running queries associated with those sessions are unregistered.
+  [[noreturn]] void SessionMaintenance();
 
   /// Runs forever, walking queries_by_timestamp_ and expiring any queries that have been
   /// idle (i.e. no client input and no time spent processing locally) for
@@ -974,11 +985,11 @@ class ImpalaServer : public ImpalaServiceIf,
   /// avoid blocking the statestore callback.
   boost::scoped_ptr<ThreadPool<CancellationWork>> cancellation_thread_pool_;
 
-  /// Thread that runs ExpireSessions. It will wake up periodically to check for sessions
-  /// which are idle for more their timeout values.
-  std::unique_ptr<Thread> session_timeout_thread_;
+  /// Thread that runs SessionMaintenance. It will wake up periodically to check for
+  /// sessions which are idle or disconnected for longer than their timeout values.
+  std::unique_ptr<Thread> session_maintenance_thread_;
 
-  /// Contains all the non-zero idle session timeout values.
+  /// Contains all the non-zero idle or disconnected session timeout values.
   std::multiset<int32_t> session_timeout_set_;
 
   /// The lock for protecting the session_timeout_set_.
@@ -1017,6 +1028,24 @@ class ImpalaServer : public ImpalaServiceIf,
       DCHECK(session_.get() == NULL);
       RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
       if (session != NULL) (*session) = session_;
+
+      // We won't have a connection context in the case of ChildQuery, which calls into
+      // hiveserver2 functions directly without going through the Thrift stack.
+      if (ThriftServer::HasThreadConnectionContext()) {
+        // Try adding the session id to the connection's set of sessions in case this is
+        // the first time this session has been used on this connection.
+        impala_->AddSessionToConnection(session_id, session_.get());
+      }
+      return Status::OK();
+    }
+
+    /// Same as WithSession(), except does not update the session/connection mapping, as
+    /// beeswax sessions can only be used over a single connection.
+    Status WithBeeswaxSession(const TUniqueId& session_id,
+        std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
+      DCHECK(session_.get() == NULL);
+      RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
+      if (session != NULL) (*session) = session_;
       return Status::OK();
     }
 
@@ -1054,8 +1083,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// closed when the connection ends. HS2 allows for multiplexing several sessions across
   /// a single connection. If a session has already been closed (only possible via HS2) it
   /// is not removed from this map to avoid the cost of looking it up.
-  typedef boost::unordered_map<TUniqueId, std::vector<TUniqueId>>
-    ConnectionToSessionMap;
+  typedef boost::unordered_map<TUniqueId, std::set<TUniqueId>> ConnectionToSessionMap;
   ConnectionToSessionMap connection_to_sessions_map_;
 
   /// Returns session state for given session_id.
@@ -1075,6 +1103,10 @@ class ImpalaServer : public ImpalaServiceIf,
     session->last_accessed_ms = UnixMillis();
   }
 
+  /// Associate the current connection context with the given session in
+  /// 'connection_to_sessions_map_' and 'SessionState::connections'.
+  void AddSessionToConnection(const TUniqueId& session_id, SessionState* session);
+
   /// Protects query_locations_. Not held in conjunction with other locks.
   boost::mutex query_locations_lock_;
 
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 9e76be5..cc1a38f 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -408,6 +408,9 @@ error_codes = (
   ("PARQUET_DATE_OUT_OF_RANGE", 134,
    "Parquet file '$0' column '$1' contains an out of range date. "
    "The valid date range is 0000-01-01..9999-12-31."),
+
+  ("DISCONNECTED_SESSION_CLOSED", 135,
+   "Session closed because it has no active connections"),
 )
 
 import sys
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 2b10bd1..b05478f 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -278,6 +278,12 @@ class ImpylaHS2Connection(ImpalaConnection):
 
   def close(self):
     LOG.info("-- closing connection to: {0}".format(self.__host_port))
+    try:
+      # Explicitly close the cursor so that it will close the session.
+      self.__cursor.close()
+    except Exception, e:
+      # The session may no longer be valid if the impalad was restarted during the test.
+      LOG.exception(e)
     self.__impyla_conn.close()
 
   def close_query(self, operation_handle):
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
new file mode 100644
index 0000000..9b641de
--- /dev/null
+++ b/tests/custom_cluster/test_hs2.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+import pytest
+
+from tests.hs2.hs2_test_suite import HS2TestSuite, operation_id_to_query_id
+from time import sleep
+from TCLIService import TCLIService
+
+
+class TestHS2(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('These tests only run in exhaustive')
+    super(TestHS2, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--disconnected_session_timeout=1")
+  def test_disconnected_session_timeout(self):
+    """Test that a session gets closed if it has no active connections for more than
+    disconnected_session_timeout."""
+    conn = HS2TestSuite()
+    conn.setup()
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_resp = conn.hs2_client.OpenSession(open_session_req)
+    HS2TestSuite.check_response(open_session_resp)
+    conn.session_handle = open_session_resp.sessionHandle
+    # Run a query, which should succeed.
+    conn.execute_statement("select 1")
+
+    # Set up another connection and run a long-running query with the same session.
+    conn2 = HS2TestSuite()
+    conn2.setup()
+    conn2.session_handle = open_session_resp.sessionHandle
+    execute_resp = conn2.execute_statement("select sleep(10000)")
+
+    # Close one connection and wait for longer than disconnected_session_timeout. The
+    # session should still be available since there's still one active connection.
+    conn2.teardown()
+    sleep(5)
+    conn.execute_statement("select 3")
+
+    # Close the other connection and sleep again. The session should now be closed.
+    conn.teardown()
+    sleep(5)
+    conn.setup()
+
+    # Run another query, which should fail since the session is closed.
+    conn.execute_statement("select 2", expected_error_prefix="Invalid session id",
+        expected_status_code=TCLIService.TStatusCode.ERROR_STATUS)
+
+    # Check that the query was cancelled correctly.
+    query_id = operation_id_to_query_id(execute_resp.operationHandle.operationId)
+    status = self.cluster.get_first_impalad().service.get_query_status(query_id)
+    assert status == "Session closed because it has no active connections"
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      "--idle_session_timeout=1 --disconnected_session_timeout=5")
+  def test_expire_disconnected_session(self):
+    """Test for the interaction between idle_session_timeout and
+    disconnected_session_timeout"""
+    # Close the default test clients so that they don't expire while the test is running
+    # and affect the metric values.
+    self.client.close()
+    self.hs2_client.close()
+    impalad = self.cluster.get_first_impalad()
+
+    conn = HS2TestSuite()
+    conn.setup()
+    # Open a session and then close the connection.
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_resp = conn.hs2_client.OpenSession(open_session_req)
+    HS2TestSuite.check_response(open_session_resp)
+    conn.teardown()
+
+    # The idle session timeout should be hit first, so the session will be expired.
+    impalad.service.wait_for_metric_value(
+        "impala-server.num-sessions-expired", 1)
+    # The session should eventually be closed by the disconnected timeout.
+    impalad.service.wait_for_metric_value(
+        "impala-server.num-open-hiveserver2-sessions", 0)
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index cec1d9e..9fa92af 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -245,6 +245,12 @@ class TestHS2(HS2TestSuite):
       assert num_expired_sessions == self.impalad_test_service.get_metric_value(
           "impala-server.num-sessions-expired")
 
+    # Close the remaining sessions.
+    for session_handle in session_handles:
+      if session_handle is not None:
+        close_session_req = TCLIService.TCloseSessionReq(session_handle)
+        TestHS2.check_response(self.hs2_client.CloseSession(close_session_req))
+
   @needs_session()
   def test_get_operation_status(self):
     """Tests that GetOperationStatus returns a valid result for a running query"""
@@ -347,23 +353,6 @@ class TestHS2(HS2TestSuite):
     assert err_msg in get_result_set_metadata_resp.status.errorMessage
 
   @pytest.mark.execute_serially
-  def test_socket_close_forces_session_close(self):
-    """Test that closing the underlying socket forces the associated session to close.
-    See IMPALA-564"""
-    open_session_req = TCLIService.TOpenSessionReq()
-    resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(resp)
-    num_sessions = self.impalad_test_service.get_metric_value(
-        "impala-server.num-open-hiveserver2-sessions")
-
-    assert num_sessions > 0
-
-    self.socket.close()
-    self.socket = None
-    self.impalad_test_service.wait_for_metric_value(
-        "impala-server.num-open-hiveserver2-sessions", num_sessions - 1)
-
-  @pytest.mark.execute_serially
   def test_multiple_sessions(self):
     """Test that multiple sessions on the same socket connection are allowed"""
     num_sessions = self.impalad_test_service.get_metric_value(
@@ -380,8 +369,11 @@ class TestHS2(HS2TestSuite):
     self.impalad_test_service.wait_for_metric_value(
         "impala-server.num-open-hiveserver2-sessions", num_sessions + 5)
 
-    self.socket.close()
-    self.socket = None
+    for session_id in session_ids:
+      close_session_req = TCLIService.TCloseSessionReq(session_id)
+      resp = self.hs2_client.CloseSession(close_session_req)
+      TestHS2.check_response(resp)
+
     self.impalad_test_service.wait_for_metric_value(
         "impala-server.num-open-hiveserver2-sessions", num_sessions)
 
@@ -616,3 +608,30 @@ class TestHS2(HS2TestSuite):
     fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
     TestHS2.check_response(fetch_results_resp)
     return fetch_results_resp
+
+  def test_close_connection(self):
+    """Tests that an hs2 session remains valid even after the connection is dropped."""
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_resp = self.hs2_client.OpenSession(open_session_req)
+    TestHS2.check_response(open_session_resp)
+    self.session_handle = open_session_resp.sessionHandle
+    # Run a query, which should succeed.
+    self.execute_statement("select 1")
+
+    # Reset the connection.
+    self.teardown()
+    self.setup()
+
+    # Run another query with the same session handle. It should succeed even though it's
+    # on a new connection, since disconnected_session_timeout (default of 15 minutes)
+    # will not have been hit.
+    self.execute_statement("select 2")
+
+    # Close the session.
+    close_session_req = TCLIService.TCloseSessionReq()
+    close_session_req.sessionHandle = self.session_handle
+    TestHS2.check_response(self.hs2_client.CloseSession(close_session_req))
+
+    # Run another query, which should fail since the session is closed.
+    self.execute_statement("select 3", expected_error_prefix="Invalid session id",
+        expected_status_code=TCLIService.TStatusCode.ERROR_STATUS)


[impala] 03/04: IMPALA-6042: Allow Impala shell to use a global impalarc config

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 487547ec44ac0742c9f59c90b20b05124174e57f
Author: Ethan Xue <et...@cloudera.com>
AuthorDate: Thu May 9 14:21:42 2019 -0700

    IMPALA-6042: Allow Impala shell to use a global impalarc config
    
    Currently, impalarc files can be specified on a per-user basis
    (stored in ~/.impalarc), and they aren't created by default. The
    Impala shell should pick up /etc/impalarc as well, in addition
    to the user-specific configurations.
    
    The intent here is to allow a "global" configuration of the shell
    by a system administrator. The default path of the global config
    file can be changed by setting the $IMPALA_SHELL_GLOBAL_CONFIG_FILE
    environment variable.
    
    Note that the options set in the user config file take precedence
    over those in the global config file.
    
    Change-Id: I3a3179b6d9c9e3b2b01d6d3c5847cadb68782816
    Reviewed-on: http://gerrit.cloudera.org:8080/13313
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/rat_exclude_files.txt               |  1 +
 shell/impala_shell.py                   | 66 +++++++++++++++++++++++----------
 shell/impala_shell_config_defaults.py   |  5 ++-
 shell/option_parser.py                  |  6 +--
 tests/shell/good_impalarc2              |  7 ++++
 tests/shell/impalarc_with_query_options |  3 ++
 tests/shell/test_shell_commandline.py   | 44 ++++++++++++++++++++--
 tests/shell/test_shell_interactive.py   |  2 +
 tests/shell/util.py                     | 10 ++---
 9 files changed, 110 insertions(+), 34 deletions(-)

diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index e352a17..1cf051c 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -133,6 +133,7 @@ testdata/tzdb_tiny/*
 tests/pytest.ini
 tests/shell/bad_impalarc
 tests/shell/good_impalarc
+tests/shell/good_impalarc2
 tests/shell/impalarc_with_error
 tests/shell/impalarc_with_query_options
 tests/shell/impalarc_with_warnings
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 1caeb9a..f36f456 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -153,6 +153,8 @@ class ImpalaShell(object, cmd.Cmd):
 
   # Minimum time in seconds between two calls to get the exec summary.
   PROGRESS_UPDATE_INTERVAL = 1.0
+  # Environment variable used to source a global config file
+  GLOBAL_CONFIG_FILE = "IMPALA_SHELL_GLOBAL_CONFIG_FILE"
 
   def __init__(self, options, query_options):
     cmd.Cmd.__init__(self)
@@ -1608,34 +1610,58 @@ def get_intro(options):
 
 if __name__ == "__main__":
   """
-  There are two types of options: shell options and query_options. Both can be set in the
-  command line, which override the options set in config file (.impalarc). The default
-  shell options come from impala_shell_config_defaults.py. Query options have no defaults
-  within the impala-shell, but they do have defaults on the server. Query options can be
-  also changed in impala-shell with the 'set' command.
+  There are two types of options: shell options and query_options. Both can be set on the
+  command line, which override default options. Specifically, if there exists a global
+  config file (default path: /etc/impalarc) then options are loaded from that file. If
+  there exists a user config file (~/.impalarc), then options are loaded in from that
+  file and override any options already loaded from the global impalarc. The default shell
+  options come from impala_shell_config_defaults.py. Query options have no defaults within
+  the impala-shell, but they do have defaults on the server. Query options can be also
+  changed in impala-shell with the 'set' command.
   """
   # pass defaults into option parser
   parser = get_option_parser(impala_shell_defaults)
   options, args = parser.parse_args()
-  # use path to file specified by user in config_file option
-  user_config = os.path.expanduser(options.config_file);
-  # by default, use the .impalarc in the home directory
-  config_to_load = impala_shell_defaults.get("config_file")
-  # verify user_config, if found
-  if os.path.isfile(user_config) and user_config != config_to_load:
+
+  # by default, use the impalarc in the user's home directory
+  # and superimpose it on the global impalarc config
+  global_config = os.path.expanduser(
+    os.environ.get(ImpalaShell.GLOBAL_CONFIG_FILE,
+                   impala_shell_defaults['global_config_default_path']))
+  if os.path.isfile(global_config):
+    # Always output the source of the global config if verbose
     if options.verbose:
-      print_to_stderr("Loading in options from config file: %s \n" % user_config)
-    # Command line overrides loading ~/.impalarc
-    config_to_load = user_config
-  elif user_config != config_to_load:
-    print_to_stderr('%s not found.\n' % user_config)
+      print_to_stderr(
+        "Loading in options from global config file: %s \n" % global_config)
+  elif global_config != impala_shell_defaults['global_config_default_path']:
+    print_to_stderr('%s not found.\n' % global_config)
     sys.exit(1)
+  # Override the default user config by a custom config if necessary
+  user_config = impala_shell_defaults.get("config_file")
+  input_config = os.path.expanduser(options.config_file)
+  # verify input_config, if found
+  if input_config != user_config:
+    if os.path.isfile(input_config):
+      if options.verbose:
+        print_to_stderr("Loading in options from config file: %s \n" % input_config)
+      # command line overrides loading ~/.impalarc
+      user_config = input_config
+    else:
+      print_to_stderr('%s not found.\n' % input_config)
+      sys.exit(1)
+  configs_to_load = [global_config, user_config]
 
-  # default shell options loaded in from impala_shell_config_defaults.py
-  # options defaults overwritten by those in config file
+  # load shell and query options from the list of config files
+  # in ascending order of precedence
   try:
-    loaded_shell_options, query_options = get_config_from_file(config_to_load,
-                                                               parser.option_list)
+    loaded_shell_options = {}
+    query_options = {}
+    for config_file in configs_to_load:
+      s_options, q_options = get_config_from_file(config_file,
+                                                  parser.option_list)
+      loaded_shell_options.update(s_options)
+      query_options.update(q_options)
+
     impala_shell_defaults.update(loaded_shell_options)
   except Exception, e:
     print_to_stderr(e)
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index 2029bf4..4ecf534 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# defaults for OptionParser options stored in dict
+# default options used by the Impala shell stored in a dict
 
 import getpass
 import os
@@ -52,4 +52,5 @@ impala_shell_defaults = {
             'version': False,
             'write_delimited': False,
             'client_connect_timeout_ms': 60000,
-            }
+            'global_config_default_path': '/etc/impalarc',
+    }
diff --git a/shell/option_parser.py b/shell/option_parser.py
index ae28121..b035967 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -30,7 +30,7 @@
 import ConfigParser
 import sys
 from impala_shell_config_defaults import impala_shell_defaults
-from optparse import OptionParser
+from optparse import OptionParser, SUPPRESS_HELP
 
 
 class ConfigFileFormatError(Exception):
@@ -285,8 +285,8 @@ def get_option_parser(defaults):
     # (print quiet is false since verbose is true)
     if option == parser.get_option('--quiet'):
       option.help += " [default: %s]" % (not defaults['verbose'])
-    elif option != parser.get_option('--help'):
-      # don't want to print default value for help
+    elif option != parser.get_option('--help') and option.help is not SUPPRESS_HELP:
+      # don't want to print default value for help or options without help text
       option.help += " [default: %default]"
 
   parser.set_defaults(**defaults)
diff --git a/tests/shell/good_impalarc2 b/tests/shell/good_impalarc2
new file mode 100644
index 0000000..2da8d4b
--- /dev/null
+++ b/tests/shell/good_impalarc2
@@ -0,0 +1,7 @@
+[impala]
+query=select 2
+keyval=msg1=test
+verbose=true
+Q=DEFAULT_FILE_FORMAT=avro
+[impala.query_options]
+DEFAULT_FILE_FORMAT=text
\ No newline at end of file
diff --git a/tests/shell/impalarc_with_query_options b/tests/shell/impalarc_with_query_options
index 2ae2129..0fd1a4b 100644
--- a/tests/shell/impalarc_with_query_options
+++ b/tests/shell/impalarc_with_query_options
@@ -1,6 +1,9 @@
+[impala]
+Q=DEFAULT_FILE_FORMAT=avro
 [impala.query_options]
 EXPLAIN_LEVEL=1
 explain_LEVEL=2
 MT_DOP=2
 invalid_query_option=1
+DEFAULT_FILE_FORMAT=parquet
 
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index cfef7c6..5491f31 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -24,6 +24,8 @@ import signal
 import socket
 import tempfile
 
+from shell.impala_shell import ImpalaShell as ImpalaShellClass
+
 from subprocess import call, Popen
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
@@ -474,15 +476,49 @@ class TestImpalaShell(ImpalaTestSuite):
     assert 'UnicodeDecodeError' not in result.stderr
     assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
 
-  @pytest.mark.execute_serially  # This tests invalidates metadata, and must run serially
+  def test_global_config_file(self, vector):
+    """Test global and user configuration files."""
+    args = []
+    # shell uses shell options in global config
+    env = {
+      ImpalaShellClass.GLOBAL_CONFIG_FILE: '{0}/good_impalarc2'.format(QUERY_FILE_PATH)}
+    result = run_impala_shell_cmd(vector, args, env=env)
+    assert 'WARNING:' not in result.stderr, \
+      "A valid config file should not trigger any warning: {0}".format(result.stderr)
+    assert 'Query: select 2' in result.stderr
+
+    # shell uses query options in global config
+    args = ['-q', 'set;']
+    result = run_impala_shell_cmd(vector, args, env=env)
+    assert 'DEFAULT_FILE_FORMAT: avro' in result.stdout
+
+    # shell options and query options in global config get overridden
+    # by options in user config
+    args = ['--config_file={0}/good_impalarc'.format(QUERY_FILE_PATH),
+            """--query=select '${VAR:msg1}'; set"""]
+    result = run_impala_shell_cmd(vector, args, env=env)
+    assert 'Query: select \'hello\'' in result.stderr
+    assert 'DEFAULT_FILE_FORMAT: parquet' in result.stdout
+
+    # command line options override options in global config
+    args = ['--query_option=DEFAULT_FILE_FORMAT=text',
+            """--query=select '${VAR:msg1}'; set"""]
+    result = run_impala_shell_cmd(vector, args, env=env)
+    assert 'Query: select \'test\'' in result.stderr
+    assert 'DEFAULT_FILE_FORMAT: text' in result.stdout
+
+    # specified global config file does not exist
+    env = {ImpalaShellClass.GLOBAL_CONFIG_FILE: '/does_not_exist'}
+    run_impala_shell_cmd(vector, args, env=env, expect_success=False)
+
   def test_config_file(self, vector):
     """Test the optional configuration file."""
     # Positive tests
     args = ['--config_file=%s/good_impalarc' % QUERY_FILE_PATH]
     result = run_impala_shell_cmd(vector, args)
-    assert 'WARNING:' not in result.stderr
+    assert 'WARNING:' not in result.stderr, \
+      "A valid config file should not trigger any warning: {0}".format(result.stderr)
     assert 'Query: select 1' in result.stderr
-
     # override option in config file through command line
     args = ['--config_file={0}/good_impalarc'.format(QUERY_FILE_PATH), '--query=select 2']
     result = run_impala_shell_cmd(vector, args)
@@ -773,7 +809,7 @@ class TestImpalaShell(ImpalaTestSuite):
     try:
       args = ['-q', '-f', sql_path, '-d', unique_database]
       start_time = time()
-      run_impala_shell_cmd(vector, args, False)
+      run_impala_shell_cmd(vector, args, expect_success=False)
       end_time = time()
       time_limit_s = 10
       actual_time_s = end_time - start_time
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 68fdd83..5668a33 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -431,6 +431,8 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     assert "\tEXPLAIN_LEVEL: 2" in result.stdout
     assert "INVALID_QUERY_OPTION is not supported for the impalad being connected to, "\
            "ignoring." in result.stdout
+    # Verify that query options under [impala] override those under [impala.query_options]
+    assert "\tDEFAULT_FILE_FORMAT: avro" in result.stdout
 
   def test_source_file(self, vector):
     cwd = os.getcwd()
diff --git a/tests/shell/util.py b/tests/shell/util.py
index 80c8d9d..b917ef7 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -98,14 +98,14 @@ def assert_pattern(pattern, result, text, message):
   assert m and m.group(0) == result, message
 
 
-def run_impala_shell_cmd(vector, shell_args, expect_success=True, stdin_input=None,
-                         wait_until_connected=True):
+def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
+                         stdin_input=None, wait_until_connected=True):
   """Runs the Impala shell on the commandline.
 
   'shell_args' is a string which represents the commandline options.
   Returns a ImpalaShellResult.
   """
-  result = run_impala_shell_cmd_no_expect(vector, shell_args, stdin_input,
+  result = run_impala_shell_cmd_no_expect(vector, shell_args, env, stdin_input,
                                           expect_success and wait_until_connected)
   if expect_success:
     assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (shell_args,
@@ -115,7 +115,7 @@ def run_impala_shell_cmd(vector, shell_args, expect_success=True, stdin_input=No
   return result
 
 
-def run_impala_shell_cmd_no_expect(vector, shell_args, stdin_input=None,
+def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=None,
                                    wait_until_connected=True):
   """Runs the Impala shell on the commandline.
 
@@ -124,7 +124,7 @@ def run_impala_shell_cmd_no_expect(vector, shell_args, stdin_input=None,
 
   Does not assert based on success or failure of command.
   """
-  p = ImpalaShell(vector, shell_args, wait_until_connected=wait_until_connected)
+  p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected)
   result = p.get_result(stdin_input)
   return result
 


[impala] 01/04: IMPALA-8538 (part 1) Copied THttp(Server|Transport) from thrift-0.9.3

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9e13fd7de53978f7ef8543a3661ce70c5ed4d60a
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed May 8 12:33:54 2019 -0700

    IMPALA-8538 (part 1) Copied THttp(Server|Transport) from thrift-0.9.3
    
    This is a mechanical change that just copies several files over from
    thrift. This is for convenience in reviewing changes to these files,
    which have been submitted as a follow-up patch.
    
    Change-Id: I1916e17eaeb7854eb93c2415396f0ee0243e4e32
    Reviewed-on: http://gerrit.cloudera.org:8080/13298
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/transport/CMakeLists.txt     |   4 +-
 be/src/transport/THttpServer.cpp    | 164 ++++++++++++++++++++++
 be/src/transport/THttpServer.h      |  64 +++++++++
 be/src/transport/THttpTransport.cpp | 267 ++++++++++++++++++++++++++++++++++++
 be/src/transport/THttpTransport.h   | 104 ++++++++++++++
 5 files changed, 602 insertions(+), 1 deletion(-)

diff --git a/be/src/transport/CMakeLists.txt b/be/src/transport/CMakeLists.txt
index 8a9eda8..59fca8b 100644
--- a/be/src/transport/CMakeLists.txt
+++ b/be/src/transport/CMakeLists.txt
@@ -24,10 +24,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/transport")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/transport")
 
 add_library(ThriftSaslTransport
+    THttpServer.cpp
+    THttpTransport.cpp
     TSaslClientTransport.cpp
     TSasl.cpp
     TSaslServerTransport.cpp
     TSaslTransport.cpp
     undef.cpp
   )
-add_dependencies(ThriftSaslTransport gen-deps)
\ No newline at end of file
+add_dependencies(ThriftSaslTransport gen-deps)
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
new file mode 100644
index 0000000..a20d612
--- /dev/null
+++ b/be/src/transport/THttpServer.cpp
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <sstream>
+#include <iostream>
+
+#include <thrift/transport/THttpServer.h>
+#include <thrift/transport/TSocket.h>
+#ifdef _MSC_VER
+#include <Shlwapi.h>
+#endif
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+using namespace std;
+
+THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : THttpTransport(transport) {
+}
+
+THttpServer::~THttpServer() {
+}
+
+#ifdef _MSC_VER
+  #define THRIFT_strncasecmp(str1, str2, len) _strnicmp(str1, str2, len)
+  #define THRIFT_strcasestr(haystack, needle) StrStrIA(haystack, needle)
+#else
+  #define THRIFT_strncasecmp(str1, str2, len) strncasecmp(str1, str2, len)
+  #define THRIFT_strcasestr(haystack, needle) strcasestr(haystack, needle)
+#endif
+
+void THttpServer::parseHeader(char* header) {
+  char* colon = strchr(header, ':');
+  if (colon == NULL) {
+    return;
+  }
+  size_t sz = colon - header;
+  char* value = colon + 1;
+
+  if (THRIFT_strncasecmp(header, "Transfer-Encoding", sz) == 0) {
+    if (THRIFT_strcasestr(value, "chunked") != NULL) {
+      chunked_ = true;
+    }
+  } else if (THRIFT_strncasecmp(header, "Content-length", sz) == 0) {
+    chunked_ = false;
+    contentLength_ = atoi(value);
+  } else if (strncmp(header, "X-Forwarded-For", sz) == 0) {
+    origin_ = value;
+  }
+}
+
+bool THttpServer::parseStatusLine(char* status) {
+  char* method = status;
+
+  char* path = strchr(method, ' ');
+  if (path == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+
+  *path = '\0';
+  while (*(++path) == ' ') {
+  };
+
+  char* http = strchr(path, ' ');
+  if (http == NULL) {
+    throw TTransportException(string("Bad Status: ") + status);
+  }
+  *http = '\0';
+
+  if (strcmp(method, "POST") == 0) {
+    // POST method ok, looking for content.
+    return true;
+  } else if (strcmp(method, "OPTIONS") == 0) {
+    // preflight OPTIONS method, we don't need further content.
+    // how to gracefully close connection?
+    uint8_t* buf;
+    uint32_t len;
+    writeBuffer_.getBuffer(&buf, &len);
+
+    // Construct the HTTP header
+    std::ostringstream h;
+    h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF
+      << "Access-Control-Allow-Origin: *" << CRLF << "Access-Control-Allow-Methods: POST, OPTIONS"
+      << CRLF << "Access-Control-Allow-Headers: Content-Type" << CRLF << CRLF;
+    string header = h.str();
+
+    // Write the header, then the data, then flush
+    transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+    transport_->write(buf, len);
+    transport_->flush();
+
+    // Reset the buffer and header variables
+    writeBuffer_.resetBuffer();
+    readHeaders_ = true;
+    return true;
+  }
+  throw TTransportException(string("Bad Status (unsupported method): ") + status);
+}
+
+void THttpServer::flush() {
+  // Fetch the contents of the write buffer
+  uint8_t* buf;
+  uint32_t len;
+  writeBuffer_.getBuffer(&buf, &len);
+
+  // Construct the HTTP header
+  std::ostringstream h;
+  h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF << "Server: Thrift/"
+    << VERSION << CRLF << "Access-Control-Allow-Origin: *" << CRLF
+    << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF
+    << "Connection: Keep-Alive" << CRLF << CRLF;
+  string header = h.str();
+
+  // Write the header, then the data, then flush
+  // cast should be fine, because none of "header" is under attacker control
+  transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size()));
+  transport_->write(buf, len);
+  transport_->flush();
+
+  // Reset the buffer and header variables
+  writeBuffer_.resetBuffer();
+  readHeaders_ = true;
+}
+
+std::string THttpServer::getTimeRFC1123() {
+  static const char* Days[] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
+  static const char* Months[]
+      = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
+  char buff[128];
+  time_t t = time(NULL);
+  tm* broken_t = gmtime(&t);
+
+  sprintf(buff,
+          "%s, %d %s %d %d:%d:%d GMT",
+          Days[broken_t->tm_wday],
+          broken_t->tm_mday,
+          Months[broken_t->tm_mon],
+          broken_t->tm_year + 1900,
+          broken_t->tm_hour,
+          broken_t->tm_min,
+          broken_t->tm_sec);
+  return std::string(buff);
+}
+}
+}
+} // apache::thrift::transport
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
new file mode 100644
index 0000000..a7ab944
--- /dev/null
+++ b/be/src/transport/THttpServer.h
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
+#define _THRIFT_TRANSPORT_THTTPSERVER_H_ 1
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+class THttpServer : public THttpTransport {
+public:
+  THttpServer(boost::shared_ptr<TTransport> transport);
+
+  virtual ~THttpServer();
+
+  virtual void flush();
+
+protected:
+  void readHeaders();
+  virtual void parseHeader(char* header);
+  virtual bool parseStatusLine(char* status);
+  std::string getTimeRFC1123();
+};
+
+/**
+ * Wraps a transport into HTTP protocol
+ */
+class THttpServerTransportFactory : public TTransportFactory {
+public:
+  THttpServerTransportFactory() {}
+
+  virtual ~THttpServerTransportFactory() {}
+
+  /**
+   * Wraps the given transport in a THttpServer.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TTransport>(new THttpServer(trans));
+  }
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
new file mode 100644
index 0000000..a466ff6
--- /dev/null
+++ b/be/src/transport/THttpTransport.cpp
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sstream>
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+using namespace std;
+
+// Yeah, yeah, hacky to put these here, I know.
+const char* THttpTransport::CRLF = "\r\n";
+const int THttpTransport::CRLF_LEN = 2;
+
+THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport)
+  : transport_(transport),
+    origin_(""),
+    readHeaders_(true),
+    chunked_(false),
+    chunkedDone_(false),
+    chunkSize_(0),
+    contentLength_(0),
+    httpBuf_(NULL),
+    httpPos_(0),
+    httpBufLen_(0),
+    httpBufSize_(1024) {
+  init();
+}
+
+void THttpTransport::init() {
+  httpBuf_ = (char*)std::malloc(httpBufSize_ + 1);
+  if (httpBuf_ == NULL) {
+    throw std::bad_alloc();
+  }
+  httpBuf_[httpBufLen_] = '\0';
+}
+
+THttpTransport::~THttpTransport() {
+  if (httpBuf_ != NULL) {
+    std::free(httpBuf_);
+  }
+}
+
+uint32_t THttpTransport::read(uint8_t* buf, uint32_t len) {
+  if (readBuffer_.available_read() == 0) {
+    readBuffer_.resetBuffer();
+    uint32_t got = readMoreData();
+    if (got == 0) {
+      return 0;
+    }
+  }
+  return readBuffer_.read(buf, len);
+}
+
+uint32_t THttpTransport::readEnd() {
+  // Read any pending chunked data (footers etc.)
+  if (chunked_) {
+    while (!chunkedDone_) {
+      readChunked();
+    }
+  }
+  return 0;
+}
+
+uint32_t THttpTransport::readMoreData() {
+  uint32_t size;
+
+  // Get more data!
+  refill();
+
+  if (readHeaders_) {
+    readHeaders();
+  }
+
+  if (chunked_) {
+    size = readChunked();
+  } else {
+    size = readContent(contentLength_);
+    readHeaders_ = true;
+  }
+
+  return size;
+}
+
+uint32_t THttpTransport::readChunked() {
+  uint32_t length = 0;
+
+  char* line = readLine();
+  uint32_t chunkSize = parseChunkSize(line);
+  if (chunkSize == 0) {
+    readChunkedFooters();
+  } else {
+    // Read data content
+    length += readContent(chunkSize);
+    // Read trailing CRLF after content
+    readLine();
+  }
+  return length;
+}
+
+void THttpTransport::readChunkedFooters() {
+  // End of data, read footer lines until a blank one appears
+  while (true) {
+    char* line = readLine();
+    if (strlen(line) == 0) {
+      chunkedDone_ = true;
+      break;
+    }
+  }
+}
+
+uint32_t THttpTransport::parseChunkSize(char* line) {
+  char* semi = strchr(line, ';');
+  if (semi != NULL) {
+    *semi = '\0';
+  }
+  uint32_t size = 0;
+  sscanf(line, "%x", &size);
+  return size;
+}
+
+uint32_t THttpTransport::readContent(uint32_t size) {
+  uint32_t need = size;
+  while (need > 0) {
+    uint32_t avail = httpBufLen_ - httpPos_;
+    if (avail == 0) {
+      // We have given all the data, reset position to head of the buffer
+      httpPos_ = 0;
+      httpBufLen_ = 0;
+      refill();
+
+      // Now have available however much we read
+      avail = httpBufLen_;
+    }
+    uint32_t give = avail;
+    if (need < give) {
+      give = need;
+    }
+    readBuffer_.write((uint8_t*)(httpBuf_ + httpPos_), give);
+    httpPos_ += give;
+    need -= give;
+  }
+  return size;
+}
+
+char* THttpTransport::readLine() {
+  while (true) {
+    char* eol = NULL;
+
+    eol = strstr(httpBuf_ + httpPos_, CRLF);
+
+    // No CRLF yet?
+    if (eol == NULL) {
+      // Shift whatever we have now to front and refill
+      shift();
+      refill();
+    } else {
+      // Return pointer to next line
+      *eol = '\0';
+      char* line = httpBuf_ + httpPos_;
+      httpPos_ = static_cast<uint32_t>((eol - httpBuf_) + CRLF_LEN);
+      return line;
+    }
+  }
+}
+
+void THttpTransport::shift() {
+  if (httpBufLen_ > httpPos_) {
+    // Shift down remaining data and read more
+    uint32_t length = httpBufLen_ - httpPos_;
+    memmove(httpBuf_, httpBuf_ + httpPos_, length);
+    httpBufLen_ = length;
+  } else {
+    httpBufLen_ = 0;
+  }
+  httpPos_ = 0;
+  httpBuf_[httpBufLen_] = '\0';
+}
+
+void THttpTransport::refill() {
+  uint32_t avail = httpBufSize_ - httpBufLen_;
+  if (avail <= (httpBufSize_ / 4)) {
+    httpBufSize_ *= 2;
+    httpBuf_ = (char*)std::realloc(httpBuf_, httpBufSize_ + 1);
+    if (httpBuf_ == NULL) {
+      throw std::bad_alloc();
+    }
+  }
+
+  // Read more data
+  uint32_t got = transport_->read((uint8_t*)(httpBuf_ + httpBufLen_), httpBufSize_ - httpBufLen_);
+  httpBufLen_ += got;
+  httpBuf_[httpBufLen_] = '\0';
+
+  if (got == 0) {
+    throw TTransportException("Could not refill buffer");
+  }
+}
+
+void THttpTransport::readHeaders() {
+  // Initialize headers state variables
+  contentLength_ = 0;
+  chunked_ = false;
+  chunkedDone_ = false;
+  chunkSize_ = 0;
+
+  // Control state flow
+  bool statusLine = true;
+  bool finished = false;
+
+  // Loop until headers are finished
+  while (true) {
+    char* line = readLine();
+
+    if (strlen(line) == 0) {
+      if (finished) {
+        readHeaders_ = false;
+        return;
+      } else {
+        // Must have been an HTTP 100, keep going for another status line
+        statusLine = true;
+      }
+    } else {
+      if (statusLine) {
+        statusLine = false;
+        finished = parseStatusLine(line);
+      } else {
+        parseHeader(line);
+      }
+    }
+  }
+}
+
+void THttpTransport::write(const uint8_t* buf, uint32_t len) {
+  writeBuffer_.write(buf, len);
+}
+
+const std::string THttpTransport::getOrigin() {
+  std::ostringstream oss;
+  if (!origin_.empty()) {
+    oss << origin_ << ", ";
+  }
+  oss << transport_->getOrigin();
+  return oss.str();
+}
+}
+}
+}
diff --git a/be/src/transport/THttpTransport.h b/be/src/transport/THttpTransport.h
new file mode 100644
index 0000000..a9f564c
--- /dev/null
+++ b/be/src/transport/THttpTransport.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_
+#define _THRIFT_TRANSPORT_THTTPTRANSPORT_H_ 1
+
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+/**
+ * HTTP implementation of the thrift transport. This was irritating
+ * to write, but the alternatives in C++ land are daunting. Linking CURL
+ * requires 23 dynamic libraries last time I checked (WTF?!?). All we have
+ * here is a VERY basic HTTP/1.1 client which supports HTTP 100 Continue,
+ * chunked transfer encoding, keepalive, etc. Tested against Apache.
+ */
+class THttpTransport : public TVirtualTransport<THttpTransport> {
+public:
+  THttpTransport(boost::shared_ptr<TTransport> transport);
+
+  virtual ~THttpTransport();
+
+  void open() { transport_->open(); }
+
+  bool isOpen() { return transport_->isOpen(); }
+
+  bool peek() { return transport_->peek(); }
+
+  void close() { transport_->close(); }
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  uint32_t readEnd();
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  virtual void flush() = 0;
+
+  virtual const std::string getOrigin();
+
+protected:
+  boost::shared_ptr<TTransport> transport_;
+  std::string origin_;
+
+  TMemoryBuffer writeBuffer_;
+  TMemoryBuffer readBuffer_;
+
+  bool readHeaders_;
+  bool chunked_;
+  bool chunkedDone_;
+  uint32_t chunkSize_;
+  uint32_t contentLength_;
+
+  char* httpBuf_;
+  uint32_t httpPos_;
+  uint32_t httpBufLen_;
+  uint32_t httpBufSize_;
+
+  virtual void init();
+
+  uint32_t readMoreData();
+  char* readLine();
+
+  void readHeaders();
+  virtual void parseHeader(char* header) = 0;
+  virtual bool parseStatusLine(char* status) = 0;
+
+  uint32_t readChunked();
+  void readChunkedFooters();
+  uint32_t parseChunkSize(char* line);
+
+  uint32_t readContent(uint32_t size);
+
+  void refill();
+  void shift();
+
+  static const char* CRLF;
+  static const int CRLF_LEN;
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_