Posted to commits@impala.apache.org by tm...@apache.org on 2019/05/30 17:29:54 UTC

[impala] 02/04: IMPALA-1653: Don't close hiveserver2 session when connection is closed

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b1cb879577f1e7ea8b6ea1e72c856eee0a582627
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Fri May 10 09:24:48 2019 -0700

    IMPALA-1653: Don't close hiveserver2 session when connection is closed
    
    Currently, when a client connection is closed, we always close any
    session started over that connection. This is a requirement for
    beeswax, which always ties sessions to connections, but it is not
    required for hiveserver2, which allows sessions to be used across
    connections with a session token.
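
    As an illustration only (a minimal sketch, not part of this change;
    it assumes the generated TCLIService Python bindings used by the
    tests below and an impalad HS2 endpoint on localhost:21050), a
    client can open a session over one connection and keep using its
    handle from a second connection:

        from thrift.protocol import TBinaryProtocol
        from thrift.transport import TSocket, TTransport
        from TCLIService import TCLIService

        def connect(host='localhost', port=21050):
          # Open a fresh socket connection to impalad's HS2 endpoint.
          transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
          transport.open()
          protocol = TBinaryProtocol.TBinaryProtocol(transport)
          return transport, TCLIService.Client(protocol)

        transport1, client1 = connect()
        session = client1.OpenSession(TCLIService.TOpenSessionReq()).sessionHandle

        # Drop the first connection without calling CloseSession().
        transport1.close()

        # With this patch the session survives the disconnect (until
        # --disconnected_session_timeout elapses), so a new connection
        # can keep using the same handle.
        transport2, client2 = connect()
        execute_req = TCLIService.TExecuteStatementReq(
            sessionHandle=session, statement="select 1")
        client2.ExecuteStatement(execute_req)

        # Clean up explicitly rather than relying on the timeout.
        close_req = TCLIService.TCloseSessionReq(sessionHandle=session)
        client2.CloseSession(close_req)
        transport2.close()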
    
    This patch changes this behavior so that hiveserver2 sessions are no
    longer closed when the corresponding connection is closed.
    
    One downside of this change is that clients may inadvertently leave
    sessions open indefinitely if they close their connection without
    calling CloseSession(), which can waste space on the coordinator.
    We already have a flag --idle_session_timeout, but this flag is off
    by default and sessions that hit this timeout are expired but not
    fully closed.
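
    An explicit client-side guard remains the cheapest mitigation. As a
    sketch (again assuming the TCLIService bindings, not part of this
    change), wrapping session use in a context manager ensures
    CloseSession() runs even on error paths:

        from contextlib import contextmanager

        @contextmanager
        def hs2_session(client):
          # Always close the session explicitly instead of relying on a
          # server-side timeout to reclaim it.
          resp = client.OpenSession(TCLIService.TOpenSessionReq())
          handle = resp.sessionHandle
          try:
            yield handle
          finally:
            close_req = TCLIService.TCloseSessionReq(sessionHandle=handle)
            client.CloseSession(close_req)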
    
    Rather than changing the default idle session behavior, which could
    affect existing users, this patch mitigates this issue by adding a
    new flag: --disconnected_session_timeout which is set to 1 hour by
    default. When a session has had no open connections for longer than
    this time, it will be closed and any associated queries will be
    unregistered.
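
    For example (illustrative only), a deployment that wants
    disconnected sessions reclaimed after ten minutes would start each
    impalad with:

        --disconnected_session_timeout=600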
    
    Testing:
    - Added e2e tests.
    
    Change-Id: Ia4555cd9b73db5b4dde92cd4fac4f9bfa3664d78
    Reviewed-on: http://gerrit.cloudera.org:8080/13306
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/thrift-server.cc             |   3 +
 be/src/rpc/thrift-server.h              |   3 +
 be/src/service/impala-beeswax-server.cc |  36 ++++-----
 be/src/service/impala-hs2-server.cc     |  29 ++++++--
 be/src/service/impala-server.cc         | 127 +++++++++++++++++++++++---------
 be/src/service/impala-server.h          |  54 +++++++++++---
 common/thrift/generate_error_codes.py   |   3 +
 tests/common/impala_connection.py       |   6 ++
 tests/custom_cluster/test_hs2.py        | 103 ++++++++++++++++++++++++++
 tests/hs2/test_hs2.py                   |  57 +++++++++-----
 10 files changed, 331 insertions(+), 90 deletions(-)

diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 070e665..ccf1aff 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -239,6 +239,9 @@ void ThriftServer::ThriftServerEventProcessor::preServe() {
 // connection state such as the connection identifier and the username.
 __thread ThriftServer::ConnectionContext* __connection_context__;
 
+bool ThriftServer::HasThreadConnectionContext() {
+  return __connection_context__ != nullptr;
+}
 
 const TUniqueId& ThriftServer::GetThreadConnectionId() {
   return __connection_context__->connection_id;
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index e7d861b..4afa040 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -110,6 +110,9 @@ class ThriftServer {
     connection_handler_ = connection;
   }
 
+  /// Returns true if the current thread has a connection context set on it.
+  static bool HasThreadConnectionContext();
+
   /// Returns a unique identifier for the current connection. A connection is
   /// identified with the lifetime of a socket connection to this server.
   /// It is only safe to call this method during a Thrift processor RPC
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 3f0bce0..48bb26c 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -57,7 +57,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
-      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
+      session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId(), &session),
       SQLSTATE_GENERAL_ERROR);
   TQueryCtx query_ctx;
   // raise general error for request conversion error;
@@ -93,7 +93,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
-      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
+      session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId(), &session),
       SQLSTATE_GENERAL_ERROR);
   TQueryCtx query_ctx;
   // raise general error for request conversion error;
@@ -145,7 +145,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
   VLOG_QUERY << "explain(): query=" << query.query;
   RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   TQueryCtx query_ctx;
@@ -162,7 +162,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
 void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle,
     const bool start_over, const int32_t fetch_size) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   if (start_over) {
@@ -188,7 +188,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
 void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
     const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   // Convert QueryHandle to TUniqueId and get the query exec state.
@@ -232,7 +232,7 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
 
 void ImpalaServer::close(const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
@@ -244,7 +244,7 @@ void ImpalaServer::close(const QueryHandle& handle) {
 
 beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
@@ -267,7 +267,7 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
 
 void ImpalaServer::echo(string& echo_string, const string& input_string) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   echo_string = input_string;
 }
@@ -277,7 +277,7 @@ void ImpalaServer::clean(const LogContextId& log_context) {
 
 void ImpalaServer::get_log(string& log, const LogContextId& context) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   // LogContextId is the same as QueryHandle.id
   QueryHandle handle;
@@ -322,7 +322,7 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
 void ImpalaServer::get_default_configuration(vector<ConfigVariable> &configurations,
     const bool include_hadoop) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   configurations.insert(configurations.end(), default_configs_.begin(),
       default_configs_.end());
@@ -330,7 +330,7 @@ void ImpalaServer::get_default_configuration(vector<ConfigVariable> &configurati
 
 void ImpalaServer::dump_config(string& config) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   config = "";
 }
@@ -338,7 +338,7 @@ void ImpalaServer::dump_config(string& config) {
 void ImpalaServer::Cancel(impala::TStatus& tstatus,
     const beeswax::QueryHandle& query_handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   // Convert QueryHandle to TUniqueId and get the query exec state.
   TUniqueId query_id;
@@ -350,7 +350,7 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
 void ImpalaServer::CloseInsert(TInsertResult& insert_result,
     const QueryHandle& query_handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(query_handle, &query_id);
@@ -370,8 +370,8 @@ void ImpalaServer::GetRuntimeProfile(string& profile_output, const QueryHandle&
   const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
   stringstream ss;
   shared_ptr<SessionState> session;
-  RAISE_IF_ERROR(session_handle.WithSession(session_id, &session),
-      SQLSTATE_GENERAL_ERROR);
+  RAISE_IF_ERROR(
+      session_handle.WithBeeswaxSession(session_id, &session), SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     ss << Substitute("Invalid session id: $0", PrintId(session_id));
     RaiseBeeswaxException(ss.str(), SQLSTATE_GENERAL_ERROR);
@@ -393,8 +393,8 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
   ScopedSessionState session_handle(this);
   const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
   shared_ptr<SessionState> session;
-  RAISE_IF_ERROR(session_handle.WithSession(session_id, &session),
-      SQLSTATE_GENERAL_ERROR);
+  RAISE_IF_ERROR(
+      session_handle.WithBeeswaxSession(session_id, &session), SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     stringstream ss;
     ss << Substitute("Invalid session id: $0", PrintId(session_id));
@@ -409,7 +409,7 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
 
 void ImpalaServer::PingImpalaService(TPingImpalaServiceResp& return_val) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
 
   VLOG_RPC << "PingImpalaService()";
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 4233f22..0ad3750 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -87,6 +87,7 @@ const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION =
 DECLARE_string(hostname);
 DECLARE_int32(webserver_port);
 DECLARE_int32(idle_session_timeout);
+DECLARE_int32(disconnected_session_timeout);
 
 namespace impala {
 
@@ -360,16 +361,17 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
       FLAGS_hostname, FLAGS_webserver_port));
   return_val.configuration.insert(make_pair("http_addr", http_addr));
 
-  // Put the session state in session_state_map_
   {
-    lock_guard<mutex> l(session_state_map_lock_);
-    session_state_map_.insert(make_pair(session_id, state));
+    lock_guard<mutex> l(connection_to_sessions_map_lock_);
+    const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
+    connection_to_sessions_map_[connection_id].insert(session_id);
+    state->connections.insert(connection_id);
   }
 
+  // Put the session state in session_state_map_
   {
-    lock_guard<mutex> l(connection_to_sessions_map_lock_);
-    const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
-    connection_to_sessions_map_[connection_id].push_back(session_id);
+    lock_guard<mutex> l(session_state_map_lock_);
+    session_state_map_.insert(make_pair(session_id, state));
   }
 
   ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(1);
@@ -940,5 +942,20 @@ void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val,
   return_val.status.__set_errorMessage("Not implemented");
 }
 
+void ImpalaServer::AddSessionToConnection(
+    const TUniqueId& session_id, SessionState* session) {
+  const TUniqueId& connection_id = ThriftServer::GetThreadConnectionId();
+  {
+    boost::lock_guard<boost::mutex> l(connection_to_sessions_map_lock_);
+    connection_to_sessions_map_[connection_id].insert(session_id);
+  }
 
+  boost::lock_guard<boost::mutex> session_lock(session->lock);
+  if (session->connections.empty()) {
+    // This session was previously disconnected but now has an associated
+    // connection. It should no longer be considered for the disconnected timeout.
+    UnregisterSessionTimeout(FLAGS_disconnected_session_timeout);
+  }
+  session->connections.insert(connection_id);
+}
 }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index f8d25b5..09fa5c7 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -209,6 +209,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
     "before it is cancelled. If 0, idle queries are never expired. The query option "
     "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
     " the maximum allowable timeout.");
+DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a "
+    "hiveserver2 session will be maintained after the last connection that it has been "
+    "used over is disconnected.");
 
 DEFINE_int32(status_report_interval_ms, 5000, "(Advanced) Interval between profile "
     "reports in milliseconds. If set to <= 0, periodic reporting is disabled and only "
@@ -412,8 +415,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   // Initialize a session expiry thread which blocks indefinitely until the first session
   // with non-zero timeout value is opened. Note that a session which doesn't specify any
   // idle session timeout value will use the default value FLAGS_idle_session_timeout.
-  ABORT_IF_ERROR(Thread::Create("impala-server", "session-expirer",
-      bind<void>(&ImpalaServer::ExpireSessions, this), &session_timeout_thread_));
+  ABORT_IF_ERROR(Thread::Create("impala-server", "session-maintenance",
+      bind<void>(&ImpalaServer::SessionMaintenance, this), &session_maintenance_thread_));
 
   ABORT_IF_ERROR(Thread::Create("impala-server", "query-expirer",
       bind<void>(&ImpalaServer::ExpireQueries, this), &query_expiration_thread_));
@@ -1382,7 +1385,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     // TODO: deal with an error status
     discard_result(UnregisterQuery(query_id, false, &status));
   }
-  // Reconfigure the poll period of session_timeout_thread_ if necessary.
+  // Reconfigure the poll period of session_maintenance_thread_ if necessary.
   UnregisterSessionTimeout(session_state->session_timeout);
   VLOG_QUERY << "Closed session: " << PrintId(session_id);
   return Status::OK();
@@ -2046,6 +2049,7 @@ void ImpalaServer::ConnectionStart(
     session_state->network_address = connection_context.network_address;
     session_state->server_default_query_options = &default_query_options_;
     session_state->kudu_latest_observed_ts = 0;
+    session_state->connections.insert(connection_context.connection_id);
 
     // If the username was set by a lower-level transport, use it.
     if (!connection_context.username.empty()) {
@@ -2062,7 +2066,7 @@ void ImpalaServer::ConnectionStart(
     }
     {
       lock_guard<mutex> l(connection_to_sessions_map_lock_);
-      connection_to_sessions_map_[connection_context.connection_id].push_back(session_id);
+      connection_to_sessions_map_[connection_context.connection_id].insert(session_id);
     }
     ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(1L);
   }
@@ -2070,8 +2074,7 @@ void ImpalaServer::ConnectionStart(
 
 void ImpalaServer::ConnectionEnd(
     const ThriftServer::ConnectionContext& connection_context) {
-
-  vector<TUniqueId> sessions_to_close;
+  set<TUniqueId> disconnected_sessions;
   {
     unique_lock<mutex> l(connection_to_sessions_map_lock_);
     ConnectionToSessionMap::iterator it =
@@ -2082,20 +2085,39 @@ void ImpalaServer::ConnectionEnd(
 
     // We don't expect a large number of sessions per connection, so we copy it, so that
     // we can drop the map lock early.
-    sessions_to_close = it->second;
+    disconnected_sessions = std::move(it->second);
     connection_to_sessions_map_.erase(it);
   }
 
+  bool close = connection_context.server_name == BEESWAX_SERVER_NAME
+      || FLAGS_disconnected_session_timeout <= 0;
   LOG(INFO) << "Connection from client "
             << TNetworkAddressToString(connection_context.network_address)
-            << " closed, closing " << sessions_to_close.size()
+            << " to server " << connection_context.server_name << " closed."
+            << (close ? " Closing " : " Disconnecting ") << disconnected_sessions.size()
             << " associated session(s)";
 
-  for (const TUniqueId& session_id: sessions_to_close) {
-    Status status = CloseSessionInternal(session_id, true);
-    if (!status.ok()) {
-      LOG(WARNING) << "Error closing session " << PrintId(session_id) << ": "
-                   << status.GetDetail();
+  if (close) {
+    for (const TUniqueId& session_id : disconnected_sessions) {
+      Status status = CloseSessionInternal(session_id, true);
+      if (!status.ok()) {
+        LOG(WARNING) << "Error closing session " << PrintId(session_id) << ": "
+                     << status.GetDetail();
+      }
+    }
+  } else {
+    DCHECK_EQ(connection_context.server_name, HS2_SERVER_NAME);
+    for (const TUniqueId& session_id : disconnected_sessions) {
+      shared_ptr<SessionState> state;
+      Status status = GetSessionState(session_id, &state);
+      // The session may not exist if it was explicitly closed.
+      if (!status.ok()) continue;
+      lock_guard<mutex> state_lock(state->lock);
+      state->connections.erase(connection_context.connection_id);
+      if (state->connections.empty()) {
+        state->disconnected_ms = UnixMillis();
+        RegisterSessionTimeout(FLAGS_disconnected_session_timeout);
+      }
     }
   }
 }
@@ -2118,55 +2140,88 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
-[[noreturn]] void ImpalaServer::ExpireSessions() {
+[[noreturn]] void ImpalaServer::SessionMaintenance() {
   while (true) {
     {
       unique_lock<mutex> timeout_lock(session_timeout_lock_);
       if (session_timeout_set_.empty()) {
         session_timeout_cv_.Wait(timeout_lock);
       } else {
-        // Sleep for a second before checking whether an active session can be expired.
+        // Sleep for a second before doing maintenance.
         session_timeout_cv_.WaitFor(timeout_lock, MICROS_PER_SEC);
       }
     }
 
     int64_t now = UnixMillis();
     int expired_cnt = 0;
-    VLOG(3) << "Session expiration thread waking up";
+    VLOG(3) << "Session maintenance thread waking up";
     {
       // TODO: If holding session_state_map_lock_ for the duration of this loop is too
       // expensive, consider a priority queue.
       lock_guard<mutex> map_lock(session_state_map_lock_);
-      for (SessionStateMap::value_type& session_state: session_state_map_) {
+      vector<TUniqueId> sessions_to_remove;
+      for (SessionStateMap::value_type& map_entry : session_state_map_) {
+        const TUniqueId& session_id = map_entry.first;
+        std::shared_ptr<SessionState> session_state = map_entry.second;
         unordered_set<TUniqueId> inflight_queries;
+        Status query_cancel_status;
         {
-          lock_guard<mutex> state_lock(session_state.second->lock);
-          if (session_state.second->ref_count > 0) continue;
+          lock_guard<mutex> state_lock(session_state->lock);
+          if (session_state->ref_count > 0) continue;
           // A session closed by other means is in the process of being removed, and it's
           // best not to interfere.
-          if (session_state.second->closed || session_state.second->expired) continue;
-          if (session_state.second->session_timeout == 0) continue;
-
-          int64_t last_accessed_ms = session_state.second->last_accessed_ms;
-          int64_t session_timeout_ms = session_state.second->session_timeout * 1000;
-          if (now - last_accessed_ms <= session_timeout_ms) continue;
-          LOG(INFO) << "Expiring session: " << PrintId(session_state.first) << ", user: "
-                    << session_state.second->connected_user << ", last active: "
-                    << ToStringFromUnixMillis(last_accessed_ms);
-          session_state.second->expired = true;
-          ++expired_cnt;
-          ImpaladMetrics::NUM_SESSIONS_EXPIRED->Increment(1L);
-          // Since expired is true, no more queries will be added to the inflight list.
-          inflight_queries.insert(session_state.second->inflight_queries.begin(),
-              session_state.second->inflight_queries.end());
+          if (session_state->closed) continue;
+
+          if (session_state->connections.size() == 0
+              && (now - session_state->disconnected_ms)
+                  >= FLAGS_disconnected_session_timeout * 1000L) {
+            // This session has no active connections and is past the disconnected session
+            // timeout, so close it.
+            DCHECK_ENUM_EQ(session_state->session_type, TSessionType::HIVESERVER2);
+            LOG(INFO) << "Closing session: " << PrintId(session_id)
+                      << ", user: " << session_state->connected_user
+                      << ", because it no longer has any open connections. The last "
+                      << "connection was closed at: "
+                      << ToStringFromUnixMillis(session_state->disconnected_ms);
+            session_state->closed = true;
+            sessions_to_remove.push_back(session_id);
+            ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L);
+            UnregisterSessionTimeout(FLAGS_disconnected_session_timeout);
+            query_cancel_status =
+                Status::Expected(TErrorCode::DISCONNECTED_SESSION_CLOSED);
+          } else {
+            // Check if the session should be expired.
+            if (session_state->expired || session_state->session_timeout == 0) {
+              continue;
+            }
+
+            int64_t last_accessed_ms = session_state->last_accessed_ms;
+            int64_t session_timeout_ms = session_state->session_timeout * 1000;
+            if (now - last_accessed_ms <= session_timeout_ms) continue;
+            LOG(INFO) << "Expiring session: " << PrintId(session_id)
+                      << ", user: " << session_state->connected_user
+                      << ", last active: " << ToStringFromUnixMillis(last_accessed_ms);
+            session_state->expired = true;
+            ++expired_cnt;
+            ImpaladMetrics::NUM_SESSIONS_EXPIRED->Increment(1L);
+            query_cancel_status = Status::Expected(TErrorCode::INACTIVE_SESSION_EXPIRED);
+          }
+
+          // Since either expired or closed is true no more queries will be added to the
+          // inflight list.
+          inflight_queries.insert(session_state->inflight_queries.begin(),
+              session_state->inflight_queries.end());
         }
         // Unregister all open queries from this session.
-        Status status = Status::Expected(TErrorCode::INACTIVE_SESSION_EXPIRED);
         for (const TUniqueId& query_id : inflight_queries) {
           cancellation_thread_pool_->Offer(
-              CancellationWork::TerminatedByServer(query_id, status, true));
+              CancellationWork::TerminatedByServer(query_id, query_cancel_status, true));
         }
       }
+      // Remove any sessions that were closed from the map.
+      for (const TUniqueId& session_id : sessions_to_remove) {
+        session_state_map_.erase(session_id);
+      }
     }
     LOG_IF(INFO, expired_cnt > 0) << "Expired sessions. Count: " << expired_cnt;
   }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index b23a518..7d99a6e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -512,6 +512,10 @@ class ImpalaServer : public ImpalaServiceIf,
     /// Time the session was last accessed, in ms since epoch (UTC).
     int64_t last_accessed_ms;
 
+    /// If this session has no open connections, this is the time in UTC when the last
+    /// connection was closed.
+    int64_t disconnected_ms;
+
     /// The latest Kudu timestamp observed after DML operations executed within this
     /// session.
     uint64_t kudu_latest_observed_ts;
@@ -527,6 +531,9 @@ class ImpalaServer : public ImpalaServiceIf,
     /// HS2 session, or using the SET command.
     int32_t session_timeout = 0;
 
+    /// The connection ids of any connections that this session has been used over.
+    std::set<TUniqueId> connections;
+
     /// Updates the session timeout based on the query option idle_session_timeout.
     /// It registers/unregisters the session timeout to the Impala server.
     /// The lock must be owned by the caller of this function.
@@ -893,11 +900,15 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Unregister timeout value.
   void UnregisterSessionTimeout(int32_t timeout);
 
-  /// To be run in a thread which wakes up every second. This function checks all
-  /// sessions for their last-idle times. Those that have been idle for longer than
-  /// their configured timeout values are 'expired': they will no longer accept queries
-  /// and any running queries associated with those sessions are unregistered.
-  [[noreturn]] void ExpireSessions();
+  /// To be run in a thread which wakes up every second if there are registered session
+  /// timeouts. This function checks all sessions for:
+  /// - Last-idle times. Those that have been idle for longer than their configured
+  ///   timeout values are 'expired': they will no longer accept queries.
+  /// - Disconnected times. Those that have had no active connections for longer than
+  ///   FLAGS_disconnected_session_timeout are closed: they are removed from the session
+  ///   state map and can no longer be accessed by clients.
+  /// In either case any running queries associated with those sessions are unregistered.
+  [[noreturn]] void SessionMaintenance();
 
   /// Runs forever, walking queries_by_timestamp_ and expiring any queries that have been
   /// idle (i.e. no client input and no time spent processing locally) for
@@ -974,11 +985,11 @@ class ImpalaServer : public ImpalaServiceIf,
   /// avoid blocking the statestore callback.
   boost::scoped_ptr<ThreadPool<CancellationWork>> cancellation_thread_pool_;
 
-  /// Thread that runs ExpireSessions. It will wake up periodically to check for sessions
-  /// which are idle for more their timeout values.
-  std::unique_ptr<Thread> session_timeout_thread_;
+  /// Thread that runs SessionMaintenance. It will wake up periodically to check for
+  /// sessions which have been idle for more than their timeout values.
+  std::unique_ptr<Thread> session_maintenance_thread_;
 
-  /// Contains all the non-zero idle session timeout values.
+  /// Contains all the non-zero idle or disconnected session timeout values.
   std::multiset<int32_t> session_timeout_set_;
 
   /// The lock for protecting the session_timeout_set_.
@@ -1017,6 +1028,24 @@ class ImpalaServer : public ImpalaServiceIf,
       DCHECK(session_.get() == NULL);
       RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
       if (session != NULL) (*session) = session_;
+
+      // We won't have a connection context in the case of ChildQuery, which calls into
+      // hiveserver2 functions directly without going through the Thrift stack.
+      if (ThriftServer::HasThreadConnectionContext()) {
+        // Try adding the session id to the connection's set of sessions in case this is
+        // the first time this session has been used on this connection.
+        impala_->AddSessionToConnection(session_id, session_.get());
+      }
+      return Status::OK();
+    }
+
+    /// Same as WithSession(), except does not update the session/connection mapping, as
+    /// beeswax sessions can only be used over a single connection.
+    Status WithBeeswaxSession(const TUniqueId& session_id,
+        std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
+      DCHECK(session_.get() == NULL);
+      RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
+      if (session != NULL) (*session) = session_;
       return Status::OK();
     }
 
@@ -1054,8 +1083,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// closed when the connection ends. HS2 allows for multiplexing several sessions across
   /// a single connection. If a session has already been closed (only possible via HS2) it
   /// is not removed from this map to avoid the cost of looking it up.
-  typedef boost::unordered_map<TUniqueId, std::vector<TUniqueId>>
-    ConnectionToSessionMap;
+  typedef boost::unordered_map<TUniqueId, std::set<TUniqueId>> ConnectionToSessionMap;
   ConnectionToSessionMap connection_to_sessions_map_;
 
   /// Returns session state for given session_id.
@@ -1075,6 +1103,10 @@ class ImpalaServer : public ImpalaServiceIf,
     session->last_accessed_ms = UnixMillis();
   }
 
+  /// Associate the current connection context with the given session in
+  /// 'connection_to_sessions_map_' and 'SessionState::connections'.
+  void AddSessionToConnection(const TUniqueId& session_id, SessionState* session);
+
   /// Protects query_locations_. Not held in conjunction with other locks.
   boost::mutex query_locations_lock_;
 
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 9e76be5..cc1a38f 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -408,6 +408,9 @@ error_codes = (
   ("PARQUET_DATE_OUT_OF_RANGE", 134,
    "Parquet file '$0' column '$1' contains an out of range date. "
    "The valid date range is 0000-01-01..9999-12-31."),
+
+  ("DISCONNECTED_SESSION_CLOSED", 135,
+   "Session closed because it has no active connections"),
 )
 
 import sys
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 2b10bd1..b05478f 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -278,6 +278,12 @@ class ImpylaHS2Connection(ImpalaConnection):
 
   def close(self):
     LOG.info("-- closing connection to: {0}".format(self.__host_port))
+    try:
+      # Explicitly close the cursor so that it will close the session.
+      self.__cursor.close()
+    except Exception, e:
+      # The session may no longer be valid if the impalad was restarted during the test.
+      LOG.exception(e)
     self.__impyla_conn.close()
 
   def close_query(self, operation_handle):
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
new file mode 100644
index 0000000..9b641de
--- /dev/null
+++ b/tests/custom_cluster/test_hs2.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+import pytest
+
+from tests.hs2.hs2_test_suite import HS2TestSuite, operation_id_to_query_id
+from time import sleep
+from TCLIService import TCLIService
+
+
+class TestHS2(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('These tests only run in exhaustive')
+    super(TestHS2, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--disconnected_session_timeout=1")
+  def test_disconnected_session_timeout(self):
+    """Test that a session gets closed if it has no active connections for more than
+    disconnected_session_timeout."""
+    conn = HS2TestSuite()
+    conn.setup()
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_resp = conn.hs2_client.OpenSession(open_session_req)
+    HS2TestSuite.check_response(open_session_resp)
+    conn.session_handle = open_session_resp.sessionHandle
+    # Run a query, which should succeed.
+    conn.execute_statement("select 1")
+
+    # Set up another connection and run a long-running query with the same session.
+    conn2 = HS2TestSuite()
+    conn2.setup()
+    conn2.session_handle = open_session_resp.sessionHandle
+    execute_resp = conn2.execute_statement("select sleep(10000)")
+
+    # Close one connection and wait for longer than disconnected_session_timeout. The
+    # session should still be available since there's still one active connection.
+    conn2.teardown()
+    sleep(5)
+    conn.execute_statement("select 3")
+
+    # Close the other connection and sleep again. The session should now be closed.
+    conn.teardown()
+    sleep(5)
+    conn.setup()
+
+    # Run another query, which should fail since the session is closed.
+    conn.execute_statement("select 2", expected_error_prefix="Invalid session id",
+        expected_status_code=TCLIService.TStatusCode.ERROR_STATUS)
+
+    # Check that the query was cancelled correctly.
+    query_id = operation_id_to_query_id(execute_resp.operationHandle.operationId)
+    status = self.cluster.get_first_impalad().service.get_query_status(query_id)
+    assert status == "Session closed because it has no active connections"
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      "--idle_session_timeout=1 --disconnected_session_timeout=5")
+  def test_expire_disconnected_session(self):
+    """Test for the interaction between idle_session_timeout and
+    disconnected_session_timeout"""
+    # Close the default test clients so that they don't expire while the test is running
+    # and affect the metric values.
+    self.client.close()
+    self.hs2_client.close()
+    impalad = self.cluster.get_first_impalad()
+
+    conn = HS2TestSuite()
+    conn.setup()
+    # Open a session and then close the connection.
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_resp = conn.hs2_client.OpenSession(open_session_req)
+    HS2TestSuite.check_response(open_session_resp)
+    conn.teardown()
+
+    # The idle session timeout should be hit first, so the session will be expired.
+    impalad.service.wait_for_metric_value(
+        "impala-server.num-sessions-expired", 1)
+    # The session should eventually be closed by the disconnected timeout.
+    impalad.service.wait_for_metric_value(
+        "impala-server.num-open-hiveserver2-sessions", 0)
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index cec1d9e..9fa92af 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -245,6 +245,12 @@ class TestHS2(HS2TestSuite):
       assert num_expired_sessions == self.impalad_test_service.get_metric_value(
           "impala-server.num-sessions-expired")
 
+    # Close the remaining sessions.
+    for session_handle in session_handles:
+      if session_handle is not None:
+        close_session_req = TCLIService.TCloseSessionReq(session_handle)
+        TestHS2.check_response(self.hs2_client.CloseSession(close_session_req))
+
   @needs_session()
   def test_get_operation_status(self):
     """Tests that GetOperationStatus returns a valid result for a running query"""
@@ -347,23 +353,6 @@ class TestHS2(HS2TestSuite):
     assert err_msg in get_result_set_metadata_resp.status.errorMessage
 
   @pytest.mark.execute_serially
-  def test_socket_close_forces_session_close(self):
-    """Test that closing the underlying socket forces the associated session to close.
-    See IMPALA-564"""
-    open_session_req = TCLIService.TOpenSessionReq()
-    resp = self.hs2_client.OpenSession(open_session_req)
-    TestHS2.check_response(resp)
-    num_sessions = self.impalad_test_service.get_metric_value(
-        "impala-server.num-open-hiveserver2-sessions")
-
-    assert num_sessions > 0
-
-    self.socket.close()
-    self.socket = None
-    self.impalad_test_service.wait_for_metric_value(
-        "impala-server.num-open-hiveserver2-sessions", num_sessions - 1)
-
-  @pytest.mark.execute_serially
   def test_multiple_sessions(self):
     """Test that multiple sessions on the same socket connection are allowed"""
     num_sessions = self.impalad_test_service.get_metric_value(
@@ -380,8 +369,11 @@ class TestHS2(HS2TestSuite):
     self.impalad_test_service.wait_for_metric_value(
         "impala-server.num-open-hiveserver2-sessions", num_sessions + 5)
 
-    self.socket.close()
-    self.socket = None
+    for session_id in session_ids:
+      close_session_req = TCLIService.TCloseSessionReq(session_id)
+      resp = self.hs2_client.CloseSession(close_session_req)
+      TestHS2.check_response(resp)
+
     self.impalad_test_service.wait_for_metric_value(
         "impala-server.num-open-hiveserver2-sessions", num_sessions)
 
@@ -616,3 +608,30 @@ class TestHS2(HS2TestSuite):
     fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
     TestHS2.check_response(fetch_results_resp)
     return fetch_results_resp
+
+  def test_close_connection(self):
+    """Tests that an hs2 session remains valid even after the connection is dropped."""
+    open_session_req = TCLIService.TOpenSessionReq()
+    open_session_resp = self.hs2_client.OpenSession(open_session_req)
+    TestHS2.check_response(open_session_resp)
+    self.session_handle = open_session_resp.sessionHandle
+    # Run a query, which should succeed.
+    self.execute_statement("select 1")
+
+    # Reset the connection.
+    self.teardown()
+    self.setup()
+
+    # Run another query with the same session handle. It should succeed even though it's
+    # on a new connection, since disconnected_session_timeout (default of 1 hour) will not
+    # have been hit.
+    self.execute_statement("select 2")
+
+    # Close the session.
+    close_session_req = TCLIService.TCloseSessionReq()
+    close_session_req.sessionHandle = self.session_handle
+    TestHS2.check_response(self.hs2_client.CloseSession(close_session_req))
+
+    # Run another query, which should fail since the session is closed.
+    self.execute_statement("select 3", expected_error_prefix="Invalid session id",
+        expected_status_code=TCLIService.TStatusCode.ERROR_STATUS)