You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/09/07 08:12:26 UTC

[impala] 01/02: IMPALA-11548: Share same TTransport object for RPC input and output

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5c01c36227a3559e2021c0ead5c30dedb5b1b7ef
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Aug 29 14:41:32 2022 -0700

    IMPALA-11548: Share same TTransport object for RPC input and output
    
    After the CPP thrift upgrade to version 0.16.0 by IMPALA-11384, we are
    seeing long-running connections closed by the Thrift server due to
    MaxMessageSize reached. Additional tracing log in the thrift library and
    TAcceptQueueServer reveals that TTransport's message size counter was
    continuously decreasing over the lifetime of the connection and was
    never reset. Further investigation points to having separate input
    and output TTransport objects in TAcceptQueueServer::SetupConnection as
    the root cause.
    
    This patch fixes the issue by sharing the same TTransport object for
    both Thrift RPC input and output in TAcceptQueueServer::SetupConnection.
    Using the same TTransport causes the counter to be decremented and reset
    on the same object. TSaslTransport had a caching algorithm in
    TSaslServerTransport::Factory::getTransport() that guaranteed that the
    underlying transport was shared between input and output. This is no
    longer necessary and has been removed.
    
    Testing:
    - Pass core tests.
    
    Change-Id: I199d6b0c6c62e940e131eb39d38a8a51b36e11c4
    Reviewed-on: http://gerrit.cloudera.org:8080/18938
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/TAcceptQueueServer.cpp         | 40 ++++++++++++++++++-------------
 be/src/rpc/TAcceptQueueServer.h           |  4 +---
 be/src/transport/TSaslServerTransport.cpp | 35 ++-------------------------
 be/src/transport/TSaslServerTransport.h   |  8 -------
 4 files changed, 27 insertions(+), 60 deletions(-)

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index b605cb406..ce21da936 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -203,13 +203,9 @@ void TAcceptQueueServer::init() {
 }
 
 void TAcceptQueueServer::CleanupAndClose(const string& error,
-    shared_ptr<TTransport> input, shared_ptr<TTransport> output,
-    shared_ptr<TTransport> client) {
-  if (input != nullptr) {
-    input->close();
-  }
-  if (output != nullptr) {
-    output->close();
+    shared_ptr<TTransport> io_transport, shared_ptr<TTransport> client) {
+  if (io_transport != nullptr) {
+    io_transport->close();
   }
   if (client != nullptr) {
     client->close();
@@ -220,8 +216,7 @@ void TAcceptQueueServer::CleanupAndClose(const string& error,
 // New.
 void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
   if (metrics_enabled_) queue_size_metric_->Increment(-1);
-  shared_ptr<TTransport> inputTransport;
-  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TTransport> io_transport;
   shared_ptr<TTransport> client = entry->client_;
   const string& socket_info = reinterpret_cast<TSocket*>(client.get())->getSocketInfo();
   VLOG(2) << Substitute("TAcceptQueueServer: $0 started connection setup for client $1",
@@ -230,12 +225,25 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
     MonotonicStopWatch timer;
     // Start timing for connection setup.
     timer.Start();
-    inputTransport = inputTransportFactory_->getTransport(client);
-    outputTransport = outputTransportFactory_->getTransport(client);
+
+    // Since THRIFT-5237, it is necessary for Impala to have the same TTransport object
+    // for both input and output transport. The detailed reasoning behind this TTransport
+    // object sharing requirement is as follows:
+    // - Thrift decrements the max message size counter as messages arrive.
+    // - Thrift resets the max message size counter with a flush.
+    // - If the input and output transport are distinct, the decrement is happening on
+    //   one object while the reset is happening on a different object, so it eventually
+    //   throws an error.
+    // Using same transport fixes the counter logic. This also helps with simplifying
+    // Impala's custom TSaslTransport since its caching algorithm in
+    // TSaslServerTransport::Factory is not required anymore.
+    DCHECK(inputTransportFactory_ == outputTransportFactory_);
+    io_transport = inputTransportFactory_->getTransport(client);
+
     shared_ptr<TProtocol> inputProtocol =
-        inputProtocolFactory_->getProtocol(inputTransport);
+        inputProtocolFactory_->getProtocol(io_transport);
     shared_ptr<TProtocol> outputProtocol =
-        outputProtocolFactory_->getProtocol(outputTransport);
+        outputProtocolFactory_->getProtocol(io_transport);
     shared_ptr<TProcessor> processor =
         getProcessor(inputProtocol, outputProtocol, client);
 
@@ -278,7 +286,7 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
           }
           LOG(INFO) << name_ << ": Server busy. Timing out connection request.";
           string errStr = "TAcceptQueueServer: " + name_ + " server busy";
-          CleanupAndClose(errStr, inputTransport, outputTransport, client);
+          CleanupAndClose(errStr, io_transport, client);
           return;
         }
       }
@@ -293,11 +301,11 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
   } catch (const TException& tx) {
     string errStr = Substitute("TAcceptQueueServer: $0 connection setup failed for "
         "client $1. Caught TException: $2", name_, socket_info, string(tx.what()));
-    CleanupAndClose(errStr, inputTransport, outputTransport, client);
+    CleanupAndClose(errStr, io_transport, client);
   } catch (const string& s) {
     string errStr = Substitute("TAcceptQueueServer: $0 connection setup failed for "
         "client $1. Unknown exception: $2", name_, socket_info, s);
-    CleanupAndClose(errStr, inputTransport, outputTransport, client);
+    CleanupAndClose(errStr, io_transport, client);
   }
 }
 
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 373f789fd..d0d077ef6 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -87,9 +87,7 @@ class TAcceptQueueServer : public TServer {
   void SetupConnection(std::shared_ptr<TAcceptQueueEntry> entry);
 
   // Helper function to close a client connection in case of server side errors.
-  void CleanupAndClose(const std::string& error,
-      std::shared_ptr<TTransport> input,
-      std::shared_ptr<TTransport> output,
+  void CleanupAndClose(const std::string& error, std::shared_ptr<TTransport> io_transport,
       std::shared_ptr<TTransport> client);
 
   std::shared_ptr<ThreadFactory> threadFactory_;
diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp
index 28c1cf322..570198fe3 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -132,35 +132,9 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   // Thrift servers use both an input and an output transport to communicate with
   // clients. In principal, these can be different, but for SASL clients we require them
   // to be the same so that the authentication state is identical for communication in
-  // both directions. In order to do this, we cache the transport that we return in a map
-  // keyed by the transport argument to this method. Then if there are two successive
-  // calls to getTransport() with the same transport, we are sure to return the same
-  // wrapped transport both times.
-  //
-  // However, the cache map would retain references to all the transports it ever
-  // created. Instead, we remove an entry in the map after it has been found for the first
-  // time, that is, after the second call to getTransport() with the same argument. That
-  // matches the calling pattern in TAcceptQueueServer which calls getTransport() twice in
-  // succession when a connection is established, and then never again. This is obviously
-  // brittle (what if for some reason getTransport() is called a third time?) but for our
-  // usage of Thrift it's a tolerable band-aid.
-  //
-  // An alternative approach is to use the 'custom deleter' feature of shared_ptr to
-  // ensure that when ret_transport is eventually deleted, its corresponding map entry is
-  // removed. That is likely to be error prone given the locking involved; for now we go
-  // with the simple solution.
+  // both directions. In order to do this, we share the same TTransport object for both
+  // input and output set in TAcceptQueueServer::SetupConnection.
   std::shared_ptr<TBufferedTransport> ret_transport;
-  {
-    lock_guard<mutex> l(transportMap_mutex_);
-    TransportMap::iterator trans_map = transportMap_.find(trans);
-    if (trans_map != transportMap_.end()) {
-      ret_transport = trans_map->second;
-      transportMap_.erase(trans_map);
-      return ret_transport;
-    }
-    // This method should never be called concurrently with the same 'trans' object.
-    // Therefore, it is safe to drop the transportMap_mutex_ here.
-  }
   std::shared_ptr<TTransport> wrapped(
       new TSaslServerTransport(serverDefinitionMap_, trans));
   // Set socket timeouts to prevent TSaslServerTransport->open from blocking the server
@@ -174,11 +148,6 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   // Reset socket timeout back to zero, so idle clients do not timeout
   socket->setRecvTimeout(0);
   socket->setSendTimeout(0);
-  {
-    lock_guard<mutex> l(transportMap_mutex_);
-    DCHECK(transportMap_.find(trans) == transportMap_.end());
-    transportMap_[trans] = ret_transport;
-  }
   return ret_transport;
 }
 
diff --git a/be/src/transport/TSaslServerTransport.h b/be/src/transport/TSaslServerTransport.h
index 30f615d17..a70cfe819 100644
--- a/be/src/transport/TSaslServerTransport.h
+++ b/be/src/transport/TSaslServerTransport.h
@@ -184,14 +184,6 @@ class TSaslServerTransport : public TSaslTransport {
    private:
     /* Map for holding and returning server definitions. */
     std::map<std::string, TSaslServerDefinition*> serverDefinitionMap_;
-
-    /* Map from a transport to its Sasl Transport (wrapped by a TBufferedTransport). */
-    typedef std::map<std::shared_ptr<TTransport>,
-                     std::shared_ptr<TBufferedTransport>> TransportMap;
-    TransportMap transportMap_;
-
-    /* Lock to synchronize the transport map. */
-    std::mutex transportMap_mutex_;
   };
 
 };