Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/21 04:35:27 UTC

[impala] 03/04: IMPALA-7802: Close connections of idle client sessions

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 67cd55e0445d037d834ecd120d08b0948b064491
Author: Michael Ho <kw...@cloudera.com>
AuthorDate: Thu Jun 6 13:32:03 2019 -0700

    IMPALA-7802: Close connections of idle client sessions
    
    Previously, if an idle session timeout was set either via a
    startup flag or query options, a client session would expire
    after that period of inactivity. However, the network
    connection and the service thread of an expired session would
    stick around until the session was closed by the client. This
    is highly undesirable, as idle sessions still count towards
    the quota bound by --fe_service_threads: once the total number
    of sessions (including idle ones) reaches that upper bound,
    all new incoming sessions block until some existing sessions
    exit, and there is no bound on when those expired sessions
    will be closed. In effect, leaving many idle sessions open is
    a denial-of-service attack on Impala.
    
    This change implements support for closing expired client sessions.
    In particular, a new flag --idle_client_poll_period_s is added to
    specify a time interval, in seconds, of client inactivity after
    which the service thread of a client connection wakes up and checks
    whether all sessions associated with the connection are idle. If
    so, the connection is closed, freeing up the service thread without
    waiting for the client to close the connection. (A standalone
    sketch of this polling pattern follows the diffstat below.)
    
    Testing done:
    - core build
    - new targeted test which verifies the connections of expired sessions
    are closed.
    - verified the flags function as expected in a secure cluster with Kerberos + SSL
    
    Change-Id: I97c4fb8e1b741add273f8a913fb0967303683e38
    Reviewed-on: http://gerrit.cloudera.org:8080/13607
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/TAcceptQueueServer.cpp               |  63 ++++++++++++-
 be/src/rpc/TAcceptQueueServer.h                 |   8 +-
 be/src/rpc/thrift-server.cc                     |  71 +++------------
 be/src/rpc/thrift-server.h                      | 115 +++++++++++++++++++++---
 be/src/rpc/thrift-util.cc                       |  19 ++--
 be/src/rpc/thrift-util.h                        |   7 +-
 be/src/runtime/client-cache.h                   |   4 +-
 be/src/service/impala-server.cc                 |  57 +++++++++++-
 be/src/service/impala-server.h                  |   5 ++
 tests/custom_cluster/test_hs2.py                |   4 +-
 tests/custom_cluster/test_session_expiration.py |  70 +++++++++++++--
 11 files changed, 329 insertions(+), 94 deletions(-)
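
The mechanism reduces to: set a receive timeout on the client socket, peek for incoming bytes, and treat a timeout as a cue to check session idleness rather than as an error. The following standalone C++ sketch illustrates that pattern with plain POSIX sockets. It is a minimal sketch, not Impala's code: IsConnectionIdle() is a hypothetical stand-in for the IsIdleContext() check this commit adds.

#include <cerrno>
#include <sys/socket.h>
#include <sys/time.h>

// Hypothetical stand-in for ThriftServerEventProcessor::IsIdleContext():
// returns true iff every session on this connection has expired.
static bool IsConnectionIdle() { return false; }

// Blocks until the client sends bytes, waking every poll_period_ms to
// re-check idleness. Returns true if bytes are pending; false if the
// connection should be closed (EOF, socket error, or all sessions idle).
static bool WaitForClientBytes(int fd, int poll_period_ms) {
  timeval tv;
  tv.tv_sec = poll_period_ms / 1000;
  tv.tv_usec = (poll_period_ms % 1000) * 1000;
  if (poll_period_ms > 0) setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
  bool bytes_pending = true;
  for (;;) {
    char byte;
    // MSG_PEEK mirrors TTransport::peek(): the byte stays in the buffer.
    ssize_t n = recv(fd, &byte, 1, MSG_PEEK);
    if (n > 0) break;                                  // Client activity.
    if (n == 0) { bytes_pending = false; break; }      // EOF from client.
    if ((errno == EAGAIN || errno == EWOULDBLOCK) && poll_period_ms > 0) {
      // The receive timeout fired; close only if all sessions are idle.
      if (IsConnectionIdle()) { bytes_pending = false; break; }
      continue;                                        // Keep waiting.
    }
    bytes_pending = false;                             // Other error: close.
    break;
  }
  // Unset the timeout so subsequent RPC reads block indefinitely again.
  timeval zero = {0, 0};
  if (poll_period_ms > 0) setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &zero, sizeof(zero));
  return bytes_pending;
}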

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 2a77662..09d7954 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -23,8 +23,11 @@
 #include "rpc/TAcceptQueueServer.h"
 
 #include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/transport/TSocket.h>
 
 #include "util/metrics.h"
+#include "rpc/thrift-util.h"
+#include "rpc/thrift-server.h"
 #include "util/thread-pool.h"
 
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
@@ -72,8 +75,10 @@ class TAcceptQueueServer::Task : public Runnable {
         if (eventHandler != nullptr) {
           eventHandler->processContext(connectionContext, transport_);
         }
-        if (!processor_->process(input_, output_, connectionContext)
-            || !input_->getTransport()->peek()) {
+        // Setting a socket timeout for process() may lead to false positives
+        // and prematurely close a slow client's connection.
+        if (!processor_->process(input_, output_, connectionContext) ||
+            !Peek(input_, connectionContext, eventHandler)) {
           break;
         }
       }
@@ -114,6 +119,56 @@ class TAcceptQueueServer::Task : public Runnable {
   }
 
  private:
+
+  // This function blocks until some bytes show up from the client.
+  // Returns true if some bytes are available from the client;
+  // returns false upon reading EOF, in which case the connection
+  // will be closed by the caller.
+  //
+  // If idle_poll_period_ms_ is not 0, this function will block up
+  // to idle_poll_period_ms_ milliseconds before waking up to check
+  // if the sessions associated with the connection have all expired
+  // due to inactivity. If so, it will return false and the connection
+  // will be closed by the caller.
+  bool Peek(shared_ptr<TProtocol> input, void* connectionContext,
+      boost::shared_ptr<TServerEventHandler> eventHandler) {
+    // Set a timeout on the input socket if idle_poll_period_ms_ is non-zero.
+    TSocket* socket = static_cast<TSocket*>(transport_.get());
+    if (server_.idle_poll_period_ms_ > 0) {
+      socket->setRecvTimeout(server_.idle_poll_period_ms_);
+    }
+
+    // Block until some bytes show up or EOF or timeout.
+    bool bytes_pending = true;
+    for (;;) {
+      try {
+        bytes_pending = input_->getTransport()->peek();
+        break;
+      } catch (const TTransportException& ttx) {
+        // Implementation of the underlying transport's peek() may call either
+        // read() or peek() of the socket.
+        if (eventHandler != nullptr && server_.idle_poll_period_ms_ > 0 &&
+            (IsReadTimeoutTException(ttx) || IsPeekTimeoutTException(ttx))) {
+          ThriftServer::ThriftServerEventProcessor* thriftServerHandler =
+              static_cast<ThriftServer::ThriftServerEventProcessor*>(eventHandler.get());
+          if (thriftServerHandler->IsIdleContext(connectionContext)) {
+            const string& client = socket->getSocketInfo();
+            GlobalOutput.printf(
+               "TAcceptQueueServer closing connection to idle client %s", client.c_str());
+            bytes_pending = false;
+            break;
+          }
+        } else {
+          // Rethrow the exception to be handled by callers.
+          throw;
+        }
+      }
+    }
+    // Unset the socket timeout.
+    if (server_.idle_poll_period_ms_ > 0) socket->setRecvTimeout(0);
+    return bytes_pending;
+  }
+
   TAcceptQueueServer& server_;
   friend class TAcceptQueueServer;
 
@@ -128,10 +183,10 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<TProcessor>& proc
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
     const boost::shared_ptr<ThreadFactory>& threadFactory, const string& name,
-    int32_t maxTasks, int64_t timeout_ms)
+    int32_t maxTasks, int64_t queue_timeout_ms, int64_t idle_poll_period_ms)
     : TServer(processor, serverTransport, transportFactory, protocolFactory),
       threadFactory_(threadFactory), name_(name), maxTasks_(maxTasks),
-      queue_timeout_ms_(timeout_ms) {
+      queue_timeout_ms_(queue_timeout_ms), idle_poll_period_ms_(idle_poll_period_ms) {
   init();
 }
 
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 8f16add..08a7244 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -62,7 +62,8 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
-      const std::string& name, int32_t maxTasks = 0, int64_t timeout_ms = 0);
+      const std::string& name, int32_t maxTasks = 0,
+      int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0);
 
   ~TAcceptQueueServer() override = default;
 
@@ -116,6 +117,11 @@ class TAcceptQueueServer : public TServer {
   /// Amount of time in milliseconds after which a connection request will be timed out.
   /// Default value is 0, which means no timeout.
   int64_t queue_timeout_ms_;
+
+  /// Amount of time, in milliseconds, of client inactivity before the service thread
+  /// wakes up to check if the connection should be closed due to inactivity. If 0, no
+  /// polling happens.
+  int64_t idle_poll_period_ms_;
 };
 
 } // namespace server
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index a859c9a..c1ce270 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -106,60 +106,6 @@ bool SSLProtoVersions::IsSupported(const SSLProtocol& protocol) {
   }
 }
 
-// Helper class that starts a server in a separate thread, and handles
-// the inter-thread communication to monitor whether it started
-// correctly.
-class ThriftServer::ThriftServerEventProcessor : public TServerEventHandler {
- public:
-  ThriftServerEventProcessor(ThriftServer* thrift_server)
-      : thrift_server_(thrift_server),
-        signal_fired_(false) { }
-
-  // Called by the Thrift server implementation when it has acquired its resources and is
-  // ready to serve, and signals to StartAndWaitForServer that start-up is finished. From
-  // TServerEventHandler.
-  virtual void preServe();
-
-  // Called when a client connects; we create per-client state and call any
-  // ConnectionHandlerIf handler.
-  virtual void* createContext(boost::shared_ptr<TProtocol> input,
-      boost::shared_ptr<TProtocol> output);
-
-  // Called when a client starts an RPC; we set the thread-local connection context.
-  virtual void processContext(void* context, boost::shared_ptr<TTransport> output);
-
-  // Called when a client disconnects; we call any ConnectionHandlerIf handler.
-  virtual void deleteContext(void* serverContext, boost::shared_ptr<TProtocol> input,
-      boost::shared_ptr<TProtocol> output);
-
-  // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
-  // correctly.
-  Status StartAndWaitForServer();
-
- private:
-  // Lock used to ensure that there are no missed notifications between starting the
-  // supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
-  // thread-safe access to members of thrift_server_
-  boost::mutex signal_lock_;
-
-  // Condition variable that is notified by the supervision thread once either
-  // a) all is well or b) an error occurred.
-  ConditionVariable signal_cond_;
-
-  // The ThriftServer under management. This class is a friend of ThriftServer, and
-  // reaches in to change member variables at will.
-  ThriftServer* thrift_server_;
-
-  // Guards against spurious condition variable wakeups
-  bool signal_fired_;
-
-  // The time, in milliseconds, to wait for a server to come up
-  static const int TIMEOUT_MS = 2500;
-
-  // Called in a separate thread
-  void Supervise();
-};
-
 Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
   // Locking here protects against missed notifications if Supervise executes quickly
   unique_lock<mutex> lock(signal_lock_);
@@ -310,9 +256,17 @@ void ThriftServer::ThriftServerEventProcessor::processContext(void* context,
   __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
 }
 
-void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext,
+bool ThriftServer::ThriftServerEventProcessor::IsIdleContext(void* context) {
+  __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
+  if (thrift_server_->connection_handler_ != nullptr) {
+    return thrift_server_->connection_handler_->IsIdleConnection(*__connection_context__);
+  }
+  return false;
+}
+
+void ThriftServer::ThriftServerEventProcessor::deleteContext(void* context,
     boost::shared_ptr<TProtocol> input, boost::shared_ptr<TProtocol> output) {
-  __connection_context__ = (ConnectionContext*) serverContext;
+  __connection_context__ = reinterpret_cast<ConnectionContext*>(context);
 
   if (thrift_server_->connection_handler_ != NULL) {
     thrift_server_->connection_handler_->ConnectionEnd(*__connection_context__);
@@ -331,12 +285,13 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
     MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms,
-    TransportType transport_type)
+    int64_t idle_poll_period_ms, TransportType transport_type)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
     max_concurrent_connections_(max_concurrent_connections),
     queue_timeout_ms_(queue_timeout_ms),
+    idle_poll_period_ms_(idle_poll_period_ms),
     name_(name),
     metrics_name_(Substitute("impala.thrift-server.$0", name_)),
     server_(NULL),
@@ -497,7 +452,7 @@ Status ThriftServer::Start() {
 
   server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
       protocol_factory, thread_factory, name_, max_concurrent_connections_,
-      queue_timeout_ms_));
+      queue_timeout_ms_, idle_poll_period_ms_));
   if (metrics_ != NULL) {
     (static_cast<TAcceptQueueServer*>(server_.get()))
         ->InitMetrics(metrics_, metrics_name_);
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index f208a10..bf9a78b 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -32,6 +32,17 @@
 #include "util/metrics-fwd.h"
 #include "util/thread.h"
 
+namespace apache {
+namespace thrift {
+namespace protocol {
+class TProtocol;
+}
+namespace server {
+class TAcceptQueueServer;
+}
+}
+}
+
 namespace impala {
 
 class AuthProvider;
@@ -99,6 +110,11 @@ class ThriftServer {
     /// valid and clients must not refer to it again.
     virtual void ConnectionEnd(const ConnectionContext& connection_context) = 0;
 
+    /// Returns true if the connection is considered idle. A connection is considered
+    /// idle if all the sessions associated with it have expired due to idle timeout.
+    /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
+    virtual bool IsIdleConnection(const ConnectionContext& connection_context) = 0;
+
     virtual ~ConnectionHandlerIf() = default;
   };
 
@@ -147,6 +163,74 @@ class ThriftServer {
 
  private:
   friend class ThriftServerBuilder;
+  friend class apache::thrift::server::TAcceptQueueServer;
+
+  /// Helper class which monitors starting servers. Needs access to internal members, and
+  /// is not used outside of this class.
+  friend class ThriftServerEventProcessor;
+
+  /// Helper class that starts a server in a separate thread, and handles
+  /// the inter-thread communication to monitor whether it started
+  /// correctly.
+  class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler {
+   public:
+    ThriftServerEventProcessor(ThriftServer* thrift_server)
+      : thrift_server_(thrift_server),
+        signal_fired_(false) { }
+
+    /// Called by the Thrift server implementation when it has acquired its resources and
+    /// is ready to serve, and signals to StartAndWaitForServer that start-up is finished.
+    /// From TServerEventHandler.
+    virtual void preServe();
+
+    /// Called when a client connects; we create per-client state and call any
+    /// ConnectionHandlerIf handler.
+    virtual void* createContext(
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> input,
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> output);
+
+    /// Called when a client starts an RPC; we set the thread-local connection context.
+    virtual void processContext(void* context,
+        boost::shared_ptr<apache::thrift::transport::TTransport> output);
+
+    /// Called when a client disconnects; we call any ConnectionHandlerIf handler.
+    virtual void deleteContext(void* context,
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> input,
+        boost::shared_ptr<apache::thrift::protocol::TProtocol> output);
+
+    /// Returns true if a client's connection is idle. A client's connection is idle iff
+    /// all the sessions associated with it have expired due to idle timeout. Called from
+    /// TAcceptQueueServer::Task::run() after clients have been inactive for
+    /// --idle_client_poll_period_s seconds.
+    bool IsIdleContext(void* context);
+
+    /// Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
+    /// correctly.
+    Status StartAndWaitForServer();
+
+   private:
+    /// Lock used to ensure that there are no missed notifications between starting the
+    /// supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
+    /// thread-safe access to members of thrift_server_
+    boost::mutex signal_lock_;
+
+    /// Condition variable that is notified by the supervision thread once either
+    /// a) all is well or b) an error occurred.
+    ConditionVariable signal_cond_;
+
+    /// The ThriftServer under management. This class is a friend of ThriftServer, and
+    /// reaches in to change member variables at will.
+    ThriftServer* thrift_server_;
+
+    /// Guards against spurious condition variable wakeups
+    bool signal_fired_;
+
+    /// The time, in milliseconds, to wait for a server to come up
+    static const int TIMEOUT_MS = 2500;
+
+    /// Called in a separate thread
+    void Supervise();
+  };
 
   /// Creates, but does not start, a new server on the specified port
   /// that exports the supplied interface.
@@ -158,13 +242,17 @@ class ThriftServer {
   ///  - metrics: if not nullptr, the server will register metrics on this object
   ///  - max_concurrent_connections: The maximum number of concurrent connections allowed.
   ///    If 0, there will be no enforced limit on the number of concurrent connections.
-  ///  - amount of time in milliseconds an accepted client connection will be held in
-  ///    the accepted queue, after which the request will be rejected if a server
-  ///    thread can't be found. If 0, no timeout is enforced.
+  ///  - queue_timeout_ms: amount of time in milliseconds an accepted client connection
+  ///    will be held in the accepted queue, after which the request will be rejected if
+  ///    a service thread can't be found. If 0, no timeout is enforced.
+  ///  - idle_poll_period_ms: Amount of time, in milliseconds, of client inactivity
+  ///    before the service thread wakes up to check if the connection should be closed
+  ///    due to inactivity. If 0, no polling happens.
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
       int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
+      int64_t idle_poll_period_ms = 0,
       TransportType server_transport = TransportType::BINARY);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
@@ -218,6 +306,11 @@ class ThriftServer {
   /// Used in TAcceptQueueServer.
   int64_t queue_timeout_ms_;
 
+  /// Amount of time, in milliseconds, of client inactivity before the service thread
+  /// wakes up to check if the connection should be closed due to inactivity. If 0, no
+  /// polling happens.
+  int64_t idle_poll_period_ms_;
+
   /// User-specified identifier that shows up in logs
   const std::string name_;
 
@@ -263,11 +356,6 @@ class ThriftServer {
 
   /// Underlying transport type used by this thrift server.
   TransportType transport_type_;
-
-  /// Helper class which monitors starting servers. Needs access to internal members, and
-  /// is not used outside of this class.
-  class ThriftServerEventProcessor;
-  friend class ThriftServerEventProcessor;
 };
 
 /// Helper class to build new ThriftServer instances.
@@ -297,11 +385,16 @@ class ThriftServerBuilder {
     return *this;
   }
 
-  ThriftServerBuilder& queue_timeout(int64_t timeout_ms) {
+  ThriftServerBuilder& queue_timeout_ms(int64_t timeout_ms) {
     queue_timeout_ms_ = timeout_ms;
     return *this;
   }
 
+  ThriftServerBuilder& idle_poll_period_ms(int64_t timeout_ms) {
+    idle_poll_period_ms_ = timeout_ms;
+    return *this;
+  }
+
   /// Enables SSL for this server.
   ThriftServerBuilder& ssl(
       const std::string& certificate, const std::string& private_key) {
@@ -344,7 +437,8 @@ class ThriftServerBuilder {
   Status Build(ThriftServer** server) {
     std::unique_ptr<ThriftServer> ptr(
         new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
-            max_concurrent_connections_, queue_timeout_ms_, server_transport_type_));
+            max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_,
+            server_transport_type_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -355,6 +449,7 @@ class ThriftServerBuilder {
 
  private:
   int64_t queue_timeout_ms_ = 0;
+  int64_t idle_poll_period_ms_ = 0;
   int max_concurrent_connections_ = 0;
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
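
The ThriftServerEventProcessor moved into this header implements a common startup handshake: the thread calling StartAndWaitForServer() blocks on a condition variable until the supervision thread signals success or a timeout expires, with signal_fired_ guarding against spurious wakeups. A minimal sketch of the same handshake, assuming only the standard library (names are illustrative, not Impala's):

#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical cut-down version of the StartAndWaitForServer()/preServe()
// handshake: the server thread signals once it is ready to serve, and the
// starting thread waits for that signal with a timeout.
class StartupSignal {
 public:
  // Called by the server thread once it is ready (cf. preServe()).
  void SignalStarted() {
    std::lock_guard<std::mutex> lock(mu_);
    fired_ = true;
    cv_.notify_all();
  }

  // Called by the starting thread; returns true if startup was signalled
  // within timeout_ms milliseconds (cf. TIMEOUT_MS = 2500).
  bool WaitForStartup(int timeout_ms) {
    std::unique_lock<std::mutex> lock(mu_);
    return cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                        [this] { return fired_; });
  }

 private:
  std::mutex mu_;                 // Prevents missed notifications.
  std::condition_variable cv_;
  bool fired_ = false;            // Guards against spurious wakeups.
};
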
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 2c3ebc4..34384a0 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -58,9 +58,10 @@ using namespace apache::thrift::server;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::concurrency;
 
-// IsRecvTimeoutTException() and IsConnResetTException() make assumption about the
-// implementation of read(), write() and write_partial() in TSocket.cpp and those
-// functions may change between different versions of Thrift.
+// IsReadTimeoutTException(), IsPeekTimeoutTException() and IsConnResetTException() make
+// assumptions about the implementation of read(), peek(), write() and write_partial() in
+// TSocket.cpp and TSSLSocket.cpp. Those functions may change between different versions
+// of Thrift.
 static_assert(PACKAGE_VERSION[0] == '0', "");
 static_assert(PACKAGE_VERSION[1] == '.', "");
 static_assert(PACKAGE_VERSION[2] == '9', "");
@@ -158,14 +159,22 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress&
   return false;
 }
 
-bool IsRecvTimeoutTException(const TTransportException& e) {
-  // String taken from TSocket::read() Thrift's TSocket.cpp.
+bool IsReadTimeoutTException(const TTransportException& e) {
+  // String taken from TSocket::read() in Thrift's TSocket.cpp and TSSLSocket.cpp.
   return (e.getType() == TTransportException::TIMED_OUT &&
              strstr(e.what(), "EAGAIN (timed out)") != nullptr) ||
          (e.getType() == TTransportException::INTERNAL_ERROR &&
              strstr(e.what(), "SSL_read: Resource temporarily unavailable") != nullptr);
 }
 
+bool IsPeekTimeoutTException(const TTransportException& e) {
+  // String taken from TSocket::peek() in Thrift's TSocket.cpp and TSSLSocket.cpp.
+  return (e.getType() == TTransportException::UNKNOWN &&
+             strstr(e.what(), "recv(): Resource temporarily unavailable") != nullptr) ||
+         (e.getType() == TTransportException::INTERNAL_ERROR &&
+             strstr(e.what(), "SSL_peek: Resource temporarily unavailable") != nullptr);
+}
+
 bool IsConnResetTException(const TTransportException& e) {
   // Strings taken from TTransport::readAll(). This happens iff TSocket::read() returns 0.
   // As readAll() is reading non-zero length payload, this can only mean recv() called
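
These classifiers must pattern-match exception text because Thrift folds the underlying errno into a message string (hence the static_asserts above pinning the Thrift version). At the raw-socket level the distinction is just an errno; the hedged sketch below shows the equivalent classification with plain POSIX recv(), and is not Impala's code.

#include <cerrno>
#include <cstddef>
#include <sys/socket.h>

// Hypothetical errno-level analogue of IsReadTimeoutTException() /
// IsConnResetTException(): a recv() timeout surfaces as EAGAIN/EWOULDBLOCK
// when SO_RCVTIMEO is set, and a reset peer as ECONNRESET (or recv()
// returning 0 on an orderly shutdown).
enum class RecvOutcome { kData, kTimeout, kPeerClosed, kError };

RecvOutcome ClassifyRecv(int fd, char* buf, size_t len) {
  ssize_t n = recv(fd, buf, len, 0);
  if (n > 0) return RecvOutcome::kData;
  if (n == 0) return RecvOutcome::kPeerClosed;  // Orderly EOF.
  if (errno == EAGAIN || errno == EWOULDBLOCK) {
    return RecvOutcome::kTimeout;               // SO_RCVTIMEO expired.
  }
  if (errno == ECONNRESET) return RecvOutcome::kPeerClosed;  // RST from peer.
  return RecvOutcome::kError;
}
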
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index bd15491..05e7e55 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -145,8 +145,11 @@ void PrintTColumnValue(std::ostream& out, const TColumnValue& colval);
 /// string representation
 bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& b);
 
-/// Returns true if the TTransportException corresponds to a TCP socket recv timeout.
-bool IsRecvTimeoutTException(const apache::thrift::transport::TTransportException& e);
+/// Returns true if the TTransportException corresponds to a TCP socket read timeout.
+bool IsReadTimeoutTException(const apache::thrift::transport::TTransportException& e);
+
+/// Returns true if the TTransportException corresponds to a TCP socket peek timeout.
+bool IsPeekTimeoutTException(const apache::thrift::transport::TTransportException& e);
 
 /// Returns true if the exception indicates the other end of the TCP socket was closed.
 bool IsConnResetTException(const apache::thrift::transport::TTransportException& e);
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 0b164f1..c06b979 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -241,7 +241,7 @@ class ClientConnection {
     try {
       (client_->*f)(*response, request, &send_done);
     } catch (const apache::thrift::transport::TTransportException& e) {
-      if (send_done && IsRecvTimeoutTException(e)) {
+      if (send_done && IsReadTimeoutTException(e)) {
         return RecvTimeoutStatus(typeid(*response).name());
       }
 
@@ -310,7 +310,7 @@ class ClientConnection {
     try {
       (client_->*recv_func)(*response);
     } catch (const apache::thrift::transport::TTransportException& e) {
-      if (IsRecvTimeoutTException(e)) {
+      if (IsReadTimeoutTException(e)) {
         return RecvTimeoutStatus(typeid(*response).name());
       }
       // If it's not timeout exception, then the connection is broken, stop retrying.
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index bf040f9..d7ee0df 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -30,6 +30,7 @@
 #include <boost/lexical_cast.hpp>
 #include <gperftools/malloc_extension.h>
 #include <gutil/strings/substitute.h>
+#include <gutil/walltime.h>
 #include <openssl/evp.h>
 #include <openssl/err.h>
 #include <rapidjson/rapidjson.h>
@@ -217,7 +218,12 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
 DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a "
     "hiveserver2 session will be maintained after the last connection that it has been "
     "used over is disconnected.");
-
+DEFINE_int32(idle_client_poll_period_s, 30, "The period, in seconds, of client "
+    "inactivity after which an Impala service thread (beeswax and HS2) wakes up to "
+    "check if the connection should be closed. If --idle_session_timeout is also set, "
+    "a client connection is closed once all the sessions associated with it have "
+    "become idle. Set this to 0 to disable polling; connections will then remain open "
+    "until explicitly closed by the client.");
 DEFINE_int32(status_report_interval_ms, 5000, "(Advanced) Interval between profile "
     "reports in milliseconds. If set to <= 0, periodic reporting is disabled and only "
     "the final report is sent.");
@@ -2058,6 +2064,10 @@ void ImpalaServer::ConnectionEnd(
     // Not every connection must have an associated session
     if (it == connection_to_sessions_map_.end()) return;
 
+    // Sessions are not removed from the map even after they are closed and an entry
+    // won't be added to the map unless a session is established.
+    DCHECK(!it->second.empty());
+
     // We don't expect a large number of sessions per connection, so we copy it, so that
     // we can drop the map lock early.
     disconnected_sessions = std::move(it->second);
@@ -2099,6 +2109,42 @@ void ImpalaServer::ConnectionEnd(
   }
 }
 
+bool ImpalaServer::IsIdleConnection(
+    const ThriftServer::ConnectionContext& connection_context) {
+  // The set of sessions associated with this connection.
+  std::set<TUniqueId> session_ids;
+  {
+    TUniqueId connection_id = connection_context.connection_id;
+    unique_lock<mutex> l(connection_to_sessions_map_lock_);
+    ConnectionToSessionMap::iterator it = connection_to_sessions_map_.find(connection_id);
+
+    // Not every connection must have an associated session
+    if (it == connection_to_sessions_map_.end()) return false;
+
+    session_ids = it->second;
+
+    // Sessions are not removed from the map even after they are closed and an entry
+    // won't be added to the map unless a session is established. The code below relies
+    // on this invariant to not mark a connection with no session yet as idle.
+    DCHECK(!session_ids.empty());
+  }
+
+  // Check if all the sessions associated with the connection are idle.
+  {
+    lock_guard<mutex> map_lock(session_state_map_lock_);
+    for (const TUniqueId& session_id : session_ids) {
+      const auto it = session_state_map_.find(session_id);
+      if (it == session_state_map_.end()) continue;
+
+      // If any session associated with this connection is not idle,
+      // the connection is not idle.
+      lock_guard<mutex> state_lock(it->second->lock);
+      if (!it->second->expired) return false;
+    }
+  }
+  return true;
+}
+
 void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
   if (session_timeout <= 0) return;
   {
@@ -2468,7 +2514,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
           builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
           .metrics(exec_env_->metrics())
           .max_concurrent_connections(FLAGS_fe_service_threads)
-          .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
           .Build(&server));
       beeswax_server_.reset(server);
       beeswax_server_->SetConnectionHandler(this);
@@ -2496,7 +2543,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
           builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
           .metrics(exec_env_->metrics())
           .max_concurrent_connections(FLAGS_fe_service_threads)
-          .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
           .Build(&server));
       hs2_server_.reset(server);
       hs2_server_->SetConnectionHandler(this);
@@ -2529,7 +2577,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
               .transport_type(ThriftServer::TransportType::HTTP)
               .metrics(exec_env_->metrics())
               .max_concurrent_connections(FLAGS_fe_service_threads)
-              .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
+              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
               .Build(&http_server));
       hs2_http_server_.reset(http_server);
       hs2_http_server_->SetConnectionHandler(this);
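
IsIdleConnection() above deliberately copies the session-id set under the connection-map lock before probing session state under the session-map lock, so the two map locks are never held at the same time while per-session locks are taken. A simplified, hypothetical skeleton of that two-phase walk, using standard-library types rather than Impala's:

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <set>

struct SessionState {
  std::mutex lock;
  bool expired = false;  // Set by the session-expiry thread.
};

class SessionRegistry {
 public:
  bool IsIdleConnection(int64_t connection_id) {
    std::set<int64_t> session_ids;
    {
      std::lock_guard<std::mutex> l(connection_map_lock_);
      auto it = connection_to_sessions_.find(connection_id);
      // A connection with no sessions yet is never reported idle.
      if (it == connection_to_sessions_.end()) return false;
      session_ids = it->second;  // Copy so the map lock can be dropped early.
    }
    std::lock_guard<std::mutex> map_lock(session_map_lock_);
    for (int64_t id : session_ids) {
      auto it = sessions_.find(id);
      if (it == sessions_.end()) continue;
      std::lock_guard<std::mutex> state_lock(it->second->lock);
      if (!it->second->expired) return false;  // One live session: not idle.
    }
    return true;  // Every known session has expired.
  }

 private:
  std::mutex connection_map_lock_;
  std::map<int64_t, std::set<int64_t>> connection_to_sessions_;
  std::mutex session_map_lock_;
  std::map<int64_t, std::shared_ptr<SessionState>> sessions_;
};
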
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7f334ef..6131659 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -362,6 +362,11 @@ class ImpalaServer : public ImpalaServiceIf,
   /// associated with the closed connection.
   virtual void ConnectionEnd(const ThriftServer::ConnectionContext& session_context);
 
+  /// Returns true if the connection is considered idle. A connection is considered
+  /// idle if all the sessions associated with it have expired due to idle timeout.
+  /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
+  virtual bool IsIdleConnection(const ThriftServer::ConnectionContext& session_context);
+
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
diff --git a/tests/custom_cluster/test_hs2.py b/tests/custom_cluster/test_hs2.py
index 9b641de..15a9153 100644
--- a/tests/custom_cluster/test_hs2.py
+++ b/tests/custom_cluster/test_hs2.py
@@ -76,8 +76,8 @@ class TestHS2(CustomClusterTestSuite):
     assert status == "Session closed because it has no active connections"
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      "--idle_session_timeout=1 --disconnected_session_timeout=5")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=1 "
+       "--disconnected_session_timeout=5 --idle_client_poll_period_s=0")
   def test_expire_disconnected_session(self):
     """Test for the interaction between idle_session_timeout and
     disconnected_session_timeout"""
diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py
index 027cc41..61b31d6 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -18,31 +18,40 @@
 # Tests for query expiration.
 
 import pytest
+import socket
 from time import sleep
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import DEFAULT_HS2_PORT
 
 class TestSessionExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_session_timeout=6")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=6 "
+      "--idle_client_poll_period_s=0")
   def test_session_expiration(self, vector):
     impalad = self.cluster.get_any_impalad()
     self.__close_default_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
+    num_connections = impalad.service.get_metric_value(
+        "impala.thrift-server.beeswax-frontend.connections-in-use")
     client = impalad.service.create_beeswax_client()
     # Sleep for half the expiration time to confirm that the session is not expired early
     # (see IMPALA-838)
     sleep(3)
     assert num_expired == impalad.service.get_metric_value(
-      "impala-server.num-sessions-expired")
+        "impala-server.num-sessions-expired")
     # Wait for session expiration. Impala will poll the session expiry queue every second
     impalad.service.wait_for_metric_value(
-      "impala-server.num-sessions-expired", num_expired + 1, 20)
+        "impala-server.num-sessions-expired", num_expired + 1, 20)
+    # Verify that the idle connection is not closed.
+    assert 1 + num_connections == impalad.service.get_metric_value(
+        "impala.thrift-server.beeswax-frontend.connections-in-use")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_session_timeout=3")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=3 "
+      "--idle_client_poll_period_s=0")
   def test_session_expiration_with_set(self, vector):
     impalad = self.cluster.get_any_impalad()
     self.__close_default_clients()
@@ -64,7 +73,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
 
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_session_timeout=5")
+  @CustomClusterTestSuite.with_args("--idle_session_timeout=5 "
+       "--idle_client_poll_period_s=0")
   def test_unsetting_session_expiration(self, vector):
     impalad = self.cluster.get_any_impalad()
     self.__close_default_clients()
@@ -86,7 +96,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
       "impala-server.num-sessions-expired")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("-default_pool_max_requests 1")
+  @CustomClusterTestSuite.with_args("--default_pool_max_requests=1 "
+      "--idle_client_poll_period_s=0")
   def test_session_expiration_with_queued_query(self, vector):
     """Ensure that a query waiting in queue gets cancelled if the session expires."""
     impalad = self.cluster.get_any_impalad()
@@ -105,6 +116,53 @@ class TestSessionExpiration(CustomClusterTestSuite):
       queued_handle)
     assert "Admission result: Cancelled (queued)" in queued_query_profile
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=10 "
+      "--idle_client_poll_period_s=1", cluster_size=1)
+  def test_closing_idle_connection(self, vector):
+    """ IMPALA-7802: verifies that connections of idle sessions are closed
+    after the sessions have expired."""
+    impalad = self.cluster.get_any_impalad()
+    self.__close_default_clients()
+
+    for protocol in ['beeswax', 'hiveserver2']:
+      num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
+      num_connections_metrics_name = \
+          "impala.thrift-server.{}-frontend.connections-in-use".format(protocol)
+      num_connections = impalad.service.get_metric_value(num_connections_metrics_name)
+
+      # Connect to Impala using either beeswax or HS2 client and verify the number of
+      # opened connections.
+      if protocol == 'beeswax':
+        client = impalad.service.create_beeswax_client()
+      else:
+        client = impalad.service.create_hs2_client()
+      client.execute("select 1")
+      impalad.service.wait_for_metric_value(num_connections_metrics_name,
+           num_connections + 1, 20)
+
+      # Wait till the session has expired.
+      impalad.service.wait_for_metric_value("impala-server.num-sessions-expired",
+           num_expired + 1, 20)
+      # Wait till the idle connection is closed.
+      impalad.service.wait_for_metric_value(num_connections_metrics_name,
+           num_connections, 5)
+
+    # Verify that connecting to HS2 port without establishing a session will not cause
+    # the connection to be closed.
+    num_hs2_connections = impalad.service.get_metric_value(
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use")
+    sock = socket.socket()
+    sock.connect((impalad._get_hostname(), DEFAULT_HS2_PORT))
+    impalad.service.wait_for_metric_value(
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use",
+        num_hs2_connections + 1, 60)
+    # Sleep for some time for the frontend service thread to check for idleness.
+    sleep(15)
+    assert num_hs2_connections + 1 == impalad.service.get_metric_value(
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use")
+    sock.close()
+
   def __close_default_clients(self):
     """Close the clients that were automatically created by setup_class(). These clients
     can expire during test, which results in metrics that tests depend on changing. Each