You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by br...@apache.org on 2011/08/29 22:28:23 UTC

svn commit: r1162987 - in /thrift/trunk/lib/cpp/src/server: TNonblockingServer.cpp TNonblockingServer.h

Author: bryanduxbury
Date: Mon Aug 29 20:28:23 2011
New Revision: 1162987

URL: http://svn.apache.org/viewvc?rev=1162987&view=rev
Log:
THRIFT-1305. cpp: make TConnection a private inner class of
TNonblockingServer

Patch: Adam Simpkins

Modified:
    thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
    thrift/trunk/lib/cpp/src/server/TNonblockingServer.h

Modified: thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp?rev=1162987&r1=1162986&r2=1162987&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp (original)
+++ thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp Mon Aug 29 20:28:23 2011
@@ -54,7 +54,277 @@ using namespace std;
 using apache::thrift::transport::TSocket;
 using apache::thrift::transport::TTransportException;
 
-class TConnection::Task: public Runnable {
+/// Three states for sockets: recv frame size, recv data, and send mode
+enum TSocketState {
+  SOCKET_RECV_FRAMING,
+  SOCKET_RECV,
+  SOCKET_SEND
+};
+
+/** Six states for the nonblocking server:
+ *  1) initialize
+ *  2) read 4 byte frame size
+ *  3) read frame of data
+ *  4) wait for the processing task to complete (APP_WAIT_TASK)
+ *  5) send back data (if any)
+ *  6) force immediate connection close
+ */
+enum TAppState {
+  APP_INIT,
+  APP_READ_FRAME_SIZE,
+  APP_READ_REQUEST,
+  APP_WAIT_TASK,
+  APP_SEND_RESULT,
+  APP_CLOSE_CONNECTION
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TNonblockingServer::TConnection {
+ private:
+
+  /// Server handle
+  TNonblockingServer* server_;
+
+  /// Object wrapping network socket
+  boost::shared_ptr<TSocket> tSocket_;
+
+  /// Libevent object
+  struct event event_;
+
+  /// Libevent flags
+  short eventFlags_;
+
+  /// Socket mode
+  TSocketState socketState_;
+
+  /// Application state
+  TAppState appState_;
+
+  /// How much data needed to read
+  uint32_t readWant_;
+
+  /// Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  /// Read buffer
+  uint8_t* readBuffer_;
+
+  /// Read buffer size
+  uint32_t readBufferSize_;
+
+  /// Write buffer
+  uint8_t* writeBuffer_;
+
+  /// Write buffer size
+  uint32_t writeBufferSize_;
+
+  /// How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  /// Largest size of write buffer seen since buffer was constructed
+  size_t largestWriteBufferSize_;
+
+  /// Count of the number of calls for use with getResizeBufferEveryN().
+  int32_t callsForResize_;
+
+  /// Task handle
+  int taskHandle_;
+
+  /// Task event
+  struct event taskEvent_;
+
+  /// Transport to read from
+  boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+  /// Transport that processor writes to
+  boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  boost::shared_ptr<TTransport> factoryInputTransport_;
+  boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+  /// Protocol decoder
+  boost::shared_ptr<TProtocol> inputProtocol_;
+
+  /// Protocol encoder
+  boost::shared_ptr<TProtocol> outputProtocol_;
+
+  /// Server event handler, if any
+  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+
+  /// Thrift call context, if any
+  void *connectionContext_;
+
+  /// Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  /// Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  /// Set socket idle
+  void setIdle() {
+    setFlags(0);
+  }
+
+  /**
+   * Set event flags for this connection.
+   *
+   * @param eventFlags flags we pass to libevent for the connection.
+   */
+  void setFlags(short eventFlags);
+
+  /**
+   * Libevent handler called (via our static wrapper) when the connection
+   * socket had something happen.  Rather than use the flags libevent passed,
+   * we use the connection state to determine whether we need to read or
+   * write the socket.
+   */
+  void workSocket();
+
+  /// Close this connection and free or reset its resources.
+  void close();
+
+ public:
+
+  class Task;
+
+  /// Constructor
+  TConnection(int socket, short eventFlags, TNonblockingServer *s,
+              const sockaddr* addr, socklen_t addrLen) {
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
+
+    // Allocate input and output transports
+    // these only need to be allocated once per TConnection (they don't need to be
+    // reallocated on init() call)
+    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
+    tSocket_.reset(new TSocket());
+
+    init(socket, eventFlags, s, addr, addrLen);
+    server_->incrementNumConnections();
+  }
+
+  ~TConnection() {
+    std::free(readBuffer_);
+    server_->decrementNumConnections();
+  }
+
+  /**
+   * Check buffers against any size limits and shrink them if exceeded.
+   *
+   * @param readLimit we reduce read buffer size to this (if nonzero).
+   * @param writeLimit if nonzero and write buffer is larger, replace it.
+   */
+  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
+
+  /// Initialize
+  void init(int socket, short eventFlags, TNonblockingServer *s,
+            const sockaddr* addr, socklen_t addrLen);
+
+  /**
+   * This is called when the application transitions from one state into
+   * another. This means that it has finished writing the data that it needed
+   * to, or finished receiving the data that it needed to.
+   */
+  void transition();
+
+  /**
+   * C-callable event handler for connection events.  Provides a callback
+   * that libevent can understand which invokes connection_->workSocket().
+   *
+   * @param fd the descriptor the event occurred on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TConnection's "this".
+   */
+  static void eventHandler(int fd, short /* which */, void* v) {
+    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
+    ((TConnection*)v)->workSocket();
+  }
+
+  /**
+   * C-callable event handler for signaling task completion.  Provides a
+   * callback that libevent can understand that will read a connection
+   * object's address from a pipe and call connection->transition() for
+   * that object.
+   *
+   * @param fd the descriptor the event occurred on.
+   */
+  static void taskHandler(int fd, short /* which */, void* /* v */) {
+    TConnection* connection;
+    ssize_t nBytes;
+    while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
+        == sizeof(TConnection*)) {
+      connection->transition();
+    }
+    if (nBytes > 0) {
+      throw TException("TConnection::taskHandler unexpected partial read");
+    }
+    if (errno != EWOULDBLOCK && errno != EAGAIN) {
+      GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
+    }
+  }
+
+  /**
+   * Notification to server that processing has ended on this request.
+   * Can be called either when processing is completed or when a waiting
+   * task has been preemptively terminated (on overload).
+   *
+   * @return true if successful, false if unable to notify (check errno).
+   */
+  bool notifyServer() {
+    TConnection* connection = this;
+    if (write(server_->getNotificationSendFD(), (const void*)&connection,
+             sizeof(TConnection*)) != sizeof(TConnection*)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /// Force connection shutdown for this connection.
+  void forceClose() {
+    appState_ = APP_CLOSE_CONNECTION;
+    if (!notifyServer()) {
+      throw TException("TConnection::forceClose: failed write on notify pipe");
+    }
+  }
+
+  /// return the server this connection was initialized for.
+  TNonblockingServer* getServer() {
+    return server_;
+  }
+
+  /// get state of connection.
+  TAppState getState() {
+    return appState_;
+  }
+
+  /// return the TSocket transport wrapping this network connection
+  boost::shared_ptr<TSocket> getTSocket() const {
+    return tSocket_;
+  }
+
+  /// return the server event handler if any
+  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+    return serverEventHandler_;
+  }
+
+  /// return the Thrift connection context if any
+  void* getConnectionContext() {
+    return connectionContext_;
+  }
+
+};
+
+class TNonblockingServer::TConnection::Task: public Runnable {
  public:
   Task(boost::shared_ptr<TProcessor> processor,
        boost::shared_ptr<TProtocol> input,
@@ -109,8 +379,10 @@ class TConnection::Task: public Runnable
   void* connectionContext_;
 };
 
-void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
-                       const sockaddr* addr, socklen_t addrLen) {
+void TNonblockingServer::TConnection::init(int socket, short eventFlags,
+                                           TNonblockingServer* s,
+                                           const sockaddr* addr,
+                                           socklen_t addrLen) {
   tSocket_->setSocketFD(socket);
   tSocket_->setCachedAddress(addr, addrLen);
 
@@ -150,7 +422,7 @@ void TConnection::init(int socket, short
   }
 }
 
-void TConnection::workSocket() {
+void TNonblockingServer::TConnection::workSocket() {
   int got=0, left=0, sent=0;
   uint32_t fetch = 0;
 
@@ -276,7 +548,10 @@ void TConnection::workSocket() {
  * another. This means that it has finished writing the data that it needed
  * to, or finished receiving the data that it needed to.
  */
-void TConnection::transition() {
+void TNonblockingServer::TConnection::transition() {
+
+  int sz = 0;
+
   // Switch upon the state that we are currently in and move to a new state
   switch (appState_) {
 
@@ -460,7 +735,7 @@ void TConnection::transition() {
   }
 }
 
-void TConnection::setFlags(short eventFlags) {
+void TNonblockingServer::TConnection::setFlags(short eventFlags) {
   // Catch the do nothing case
   if (eventFlags_ == eventFlags) {
     return;
@@ -522,7 +797,7 @@ void TConnection::setFlags(short eventFl
 /**
  * Closes a connection
  */
-void TConnection::close() {
+void TNonblockingServer::TConnection::close() {
   // Delete the registered libevent
   if (event_del(&event_) == -1) {
     GlobalOutput.perror("TConnection::close() event_del", errno);
@@ -543,8 +818,9 @@ void TConnection::close() {
   server_->returnConnection(this);
 }
 
-void TConnection::checkIdleBufferMemLimit(size_t readLimit,
-                                          size_t writeLimit) {
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+    size_t readLimit,
+    size_t writeLimit) {
   if (readLimit > 0 && readBufferSize_ > readLimit) {
     free(readBuffer_);
     readBuffer_ = NULL;
@@ -585,9 +861,10 @@ TNonblockingServer::~TNonblockingServer(
  * Creates a new connection either by reusing an object off the stack or
  * by allocating a new one entirely
  */
-TConnection* TNonblockingServer::createConnection(int socket, short flags,
-                                                  const sockaddr* addr,
-                                                  socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+    int socket, short flags,
+    const sockaddr* addr,
+    socklen_t addrLen) {
   // Check the stack
   if (connectionStack_.empty()) {
     return new TConnection(socket, flags, this, addr, addrLen);

Modified: thrift/trunk/lib/cpp/src/server/TNonblockingServer.h
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TNonblockingServer.h?rev=1162987&r1=1162986&r2=1162987&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TNonblockingServer.h (original)
+++ thrift/trunk/lib/cpp/src/server/TNonblockingServer.h Mon Aug 29 20:28:23 2011
@@ -41,9 +41,6 @@ using apache::thrift::protocol::TProtoco
 using apache::thrift::concurrency::Runnable;
 using apache::thrift::concurrency::ThreadManager;
 
-// Forward declaration of class
-class TConnection;
-
 #ifdef LIBEVENT_VERSION_NUMBER
 #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
 #define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
@@ -94,6 +91,8 @@ enum TOverloadAction {
 
 class TNonblockingServer : public TServer {
  private:
+  class TConnection;
+
   /// Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
@@ -698,276 +697,6 @@ class TNonblockingServer : public TServe
   void stop();
 };
 
-/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
-  SOCKET_RECV_FRAMING,
-  SOCKET_RECV,
-  SOCKET_SEND
-};
-
-/**
- * Five states for the nonblocking servr:
- *  1) initialize
- *  2) read 4 byte frame size
- *  3) read frame of data
- *  4) send back data (if any)
- *  5) force immediate connection close
- */
-enum TAppState {
-  APP_INIT,
-  APP_READ_FRAME_SIZE,
-  APP_READ_REQUEST,
-  APP_WAIT_TASK,
-  APP_SEND_RESULT,
-  APP_CLOSE_CONNECTION
-};
-
-/**
- * Represents a connection that is handled via libevent. This connection
- * essentially encapsulates a socket that has some associated libevent state.
- */
-class TConnection {
- private:
-
-  /// Server handle
-  TNonblockingServer* server_;
-
-  /// Object wrapping network socket
-  boost::shared_ptr<TSocket> tSocket_;
-
-  /// Libevent object
-  struct event event_;
-
-  /// Libevent flags
-  short eventFlags_;
-
-  /// Socket mode
-  TSocketState socketState_;
-
-  /// Application state
-  TAppState appState_;
-
-  /// How much data needed to read
-  uint32_t readWant_;
-
-  /// Where in the read buffer are we
-  uint32_t readBufferPos_;
-
-  /// Read buffer
-  uint8_t* readBuffer_;
-
-  /// Read buffer size
-  uint32_t readBufferSize_;
-
-  /// Write buffer
-  uint8_t* writeBuffer_;
-
-  /// Write buffer size
-  uint32_t writeBufferSize_;
-
-  /// How far through writing are we?
-  uint32_t writeBufferPos_;
-
-  /// Largest size of write buffer seen since buffer was constructed
-  size_t largestWriteBufferSize_;
-
-  /// Count of the number of calls for use with getResizeBufferEveryN().
-  int32_t callsForResize_;
-
-  /// Task handle
-  int taskHandle_;
-
-  /// Task event
-  struct event taskEvent_;
-
-  /// Transport to read from
-  boost::shared_ptr<TMemoryBuffer> inputTransport_;
-
-  /// Transport that processor writes to
-  boost::shared_ptr<TMemoryBuffer> outputTransport_;
-
-  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
-  boost::shared_ptr<TTransport> factoryInputTransport_;
-  boost::shared_ptr<TTransport> factoryOutputTransport_;
-
-  /// Protocol decoder
-  boost::shared_ptr<TProtocol> inputProtocol_;
-
-  /// Protocol encoder
-  boost::shared_ptr<TProtocol> outputProtocol_;
-
-  /// Server event handler, if any
-  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
-
-  /// Thrift call context, if any
-  void *connectionContext_;
-
-  /// Go into read mode
-  void setRead() {
-    setFlags(EV_READ | EV_PERSIST);
-  }
-
-  /// Go into write mode
-  void setWrite() {
-    setFlags(EV_WRITE | EV_PERSIST);
-  }
-
-  /// Set socket idle
-  void setIdle() {
-    setFlags(0);
-  }
-
-  /**
-   * Set event flags for this connection.
-   *
-   * @param eventFlags flags we pass to libevent for the connection.
-   */
-  void setFlags(short eventFlags);
-
-  /**
-   * Libevent handler called (via our static wrapper) when the connection
-   * socket had something happen.  Rather than use the flags libevent passed,
-   * we use the connection state to determine whether we need to read or
-   * write the socket.
-   */
-  void workSocket();
-
-  /// Close this connection and free or reset its resources.
-  void close();
-
- public:
-
-  class Task;
-
-  /// Constructor
-  TConnection(int socket, short eventFlags, TNonblockingServer *s,
-              const sockaddr* addr, socklen_t addrLen) {
-    readBuffer_ = NULL;
-    readBufferSize_ = 0;
-
-    // Allocate input and output tranpsorts
-    // these only need to be allocated once per TConnection (they don't need to be
-    // reallocated on init() call)
-    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
-    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
-    tSocket_.reset(new TSocket());
-
-    init(socket, eventFlags, s, addr, addrLen);
-    server_->incrementNumConnections();
-  }
-
-  ~TConnection() {
-    std::free(readBuffer_);
-    server_->decrementNumConnections();
-  }
-
- /**
-   * Check buffers against any size limits and shrink it if exceeded.
-   *
-   * @param readLimit we reduce read buffer size to this (if nonzero).
-   * @param writeLimit if nonzero and write buffer is larger, replace it.
-   */
-  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
-
-  /// Initialize
-  void init(int socket, short eventFlags, TNonblockingServer *s,
-            const sockaddr* addr, socklen_t addrLen);
-
-  /**
-   * This is called when the application transitions from one state into
-   * another. This means that it has finished writing the data that it needed
-   * to, or finished receiving the data that it needed to.
-   */
-  void transition();
-
-  /**
-   * C-callable event handler for connection events.  Provides a callback
-   * that libevent can understand which invokes connection_->workSocket().
-   *
-   * @param fd the descriptor the event occured on.
-   * @param which the flags associated with the event.
-   * @param v void* callback arg where we placed TConnection's "this".
-   */
-  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
-    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
-    ((TConnection*)v)->workSocket();
-  }
-
-  /**
-   * C-callable event handler for signaling task completion.  Provides a
-   * callback that libevent can understand that will read a connection
-   * object's address from a pipe and call connection->transition() for
-   * that object.
-   *
-   * @param fd the descriptor the event occured on.
-   */
-  static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
-    TConnection* connection;
-    ssize_t nBytes;
-    while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
-        == sizeof(TConnection*)) {
-      connection->transition();
-    }
-    if (nBytes > 0) {
-      throw TException("TConnection::taskHandler unexpected partial read");
-    }
-    if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
-      GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
-    }
-  }
-
-  /**
-   * Notification to server that processing has ended on this request.
-   * Can be called either when processing is completed or when a waiting
-   * task has been preemptively terminated (on overload).
-   *
-   * @return true if successful, false if unable to notify (check errno).
-   */
-  bool notifyServer() {
-    TConnection* connection = this;
-    if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
-             sizeof(TConnection*), 0) != sizeof(TConnection*)) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /// Force connection shutdown for this connection.
-  void forceClose() {
-    appState_ = APP_CLOSE_CONNECTION;
-    if (!notifyServer()) {
-      throw TException("TConnection::forceClose: failed write on notify pipe");
-    }
-  }
-
-  /// return the server this connection was initialized for.
-  TNonblockingServer* getServer() {
-    return server_;
-  }
-
-  /// get state of connection.
-  TAppState getState() {
-    return appState_;
-  }
-
-  /// return the TSocket transport wrapping this network connection
-  boost::shared_ptr<TSocket> getTSocket() const {
-    return tSocket_;
-  }
-
-  /// return the server event handler if any
-  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
-    return serverEventHandler_;
-  }
-
-  /// return the Thrift connection context if any
-  void* getConnectionContext() {
-    return connectionContext_;
-  }
-
-};
-
 }}} // apache::thrift::server
 
 #endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_