You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:10:52 UTC

svn commit: r1005169 - in /incubator/thrift/trunk/lib/cpp/src/server: TNonblockingServer.cpp TNonblockingServer.h

Author: dreiss
Date: Wed Oct  6 17:10:52 2010
New Revision: 1005169

URL: http://svn.apache.org/viewvc?rev=1005169&view=rev
Log:
THRIFT-926. cpp: Better buffer management for TNonblockingServer

Add two improvements to memory management in TNonblockingServer:
- Separate the receive code into two distinct states: one for receiving
  the frame header and one for the frame content.  This allows us to
  size the read buffer from the frame size announced in the header,
  rather than allocating an arbitrary amount of memory (previously a
  fixed 1024 bytes) before reading the header.
- Allow setting the initial write buffer size based on the application's
  expected response size (TNonblockingServer::setWriteBufferDefaultSize(),
  default 1024 bytes).

Modified:
    incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
    incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h

Modified: incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp?rev=1005169&r1=1005168&r2=1005169&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp Wed Oct  6 17:10:52 2010
@@ -114,7 +114,7 @@ void TConnection::init(int socket, short
   writeBufferPos_ = 0;
   largestWriteBufferSize_ = 0;
 
-  socketState_ = SOCKET_RECV;
+  socketState_ = SOCKET_RECV_FRAMING;
   appState_ = APP_INIT;
   callsForResize_ = 0;
 
@@ -143,25 +143,51 @@ void TConnection::workSocket() {
   uint32_t fetch = 0;
 
   switch (socketState_) {
-  case SOCKET_RECV:
-    // It is an error to be in this state if we already have all the data
-    assert(readBufferPos_ < readWant_);
-
-    // Double the buffer size until it is big enough
-    if (readWant_ > readBufferSize_) {
-      uint32_t newSize = readBufferSize_;
-      while (readWant_ > newSize) {
-        newSize *= 2;
-      }
-      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
-      if (newBuffer == NULL) {
-        GlobalOutput("TConnection::workSocket() realloc");
+  case SOCKET_RECV_FRAMING:
+    union {
+      uint8_t buf[sizeof(uint32_t)];
+      int32_t size;
+    } framing;
+
+    // if we've already received some bytes we kept them here
+    framing.size = readWant_;
+    // determine size of this frame
+    try {
+      // Read from the socket
+      fetch = tSocket_->read(&framing.buf[readBufferPos_],
+                             uint32_t(sizeof(framing.size) - readBufferPos_));
+      if (fetch == 0) {
+        // Whenever we get here it means a remote disconnect
         close();
         return;
       }
-      readBuffer_ = newBuffer;
-      readBufferSize_ = newSize;
+      readBufferPos_ += fetch;
+    } catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+
+      return;
+    }
+
+    if (readBufferPos_ < sizeof(framing.size)) {
+      // more needed before frame size is known -- save what we have so far
+      readWant_ = framing.size;
+      return;
+    }
+
+    readWant_ = ntohl(framing.size);
+    if (static_cast<int>(readWant_) <= 0) {
+      GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
+      close();
+      return;
     }
+    // size known; now get the rest of the frame
+    transition();
+    return;
+
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
 
     try {
       // Read from the socket
@@ -365,14 +391,12 @@ void TConnection::transition() {
     writeBufferPos_ = 0;
     writeBufferSize_ = 0;
 
-    // Set up read buffer for getting 4 bytes
-    readBufferPos_ = 0;
-    readWant_ = 4;
-
     // Into read4 state we go
-    socketState_ = SOCKET_RECV;
+    socketState_ = SOCKET_RECV_FRAMING;
     appState_ = APP_READ_FRAME_SIZE;
 
+    readBufferPos_ = 0;
+
     // Register read event
     setRead();
 
@@ -382,21 +406,30 @@ void TConnection::transition() {
     return;
 
   case APP_READ_FRAME_SIZE:
-    // We just read the request length, deserialize it
-    sz = *(int32_t*)readBuffer_;
-    sz = (int32_t)ntohl(sz);
+    // We just read the request length
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      if (readBufferSize_ == 0) {
+        readBufferSize_ = 1;
+      }
+      uint32_t newSize = readBufferSize_;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
 
-    if (sz <= 0) {
-      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
-      close();
-      return;
+      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+      if (newBuffer == NULL) {
+        // nothing else to be done...
+        throw std::bad_alloc();
+      }
+      readBuffer_ = newBuffer;
+      readBufferSize_ = newSize;
     }
 
-    // Reset the read buffer
-    readWant_ = (uint32_t)sz;
     readBufferPos_= 0;
 
     // Move into read request state
+    socketState_ = SOCKET_RECV;
     appState_ = APP_READ_REQUEST;
 
     // Work the socket right away
@@ -501,17 +534,14 @@ void TConnection::close() {
 void TConnection::checkIdleBufferMemLimit(size_t readLimit,
                                           size_t writeLimit) {
   if (readLimit > 0 && readBufferSize_ > readLimit) {
-    readBufferSize_ = readLimit;
-    readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
-    if (readBuffer_ == NULL) {
-      GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
-      close();
-    }
+    free(readBuffer_);  // drop the oversized buffer entirely instead of shrinking it
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;  // transition() (APP_READ_FRAME_SIZE) reallocates to fit the next frame
   }
 
   if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
     // just start over
-    outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+    outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());  // fresh buffer of the server's configured default size
     largestWriteBufferSize_ = 0;
   }
 }

Modified: incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h?rev=1005169&r1=1005168&r2=1005169&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h Wed Oct  6 17:10:52 2010
@@ -76,6 +76,9 @@ class TNonblockingServer : public TServe
   /// Default limit on connections in handler/task processing
   static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
 
+  /// Default size of write buffer
+  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
+
   /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
   static const int IDLE_READ_BUFFER_LIMIT = 1024;
 
@@ -135,10 +138,16 @@ class TNonblockingServer : public TServe
   TOverloadAction overloadAction_;
 
   /**
+   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
+   * and found to be exceeded, reinitialized) to this size.
+   */
+  size_t writeBufferDefaultSize_;
+
+  /**
    * Max read buffer size for an idle TConnection.  When we place an idle
    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
-   * we insure that its read buffer is reduced to this size to insure that
-   * idle connections don't hog memory.  0 disables this check.
+   * we will free the buffer (such that it will be reinitialized by the next
+   * received frame) if it has exceeded this limit.  0 disables this check.
    */
   size_t idleReadBufferLimit_;
 
@@ -146,8 +155,8 @@ class TNonblockingServer : public TServe
    * Max write buffer size for an idle connection.  When we place an idle
    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
    * we insure that its write buffer is <= to this size; otherwise we
-   * replace it with a new one to insure that idle connections don't hog
-   * memory. 0 disables this check.
+   * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
+   * idle connections don't hog memory. 0 disables this check.
    */
   size_t idleWriteBufferLimit_;
 
@@ -203,6 +212,7 @@ class TNonblockingServer : public TServe
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
+    writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
     idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
     idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
     resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
@@ -227,6 +237,7 @@ class TNonblockingServer : public TServe
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
+    writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
     idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
     idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
     resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
@@ -260,6 +271,7 @@ class TNonblockingServer : public TServe
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
+    writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
     idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
     idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
     resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
@@ -474,9 +486,27 @@ class TNonblockingServer : public TServe
   bool drainPendingTask();
 
   /**
+   * Get the starting size of a TConnection object's write buffer.
+   *
+   * @return # bytes we initialize a TConnection object's write buffer to.
+   */
+  size_t getWriteBufferDefaultSize() const {
+    return writeBufferDefaultSize_;  // read when a TConnection creates or resets its write buffer
+  }
+
+  /**
+   * Set the starting size of a TConnection object's write buffer.
+   *
+   * @param size # bytes we initialize a TConnection object's write buffer to.
+   */
+  void setWriteBufferDefaultSize(size_t size) {
+    writeBufferDefaultSize_ = size;  // only stores the value; existing write buffers are not resized here
+  }
+
+  /**
    * Get the maximum size of read buffer allocated to idle TConnection objects.
    *
-   * @return # bytes beyond which we will shrink buffers when idle.
+   * @return # bytes beyond which we will dealloc idle buffer.
    */
   size_t getIdleReadBufferLimit() const {
     return idleReadBufferLimit_;
@@ -486,7 +516,7 @@ class TNonblockingServer : public TServe
    * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
    * Get the maximum size of read buffer allocated to idle TConnection objects.
    *
-   * @return # bytes beyond which we will shrink buffers when idle.
+   * @return # bytes beyond which we will dealloc idle buffer.
    */
   size_t getIdleBufferMemLimit() const {
     return idleReadBufferLimit_;
@@ -496,7 +526,8 @@ class TNonblockingServer : public TServe
    * Set the maximum size read buffer allocated to idle TConnection objects.
    * If a TConnection object is found (either on connection close or between
    * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its read buffer, we shrink it to this value.
+   * allocated to its read buffer, we free it and allow it to be reinitialized
+   * on the next received frame.
    *
    * @param limit of bytes beyond which we will shrink buffers when checked.
    */
@@ -509,7 +540,8 @@ class TNonblockingServer : public TServe
    * Set the maximum size read buffer allocated to idle TConnection objects.
    * If a TConnection object is found (either on connection close or between
    * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its read buffer, we shrink it to this value.
+   * allocated to its read buffer, we free it and allow it to be reinitialized
+   * on the next received frame.
    *
    * @param limit of bytes beyond which we will shrink buffers when checked.
    */
@@ -532,7 +564,8 @@ class TNonblockingServer : public TServe
    * Set the maximum size write buffer allocated to idle TConnection objects.
    * If a TConnection object is found (either on connection close or between
    * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its write buffer, we destroy and construct that buffer.
+   * allocated to its write buffer, we destroy and construct that buffer with
+   * writeBufferDefaultSize_ bytes.
    *
    * @param limit of bytes beyond which we will shrink buffers when idle.
    */
@@ -651,8 +684,9 @@ class TNonblockingServer : public TServe
   void serve();
 };
 
-/// Two states for sockets, recv and send mode
+/// Three states for sockets: recv frame size, recv data, and send mode
 enum TSocketState {
+  SOCKET_RECV_FRAMING,  ///< reading the 4-byte frame-size header
   SOCKET_RECV,
   SOCKET_SEND
 };
@@ -681,9 +715,6 @@ enum TAppState {
 class TConnection {
  private:
 
-  /// Starting size for new connection buffer
-  static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
-
   /// Server handle
   TNonblockingServer* server_;
 
@@ -797,17 +828,14 @@ class TConnection {
   /// Constructor
   TConnection(int socket, short eventFlags, TNonblockingServer *s,
               const sockaddr* addr, socklen_t addrLen) {
-    readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
-    if (readBuffer_ == NULL) {
-      throw new apache::thrift::TException("Out of memory.");
-    }
-    readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
 
     // Allocate input and output tranpsorts
     // these only need to be allocated once per TConnection (they don't need to be
     // reallocated on init() call)
     inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
-    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
     tSocket_.reset(new TSocket());
 
     init(socket, eventFlags, s, addr, addrLen);