You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:10:45 UTC

svn commit: r1005164 - in /incubator/thrift/trunk/lib/cpp/src/server: TNonblockingServer.cpp TNonblockingServer.h

Author: dreiss
Date: Wed Oct  6 17:10:45 2010
New Revision: 1005164

URL: http://svn.apache.org/viewvc?rev=1005164&view=rev
Log:
THRIFT-926. cpp: Add configurable buffer recycling for TNonblockingServer

Add methods to TNonblockingServer to set the maximum size of idle read
and write buffers and the check interval (in calls).  When checked, if
the buffers are larger than the configured maximum, they will be resized
down to the maximum size.

Modified:
    incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
    incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h

Modified: incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp?rev=1005164&r1=1005163&r2=1005164&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp Wed Oct  6 17:10:45 2010
@@ -112,9 +112,11 @@ void TConnection::init(int socket, short
   writeBuffer_ = NULL;
   writeBufferSize_ = 0;
   writeBufferPos_ = 0;
+  largestWriteBufferSize_ = 0;
 
   socketState_ = SOCKET_RECV;
   appState_ = APP_INIT;
+  callsForResize_ = 0;
 
   // Set flags, which also registers the event
   setFlags(eventFlags);
@@ -342,6 +344,16 @@ void TConnection::transition() {
     goto LABEL_APP_INIT;
 
   case APP_SEND_RESULT:
+    // it's now safe to perform buffer size housekeeping.
+    if (writeBufferSize_ > largestWriteBufferSize_) {
+      largestWriteBufferSize_ = writeBufferSize_;
+    }
+    if (server_->getResizeBufferEveryN() > 0
+        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+                              server_->getIdleWriteBufferLimit());
+      callsForResize_ = 0;
+    }
 
     // N.B.: We also intentionally fall through here into the INIT state!
 
@@ -486,15 +498,22 @@ void TConnection::close() {
   server_->returnConnection(this);
 }
 
-void TConnection::checkIdleBufferMemLimit(size_t limit) {
-  if (readBufferSize_ > limit) {
-    readBufferSize_ = limit;
+void TConnection::checkIdleBufferMemLimit(size_t readLimit,
+                                          size_t writeLimit) {
+  if (readLimit > 0 && readBufferSize_ > readLimit) {
+    readBufferSize_ = readLimit;
     readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
     if (readBuffer_ == NULL) {
       GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
       close();
     }
   }
+
+  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+    // just start over
+    outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+    largestWriteBufferSize_ = 0;
+  }
 }
 
 TNonblockingServer::~TNonblockingServer() {
@@ -546,7 +565,7 @@ void TNonblockingServer::returnConnectio
       (connectionStack_.size() >= connectionStackLimit_)) {
     delete connection;
   } else {
-    connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
     connectionStack_.push(connection);
   }
 }

Modified: incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h?rev=1005164&r1=1005163&r2=1005164&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h Wed Oct  6 17:10:45 2010
@@ -70,15 +70,21 @@ class TNonblockingServer : public TServe
   /// Default limit on size of idle connection pool
   static const size_t CONNECTION_STACK_LIMIT = 1024;
 
-  /// Maximum size of buffer allocated to idle connection
-  static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
-
   /// Default limit on total number of connected sockets
   static const int MAX_CONNECTIONS = INT_MAX;
 
   /// Default limit on connections in handler/task processing
   static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
 
+  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
+  static const int IDLE_READ_BUFFER_LIMIT = 1024;
+
+  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
+  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
+
+  /// # of calls before resizing oversized buffers (0 = check only on close)
+  static const int RESIZE_BUFFER_EVERY_N = 512;
+
   /// Server socket file descriptor
   int serverSocket_;
 
@@ -129,11 +135,27 @@ class TNonblockingServer : public TServe
   TOverloadAction overloadAction_;
 
   /**
-   * Max read buffer size for an idle connection.  When we place an idle
-   * TConnection into connectionStack_, we insure that its read buffer is
-   * reduced to this size to insure that idle connections don't hog memory.
+   * Max read buffer size for an idle TConnection.  When we place an idle
+   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+   * we insure that its read buffer is reduced to this size to insure that
+   * idle connections don't hog memory.  0 disables this check.
+   */
+  size_t idleReadBufferLimit_;
+
+  /**
+   * Max write buffer size for an idle connection.  When we place an idle
+   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+   * we insure that its write buffer is <= to this size; otherwise we
+   * replace it with a new one to insure that idle connections don't hog
+   * memory. 0 disables this check.
    */
-  size_t idleBufferMemLimit_;
+  size_t idleWriteBufferLimit_;
+
+  /**
+   * Every N calls we check the buffer size limits on a connected TConnection.
+   * 0 disables (i.e. the checks are only done when a connection closes).
+   */
+  int32_t resizeBufferEveryN_;
 
   /// Set if we are currently in an overloaded state.
   bool overloaded_;
@@ -181,7 +203,9 @@ class TNonblockingServer : public TServe
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+    idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+    resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
     overloaded_(false),
     nConnectionsDropped_(0),
     nTotalConnectionsDropped_(0) {}
@@ -203,7 +227,9 @@ class TNonblockingServer : public TServe
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+    idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+    resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
     overloaded_(false),
     nConnectionsDropped_(0),
     nTotalConnectionsDropped_(0) {
@@ -234,7 +260,9 @@ class TNonblockingServer : public TServe
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+    idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+    resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
     overloaded_(false),
     nConnectionsDropped_(0),
     nTotalConnectionsDropped_(0) {
@@ -446,25 +474,94 @@ class TNonblockingServer : public TServe
   bool drainPendingTask();
 
   /**
-   * Get the maximum limit of memory allocated to idle TConnection objects.
+   * Get the maximum size of read buffer allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will shrink buffers when idle.
+   */
+  size_t getIdleReadBufferLimit() const {
+    return idleReadBufferLimit_;
+  }
+
+  /**
+   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
+   * Get the maximum size of read buffer allocated to idle TConnection objects.
    *
    * @return # bytes beyond which we will shrink buffers when idle.
    */
   size_t getIdleBufferMemLimit() const {
-    return idleBufferMemLimit_;
+    return idleReadBufferLimit_;
   }
 
   /**
-   * Set the maximum limit of memory allocated to idle TConnection objects.
-   * If a TConnection object goes idle with more than this much memory
-   * allocated to its buffer, we shrink it to this value.
+   * Set the maximum size read buffer allocated to idle TConnection objects.
+   * If a TConnection object is found (either on connection close or between
+   * calls when resizeBufferEveryN_ is set) with more than this much memory
+   * allocated to its read buffer, we shrink it to this value.
    *
-   * @param limit of bytes beyond which we will shrink buffers when idle.
+   * @param limit of bytes beyond which we will shrink buffers when checked.
+   */
+  void setIdleReadBufferLimit(size_t limit) {
+    idleReadBufferLimit_ = limit;
+  }
+
+  /**
+   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
+   * Set the maximum size read buffer allocated to idle TConnection objects.
+   * If a TConnection object is found (either on connection close or between
+   * calls when resizeBufferEveryN_ is set) with more than this much memory
+   * allocated to its read buffer, we shrink it to this value.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when checked.
    */
   void setIdleBufferMemLimit(size_t limit) {
-    idleBufferMemLimit_ = limit;
+    idleReadBufferLimit_ = limit;
   }
 
+  
+
+  /**
+   * Get the maximum size of write buffer allocated to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will reallocate buffers when checked.
+   */
+  size_t getIdleWriteBufferLimit() const {
+    return idleWriteBufferLimit_;
+  }
+
+  /**
+   * Set the maximum size write buffer allocated to idle TConnection objects.
+   * If a TConnection object is found (either on connection close or between
+   * calls when resizeBufferEveryN_ is set) with more than this much memory
+   * allocated to its write buffer, we destroy and construct that buffer.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when idle.
+   */
+  void setIdleWriteBufferLimit(size_t limit) {
+    idleWriteBufferLimit_ = limit;
+  }
+
+  /**
+   * Get # of calls made between buffer size checks.  0 means disabled.
+   *
+   * @return # of calls between buffer size checks.
+   */
+  int32_t getResizeBufferEveryN() const {
+    return resizeBufferEveryN_;
+  }
+
+  /**
+   * Check buffer sizes every "count" calls.  This allows buffer limits
+   * to be enforced for persistent connections with a controllable degree
+   * of overhead. 0 disables checks except at connection close.
+   *
+   * @param count the number of calls between checks, or 0 to disable
+   */
+  void setResizeBufferEveryN(int32_t count) {
+    resizeBufferEveryN_ = count;
+  }
+
+
+
   /**
    * Return an initialized connection object.  Creates or recovers from
    * pool a TConnection and initializes it with the provided socket FD
@@ -581,7 +678,7 @@ enum TAppState {
  * Represents a connection that is handled via libevent. This connection
  * essentially encapsulates a socket that has some associated libevent state.
  */
-  class TConnection {
+class TConnection {
  private:
 
   /// Starting size for new connection buffer
@@ -626,6 +723,12 @@ enum TAppState {
   /// How far through writing are we?
   uint32_t writeBufferPos_;
 
+  /// Largest size of write buffer seen since buffer was constructed
+  size_t largestWriteBufferSize_;
+
+  /// Count of the number of calls for use with getResizeBufferEveryN().
+  int32_t callsForResize_;
+
   /// Task handle
   int taskHandle_;
 
@@ -716,12 +819,13 @@ enum TAppState {
     server_->decrementNumConnections();
   }
 
-  /**
-   * Check read buffer against a given limit and shrink it if exceeded.
+ /**
+   * Check buffers against any size limits and shrink it if exceeded.
    *
-   * @param limit we limit buffer size to.
+   * @param readLimit we reduce read buffer size to this (if nonzero).
+   * @param writeLimit if nonzero and write buffer is larger, replace it.
    */
-  void checkIdleBufferMemLimit(size_t limit);
+  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
 
   /// Initialize
   void init(int socket, short eventFlags, TNonblockingServer *s,