You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by kc...@apache.org on 2009/03/19 04:50:05 UTC

svn commit: r755824 - in /incubator/thrift/trunk/lib/cpp/src/server: TNonblockingServer.cpp TNonblockingServer.h

Author: kclark
Date: Thu Mar 19 03:50:05 2009
New Revision: 755824

URL: http://svn.apache.org/viewvc?rev=755824&view=rev
Log:
Thrift-357. cpp: Fix buffer and connection bloat in TNonBlockingServer

Author: Anthony Giardullo

Modified:
    incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
    incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h

Modified: incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp?rev=755824&r1=755823&r2=755824&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp Thu Mar 19 03:50:05 2009
@@ -495,6 +495,17 @@
   server_->returnConnection(this);
 }
 
+/**
+ * Shrinks the read buffer to `limit` bytes when it has grown beyond that.
+ *
+ * A limit of 0 is treated as "no limit" and leaves the buffer untouched,
+ * because realloc() with a size of 0 has implementation-defined behavior
+ * (it may free the block and return NULL).
+ *
+ * If the shrinking realloc() fails, the existing buffer is still valid, so
+ * it is kept at its current size and the failure is only logged.
+ *
+ * @param limit maximum read buffer size, in bytes, for an idle connection.
+ */
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+  if (limit == 0 || readBufferSize_ <= limit) {
+    return;
+  }
+
+  // Go through a temporary: assigning realloc()'s result straight to
+  // readBuffer_ would leak the original block on failure and leave
+  // readBufferSize_ describing a NULL buffer.
+  void* shrunk = std::realloc(readBuffer_, limit);
+  if (shrunk == NULL) {
+    GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
+    // NOTE(review): the previous version called close() here. The buffer is
+    // still usable after a failed shrink, so the connection is left alone;
+    // confirm no caller relied on close() being triggered by this path.
+    return;
+  }
+
+  readBuffer_ = static_cast<uint8_t*>(shrunk);
+  readBufferSize_ = limit;
+}
+
 /**
  * Creates a new connection either by reusing an object off the stack or
  * by allocating a new one entirely
@@ -515,7 +526,13 @@
  * Returns a connection to the stack
  */
 void TNonblockingServer::returnConnection(TConnection* connection) {
-  connectionStack_.push(connection);
+  // A connectionStackLimit_ of zero means the idle pool is unbounded.
+  const bool poolIsFull = connectionStackLimit_ &&
+                          (connectionStack_.size() >= connectionStackLimit_);
+  if (poolIsFull) {
+    // No room left to cache this object, so destroy it instead.
+    delete connection;
+    return;
+  }
+
+  // Trim an oversized read buffer before parking the connection for reuse.
+  connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+  connectionStack_.push(connection);
 }
 
 /**

Modified: incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h?rev=755824&r1=755823&r2=755824&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h Thu Mar 19 03:50:05 2009
@@ -43,6 +43,12 @@
   // Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
+  // Default limit on size of idle connection pool
+  static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+  // Maximum size of buffer allocated to idle connection
+  static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
+
   // Server socket file descriptor
   int serverSocket_;
 
@@ -64,6 +70,16 @@
   // Number of TConnection object we've created
   size_t numTConnections_;
 
+  // Limit for how many TConnection objects to cache
+  size_t connectionStackLimit_;
+
+  /**
+   * Max read buffer size for an idle connection.  When we place an idle
+   * TConnection into connectionStack_, we ensure that its read buffer is
+   * reduced to this size so that idle connections don't hog memory.
+   */
+  uint32_t idleBufferMemLimit_;
+
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -82,7 +98,9 @@
     port_(port),
     threadPoolProcessing_(false),
     eventBase_(NULL),
-    numTConnections_(0) {}
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
@@ -93,7 +111,9 @@
     port_(port),
     threadManager_(threadManager),
     eventBase_(NULL),
-    numTConnections_(0) {
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
@@ -113,7 +133,9 @@
     port_(port),
     threadManager_(threadManager),
     eventBase_(NULL),
-    numTConnections_(0) {
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
@@ -132,6 +154,24 @@
     return threadManager_;
   }
 
+  /**
+   * Get the maximum number of unused TConnection we will hold in reserve.
+   *
+   * @return the current limit on TConnection pool size.
+   */
+  int getConnectionStackLimit() const {
+    // The limit is stored as size_t; it is narrowed to int for the caller.
+    return static_cast<int>(connectionStackLimit_);
+  }
+
+  /**
+   * Set the maximum number of unused TConnection we will hold in reserve.
+   *
+   * @param sz the new limit for TConnection pool size.
+   */
+  void setConnectionStackLimit(int sz) {
+    // Stored as size_t, so a negative sz converts to a very large value.
+    connectionStackLimit_ = static_cast<size_t>(sz);
+  }
+
   bool isThreadPoolProcessing() const {
     return threadPoolProcessing_;
   }
@@ -160,6 +200,26 @@
     return connectionStack_.size();
   }
 
+  /**
+   * Get the maximum limit of memory allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will shrink buffers when idle.
+   */
+  size_t getIdleBufferMemLimit() const {
+    // Stored as a 32-bit value; widened to size_t for the caller.
+    return static_cast<size_t>(idleBufferMemLimit_);
+  }
+
+  /**
+   * Set the maximum limit of memory allocated to idle TConnection objects.
+   * If a TConnection object goes idle with more than this much memory
+   * allocated to its buffer, we shrink it to this value.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when idle.
+   */
+  void setIdleBufferMemLimit(size_t limit) {
+    // idleBufferMemLimit_ is only 32 bits wide. Saturate at its maximum
+    // rather than letting a larger size_t wrap around to a small (or zero)
+    // limit, which would shrink idle buffers far more than requested.
+    const uint32_t maxLimit = static_cast<uint32_t>(-1);
+    idleBufferMemLimit_ =
+        (limit > maxLimit) ? maxLimit : static_cast<uint32_t>(limit);
+  }
+
   TConnection* createConnection(int socket, short flags);
 
   void returnConnection(TConnection* connection);
@@ -327,6 +387,13 @@
     server_->decrementNumConnections();
   }
 
+  /**
+   * Check read buffer against a given limit and shrink it if exceeded.
+   *
+   * @param limit we limit buffer size to.
+   */
+  void checkIdleBufferMemLimit(uint32_t limit);
+
   // Initialize
   void init(int socket, short eventFlags, TNonblockingServer *s);