You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by be...@apache.org on 2015/04/24 15:55:28 UTC

thrift git commit: THRIFT-2441 Cannot shutdown TThreadedServer when clients are still connected

Repository: thrift
Updated Branches:
  refs/heads/master 95717c92d -> 1684c4295


THRIFT-2441 Cannot shutdown TThreadedServer when clients are still connected

Author: James E. King, III <Ji...@simplivity.com>


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/1684c429
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/1684c429
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/1684c429

Branch: refs/heads/master
Commit: 1684c429501e9df9387cb518e660691f032d7926
Parents: 95717c9
Author: Ben Craig <be...@apache.org>
Authored: Fri Apr 24 08:52:44 2015 -0500
Committer: Ben Craig <be...@apache.org>
Committed: Fri Apr 24 08:52:44 2015 -0500

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 configure.ac                                    |   3 +
 lib/cpp/src/thrift/server/TSimpleServer.cpp     |  23 +-
 lib/cpp/src/thrift/server/TSimpleServer.h       |  19 +-
 lib/cpp/src/thrift/server/TThreadPoolServer.cpp |  28 +-
 lib/cpp/src/thrift/server/TThreadPoolServer.h   |   7 +-
 lib/cpp/src/thrift/server/TThreadedServer.cpp   |  26 +-
 lib/cpp/src/thrift/server/TThreadedServer.h     |   6 +-
 lib/cpp/src/thrift/transport/TServerSocket.cpp  | 125 ++++++---
 lib/cpp/src/thrift/transport/TServerSocket.h    |  30 ++-
 lib/cpp/src/thrift/transport/TServerTransport.h |  17 +-
 lib/cpp/src/thrift/transport/TSocket.cpp        | 135 +++++++---
 lib/cpp/src/thrift/transport/TSocket.h          |  28 +-
 lib/cpp/test/CMakeLists.txt                     |  27 +-
 lib/cpp/test/Makefile.am                        |  32 ++-
 lib/cpp/test/TServerIntegrationTest.cpp         | 253 +++++++++++++++++++
 lib/cpp/test/TServerSocketTest.cpp              |  15 +-
 lib/cpp/test/TServerTransportTest.cpp           |  62 +++++
 lib/cpp/test/TSocketInterruptTest.cpp           | 154 +++++++++++
 19 files changed, 855 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 3421aef..7c01d64 100644
--- a/.gitignore
+++ b/.gitignore
@@ -98,6 +98,8 @@ test-driver
 /lib/cpp/test/TFileTransportTest
 /lib/cpp/test/TNonblockingServerTest
 /lib/cpp/test/TPipedTransportTest
+/lib/cpp/test/TServerIntegrationTest
+/lib/cpp/test/TSocketInterruptTest
 /lib/cpp/test/TransportTest
 /lib/cpp/test/UnitTests
 /lib/cpp/test/ZlibTest

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 0032354..b1a79e7 100755
--- a/configure.ac
+++ b/configure.ac
@@ -136,7 +136,10 @@ if test "$with_cpp" = "yes";  then
   AX_BOOST_BASE([1.53.0])
   if test "x$succeeded" = "xyes" ; then
     AC_SUBST([BOOST_LIB_DIR], [$(echo "$BOOST_LDFLAGS" | sed -e 's/^\-L//')])
+    AC_SUBST([BOOST_CHRONO_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_chrono.a")])
+    AC_SUBST([BOOST_SYSTEM_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_system.a")])
     AC_SUBST([BOOST_TEST_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_unit_test_framework.a")])
+    AC_SUBST([BOOST_THREAD_LDADD], [$(echo "$BOOST_LIB_DIR/libboost_thread.a")])
     have_cpp="yes"
   fi
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/server/TSimpleServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index fa6bff5..19f44ac 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -70,11 +70,11 @@ void TSimpleServer::serve() {
       if (client) {
         client->close();
       }
-      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+      if (ttx.getType() != TTransportException::INTERRUPTED) {
         string errStr = string("TServerTransport died on accept: ") + ttx.what();
         GlobalOutput(errStr.c_str());
       }
-      continue;
+      if (stop_) break; else continue;
     } catch (TException& tx) {
       if (inputTransport) {
         inputTransport->close();
@@ -88,7 +88,7 @@ void TSimpleServer::serve() {
       string errStr = string("Some kind of accept exception: ") + tx.what();
       GlobalOutput(errStr.c_str());
       continue;
-    } catch (string s) {
+    } catch (const string& s) {
       if (inputTransport) {
         inputTransport->close();
       }
@@ -122,8 +122,12 @@ void TSimpleServer::serve() {
         }
       }
     } catch (const TTransportException& ttx) {
-      string errStr = string("TSimpleServer client died: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
+      if (ttx.getType() != TTransportException::END_OF_FILE &&
+          ttx.getType() != TTransportException::INTERRUPTED)
+      {
+        string errStr = string("TSimpleServer client died: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
     } catch (const std::exception& x) {
       GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what());
     } catch (...) {
@@ -163,6 +167,15 @@ void TSimpleServer::serve() {
     stop_ = false;
   }
 }
+
+void TSimpleServer::stop() {
+  if (!stop_) {
+    stop_ = true;
+    serverTransport_->interrupt();
+    serverTransport_->interruptChildren();
+  }
+}
+
 }
 }
 } // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/server/TSimpleServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 967f834..941f12b 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -32,7 +32,6 @@ namespace server {
  * continuous loop of accepting a single connection, processing requests on
  * that connection until it closes, and then repeating. It is a good example
  * of how to extend the TServer interface.
- *
  */
 class TSimpleServer : public TServer {
 public:
@@ -84,14 +83,20 @@ public:
               outputProtocolFactory),
       stop_(false) {}
 
-  ~TSimpleServer() {}
-
+  /**
+   * Process one connection at a time using the caller's thread.
+   * Call stop() on another thread to interrupt processing and
+   * return control to the caller.
+   * Post-conditions (return guarantees):
+   *   The serverTransport will be closed.
+   *   There will be no connected client.
+   */
   void serve();
 
-  void stop() {
-    stop_ = true;
-    serverTransport_->interrupt();
-  }
+  /**
+   * Interrupt serve() so that it meets post-conditions.
+   */
+  void stop();
 
 protected:
   bool stop_;

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index 0530d8d..58cfe3e 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -69,11 +69,12 @@ public:
           break;
         }
       }
-    } catch (const TTransportException&) {
-      // This is reasonably expected, client didn't send a full request so just
-      // ignore him
-      // string errStr = string("TThreadPoolServer client died: ") + ttx.what();
-      // GlobalOutput(errStr.c_str());
+    } catch (const TTransportException& ttx) {
+      if (ttx.getType() != TTransportException::END_OF_FILE &&
+          ttx.getType() != TTransportException::INTERRUPTED) {
+        string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
     } catch (const std::exception& x) {
       GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
     } catch (...) {
@@ -108,8 +109,7 @@ private:
   shared_ptr<TTransport> transport_;
 };
 
-TThreadPoolServer::~TThreadPoolServer() {
-}
+TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::serve() {
   shared_ptr<TTransport> client;
@@ -160,11 +160,11 @@ void TThreadPoolServer::serve() {
       if (client) {
         client->close();
       }
-      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+      if (ttx.getType() != TTransportException::INTERRUPTED) {
         string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what();
         GlobalOutput(errStr.c_str());
       }
-      continue;
+      if (stop_) break; else continue;
     } catch (TException& tx) {
       if (inputTransport) {
         inputTransport->close();
@@ -178,7 +178,7 @@ void TThreadPoolServer::serve() {
       string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what();
       GlobalOutput(errStr.c_str());
       continue;
-    } catch (string s) {
+    } catch (const string& s) {
       if (inputTransport) {
         inputTransport->close();
       }
@@ -207,6 +207,14 @@ void TThreadPoolServer::serve() {
   }
 }
 
+void TThreadPoolServer::stop() {
+  if (!stop_) {
+    stop_ = true;
+    serverTransport_->interrupt();
+    serverTransport_->interruptChildren();
+  }
+}
+
 int64_t TThreadPoolServer::getTimeout() const {
   return timeout_;
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/server/TThreadPoolServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index ad7e7ef..1696700 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -109,15 +109,12 @@ public:
 
   virtual void serve();
 
+  virtual void stop();
+
   virtual int64_t getTimeout() const;
 
   virtual void setTimeout(int64_t value);
 
-  virtual void stop() {
-    stop_ = true;
-    serverTransport_->interrupt();
-  }
-
   virtual int64_t getTaskExpiration() const;
 
   virtual void setTaskExpiration(int64_t value);

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/server/TThreadedServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 380f69c..118c9cb 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -55,10 +55,6 @@ public:
 
   ~Task() {}
 
-  void stop() {
-    input_->getTransport()->close();
-  }
-
   void run() {
     boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
     void* connectionContext = NULL;
@@ -76,7 +72,8 @@ public:
         }
       }
     } catch (const TTransportException& ttx) {
-      if (ttx.getType() != TTransportException::END_OF_FILE) {
+      if (ttx.getType() != TTransportException::END_OF_FILE &&
+          ttx.getType() != TTransportException::INTERRUPTED) {
         string errStr = string("TThreadedServer client died: ") + ttx.what();
         GlobalOutput(errStr.c_str());
       }
@@ -130,8 +127,7 @@ void TThreadedServer::init() {
   }
 }
 
-TThreadedServer::~TThreadedServer() {
-}
+TThreadedServer::~TThreadedServer() {}
 
 void TThreadedServer::serve() {
 
@@ -196,11 +192,11 @@ void TThreadedServer::serve() {
       if (client) {
         client->close();
       }
-      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+      if (ttx.getType() != TTransportException::INTERRUPTED) {
         string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what();
         GlobalOutput(errStr.c_str());
       }
-      continue;
+      if (stop_) break; else continue;
     } catch (TException& tx) {
       if (inputTransport) {
         inputTransport->close();
@@ -214,7 +210,7 @@ void TThreadedServer::serve() {
       string errStr = string("TThreadedServer: Caught TException: ") + tx.what();
       GlobalOutput(errStr.c_str());
       continue;
-    } catch (string s) {
+    } catch (const string& s) {
       if (inputTransport) {
         inputTransport->close();
       }
@@ -240,8 +236,6 @@ void TThreadedServer::serve() {
     }
     try {
       Synchronized s(tasksMonitor_);
-      for ( std::set<Task*>::iterator tIt = tasks_.begin(); tIt != tasks_.end(); ++tIt )
-        (*tIt)->stop();
       while (!tasks_.empty()) {
         tasksMonitor_.wait();
       }
@@ -252,6 +246,14 @@ void TThreadedServer::serve() {
     stop_ = false;
   }
 }
+
+void TThreadedServer::stop() {
+  if (!stop_) {
+	stop_ = true;
+	serverTransport_->interrupt();
+	serverTransport_->interruptChildren();
+  }
+}
 }
 }
 } // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/server/TThreadedServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 2b1f757..b9b24fe 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -75,11 +75,7 @@ public:
   virtual ~TThreadedServer();
 
   virtual void serve();
-
-  void stop() {
-    stop_ = true;
-    serverTransport_->interrupt();
-  }
+  void stop();
 
 protected:
   void init();

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/transport/TServerSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp
index 8c8fd73..f5c3ea5 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp
@@ -20,6 +20,7 @@
 #include <thrift/thrift-config.h>
 
 #include <cstring>
+#include <stdexcept>
 #include <sys/types.h>
 #ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
@@ -69,6 +70,12 @@ inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
 }
 
+void destroyer_of_fine_sockets(THRIFT_SOCKET *ssock)
+{
+    ::THRIFT_CLOSESOCKET(*ssock);
+    delete ssock;
+}
+
 namespace apache {
 namespace thrift {
 namespace transport {
@@ -88,9 +95,12 @@ TServerSocket::TServerSocket(int port)
     tcpSendBuffer_(0),
     tcpRecvBuffer_(0),
     keepAlive_(false),
-    intSock1_(THRIFT_INVALID_SOCKET),
-    intSock2_(THRIFT_INVALID_SOCKET) {
-}
+    interruptableChildren_(true),
+    listening_(false),
+    interruptSockWriter_(THRIFT_INVALID_SOCKET),
+    interruptSockReader_(THRIFT_INVALID_SOCKET),
+    childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
 
 TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
   : port_(port),
@@ -104,9 +114,12 @@ TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout)
     tcpSendBuffer_(0),
     tcpRecvBuffer_(0),
     keepAlive_(false),
-    intSock1_(THRIFT_INVALID_SOCKET),
-    intSock2_(THRIFT_INVALID_SOCKET) {
-}
+    interruptableChildren_(true),
+    listening_(false),
+    interruptSockWriter_(THRIFT_INVALID_SOCKET),
+    interruptSockReader_(THRIFT_INVALID_SOCKET),
+    childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
 
 TServerSocket::TServerSocket(const string& address, int port)
   : port_(port),
@@ -121,9 +134,12 @@ TServerSocket::TServerSocket(const string& address, int port)
     tcpSendBuffer_(0),
     tcpRecvBuffer_(0),
     keepAlive_(false),
-    intSock1_(THRIFT_INVALID_SOCKET),
-    intSock2_(THRIFT_INVALID_SOCKET) {
-}
+    interruptableChildren_(true),
+    listening_(false),
+    interruptSockWriter_(THRIFT_INVALID_SOCKET),
+    interruptSockReader_(THRIFT_INVALID_SOCKET),
+    childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
 
 TServerSocket::TServerSocket(const string& path)
   : port_(0),
@@ -138,9 +154,12 @@ TServerSocket::TServerSocket(const string& path)
     tcpSendBuffer_(0),
     tcpRecvBuffer_(0),
     keepAlive_(false),
-    intSock1_(THRIFT_INVALID_SOCKET),
-    intSock2_(THRIFT_INVALID_SOCKET) {
-}
+    interruptableChildren_(true),
+    listening_(false),
+    interruptSockWriter_(THRIFT_INVALID_SOCKET),
+    interruptSockReader_(THRIFT_INVALID_SOCKET),
+    childInterruptSockWriter_(THRIFT_INVALID_SOCKET)
+{}
 
 TServerSocket::~TServerSocket() {
   close();
@@ -178,18 +197,41 @@ void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
   tcpRecvBuffer_ = tcpRecvBuffer;
 }
 
+void TServerSocket::setInterruptableChildren(bool enable) {
+  if (listening_) {
+    throw std::logic_error("setInterruptableChildren cannot be called after listen()");
+  }
+  interruptableChildren_ = enable;
+}
+
 void TServerSocket::listen() {
+  listening_ = true;
 #ifdef _WIN32
   TWinsockSingleton::create();
 #endif // _WIN32
   THRIFT_SOCKET sv[2];
+  // Create the socket pair used to interrupt
   if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
-    GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
-    intSock1_ = THRIFT_INVALID_SOCKET;
-    intSock2_ = THRIFT_INVALID_SOCKET;
+    GlobalOutput.perror("TServerSocket::listen() socketpair() interrupt",
+    		THRIFT_GET_SOCKET_ERROR);
+    interruptSockWriter_ = THRIFT_INVALID_SOCKET;
+    interruptSockReader_ = THRIFT_INVALID_SOCKET;
   } else {
-    intSock1_ = sv[1];
-    intSock2_ = sv[0];
+    interruptSockWriter_ = sv[1];
+    interruptSockReader_ = sv[0];
+  }
+
+  // Create the socket pair used to interrupt all clients
+  if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+    GlobalOutput.perror("TServerSocket::listen() socketpair() childInterrupt",
+    		THRIFT_GET_SOCKET_ERROR);
+    childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
+    pChildInterruptSockReader_.reset();
+  } else {
+    childInterruptSockWriter_ = sv[1];
+    pChildInterruptSockReader_ =
+    		boost::shared_ptr<THRIFT_SOCKET>(new THRIFT_SOCKET(sv[0]),
+    				destroyer_of_fine_sockets);
   }
 
   // Validate port number
@@ -469,8 +511,8 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
     std::memset(fds, 0, sizeof(fds));
     fds[0].fd = serverSocket_;
     fds[0].events = THRIFT_POLLIN;
-    if (intSock2_ != THRIFT_INVALID_SOCKET) {
-      fds[1].fd = intSock2_;
+    if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
+      fds[1].fd = interruptSockReader_;
       fds[1].events = THRIFT_POLLIN;
     }
     /*
@@ -491,9 +533,9 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
       throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
     } else if (ret > 0) {
       // Check for an interrupt signal
-      if (intSock2_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
+      if (interruptSockReader_ != THRIFT_INVALID_SOCKET && (fds[1].revents & THRIFT_POLLIN)) {
         int8_t buf;
-        if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
+        if (-1 == recv(interruptSockReader_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
           GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ",
                               THRIFT_GET_SOCKET_ERROR);
         }
@@ -562,33 +604,52 @@ shared_ptr<TTransport> TServerSocket::acceptImpl() {
 }
 
 shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
-  return shared_ptr<TSocket>(new TSocket(clientSocket));
+  if (interruptableChildren_) {
+    return shared_ptr<TSocket>(new TSocket(clientSocket, pChildInterruptSockReader_));
+  } else {
+    return shared_ptr<TSocket>(new TSocket(clientSocket));
+  }
 }
 
-void TServerSocket::interrupt() {
-  if (intSock1_ != THRIFT_INVALID_SOCKET) {
+void TServerSocket::notify(THRIFT_SOCKET notifySocket) {
+  if (notifySocket != THRIFT_INVALID_SOCKET) {
     int8_t byte = 0;
-    if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) {
-      GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
+    if (-1 == send(notifySocket, cast_sockopt(&byte), sizeof(int8_t), 0)) {
+      GlobalOutput.perror("TServerSocket::notify() send() ", THRIFT_GET_SOCKET_ERROR);
     }
   }
 }
 
+void TServerSocket::interrupt() {
+  notify(interruptSockWriter_);
+}
+
+void TServerSocket::interruptChildren() {
+  notify(childInterruptSockWriter_);
+}
+
 void TServerSocket::close() {
   if (serverSocket_ != THRIFT_INVALID_SOCKET) {
     shutdown(serverSocket_, THRIFT_SHUT_RDWR);
     ::THRIFT_CLOSESOCKET(serverSocket_);
   }
-  if (intSock1_ != THRIFT_INVALID_SOCKET) {
-    ::THRIFT_CLOSESOCKET(intSock1_);
+  if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
+    ::THRIFT_CLOSESOCKET(interruptSockWriter_);
   }
-  if (intSock2_ != THRIFT_INVALID_SOCKET) {
-    ::THRIFT_CLOSESOCKET(intSock2_);
+  if (interruptSockReader_ != THRIFT_INVALID_SOCKET) {
+    ::THRIFT_CLOSESOCKET(interruptSockReader_);
+  }
+  if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
+    ::THRIFT_CLOSESOCKET(childInterruptSockWriter_);
   }
   serverSocket_ = THRIFT_INVALID_SOCKET;
-  intSock1_ = THRIFT_INVALID_SOCKET;
-  intSock2_ = THRIFT_INVALID_SOCKET;
+  interruptSockWriter_ = THRIFT_INVALID_SOCKET;
+  interruptSockReader_ = THRIFT_INVALID_SOCKET;
+  childInterruptSockWriter_ = THRIFT_INVALID_SOCKET;
+  pChildInterruptSockReader_.reset();
+  listening_ = false;
 }
+
 }
 }
 } // apache::thrift::transport

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/transport/TServerSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h
index 49711e8..58e4afd 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TServerSocket.h
@@ -100,17 +100,33 @@ public:
   // socket, this is the place to do it.
   void setAcceptCallback(const socket_func_t& acceptCallback) { acceptCallback_ = acceptCallback; }
 
-  void listen();
-  void close();
+  // When enabled (the default), new children TSockets will be constructed so
+  // they can be interrupted by TServerTransport::interruptChildren().
+  // This is more expensive in terms of system calls (poll + recv), but it
+  // ensures a connected client cannot interfere with TServer::stop().
+  //
+  // When disabled, TSocket children do not incur an additional poll() call.
+  // Server-side reads are more efficient; however, a client can interfere with
+  // the server's ability to shut down properly by staying connected.
+  //
+  // Must be called before listen(); mode cannot be switched after that.
+  // \throws std::logic_error if listen() has been called
+  void setInterruptableChildren(bool enable);
 
-  void interrupt();
   int getPort();
 
+  void listen();
+  void interrupt();
+  void interruptChildren();
+  void close();
+
 protected:
   boost::shared_ptr<TTransport> acceptImpl();
   virtual boost::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
 
 private:
+  void notify(THRIFT_SOCKET notifySock);
+
   int port_;
   std::string address_;
   std::string path_;
@@ -124,9 +140,13 @@ private:
   int tcpSendBuffer_;
   int tcpRecvBuffer_;
   bool keepAlive_;
+  bool interruptableChildren_;
+  bool listening_;
 
-  THRIFT_SOCKET intSock1_;
-  THRIFT_SOCKET intSock2_;
+  THRIFT_SOCKET interruptSockWriter_;                          // is notified on interrupt()
+  THRIFT_SOCKET interruptSockReader_;                          // is used in select/poll with serverSocket_ for interruptability
+  THRIFT_SOCKET childInterruptSockWriter_;                     // is notified on interruptChildren()
+  boost::shared_ptr<THRIFT_SOCKET> pChildInterruptSockReader_; // if interruptableChildren_ this is shared with child TSockets
 
   socket_func_t listenCallback_;
   socket_func_t acceptCallback_;

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/transport/TServerTransport.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TServerTransport.h b/lib/cpp/src/thrift/transport/TServerTransport.h
index 7c4a7c3..cd1d3da 100644
--- a/lib/cpp/src/thrift/transport/TServerTransport.h
+++ b/lib/cpp/src/thrift/transport/TServerTransport.h
@@ -30,8 +30,9 @@ namespace transport {
 
 /**
  * Server transport framework. A server needs to have some facility for
- * creating base transports to read/write from.
- *
+ * creating base transports to read/write from.  The server is expected
+ * to keep track of TTransport children that it creates for purposes of
+ * controlling their lifetime.
  */
 class TServerTransport {
 public:
@@ -67,11 +68,21 @@ public:
    * For "smart" TServerTransport implementations that work in a multi
    * threaded context this can be used to break out of an accept() call.
    * It is expected that the transport will throw a TTransportException
-   * with the interrupted error code.
+   * with the INTERRUPTED error code.
+   *
+   * This will not make an attempt to interrupt any TTransport children.
    */
   virtual void interrupt() {}
 
   /**
+   * This will interrupt the children created by the server transport,
+   * allowing them to break out of any blocking data reception call.
+   * It is expected that the children will throw a TTransportException
+   * with the INTERRUPTED error code.
+   */
+  virtual void interruptChildren() {}
+
+  /**
    * Closes this transport such that future calls to accept will do nothing.
    */
   virtual void close() = 0;

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/transport/TSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index bcea291..cc4dce0 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -74,7 +74,7 @@ using namespace std;
  *
  */
 
-TSocket::TSocket(string host, int port)
+TSocket::TSocket(const string& host, int port)
   : host_(host),
     port_(port),
     path_(""),
@@ -89,7 +89,7 @@ TSocket::TSocket(string host, int port)
     maxRecvRetries_(5) {
 }
 
-TSocket::TSocket(string path)
+TSocket::TSocket(const string& path)
   : host_(""),
     port_(0),
     path_(path),
@@ -143,6 +143,30 @@ TSocket::TSocket(THRIFT_SOCKET socket)
 #endif
 }
 
+TSocket::TSocket(THRIFT_SOCKET socket,
+                 boost::shared_ptr<THRIFT_SOCKET> interruptListener) :
+  host_(""),
+  port_(0),
+  path_(""),
+  socket_(socket),
+  interruptListener_(interruptListener),
+  connTimeout_(0),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  keepAlive_(false),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1),
+  maxRecvRetries_(5) {
+  cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
+#ifdef SO_NOSIGPIPE
+  {
+    int one = 1;
+    setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
+  }
+#endif
+}
+
 TSocket::~TSocket() {
   close();
 }
@@ -153,8 +177,41 @@ bool TSocket::isOpen() {
 
 bool TSocket::peek() {
   if (!isOpen()) {
-    return false;
+     return false;
   }
+  if (interruptListener_)
+  {
+    for (int retries = 0; ; ) {
+      struct THRIFT_POLLFD fds[2];
+      std::memset(fds, 0, sizeof(fds));
+      fds[0].fd = socket_;
+      fds[0].events = THRIFT_POLLIN;
+      fds[1].fd = *(interruptListener_.get());
+      fds[1].events = THRIFT_POLLIN;
+      int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      if (ret < 0) {
+        // error cases
+        if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
+          continue;
+        }
+        GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
+        throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+      } else if (ret > 0) {
+        // Check the interruptListener
+        if (fds[1].revents & THRIFT_POLLIN) {
+          return false;
+        }
+        // There must be data or a disconnection, fall through to the PEEK
+        break;
+      } else {
+        // timeout
+        return false;
+      }
+    }
+  }
+
+  // Check to see if data is available or if the remote side closed
   uint8_t buf;
   int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
   if (r == -1) {
@@ -455,9 +512,44 @@ try_again:
     // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
     begin.tv_sec = begin.tv_usec = 0;
   }
-  int got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
-  int errno_copy = THRIFT_GET_SOCKET_ERROR; // THRIFT_GETTIMEOFDAY can change
-                                            // THRIFT_GET_SOCKET_ERROR
+
+  int got = 0;
+
+  if (interruptListener_)
+  {
+    struct THRIFT_POLLFD fds[2];
+    std::memset(fds, 0, sizeof(fds));
+    fds[0].fd = socket_;
+    fds[0].events = THRIFT_POLLIN;
+    fds[1].fd = *(interruptListener_.get());
+    fds[1].events = THRIFT_POLLIN;
+
+    int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
+    int errno_copy = THRIFT_GET_SOCKET_ERROR;
+    if (ret < 0) {
+      // error cases
+      if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
+        goto try_again;
+      }
+      GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
+      throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+    } else if (ret > 0) {
+      // Check the interruptListener
+      if (fds[1].revents & THRIFT_POLLIN) {
+        throw TTransportException(TTransportException::INTERRUPTED,
+                                  "Interrupted");
+      }
+    } else /* ret == 0 */ {
+      throw TTransportException(TTransportException::TIMED_OUT,
+                                  "THRIFT_EAGAIN (timed out)");
+    }
+
+    // falling through means there is something to recv and it cannot block
+  }
+
+  got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
+  // THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
+  int errno_copy = THRIFT_GET_SOCKET_ERROR;
 
   // Check for error on read
   if (got < 0) {
@@ -493,29 +585,9 @@ try_again:
       goto try_again;
     }
 
-#if defined __FreeBSD__ || defined __MACH__
     if (errno_copy == THRIFT_ECONNRESET) {
-      /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with
-       * THRIFT_ECONNRESET if peer performed shutdown
-       * edhall: eliminated close() since we do that in the destructor.
-       */
       return 0;
     }
-#endif
-
-#ifdef _WIN32
-    if (errno_copy == WSAECONNRESET) {
-      return 0; // EOF
-    }
-#endif
-
-    // Now it's not a try again case, but a real probblez
-    GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
-
-    // If we disconnect with no linger time
-    if (errno_copy == THRIFT_ECONNRESET) {
-      throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET");
-    }
 
     // This ish isn't open
     if (errno_copy == THRIFT_ENOTCONN) {
@@ -527,18 +599,13 @@ try_again:
       throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
     }
 
+    // Now it's not a try again case, but a real probblez
+    GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
+
     // Some other error, whatevz
     throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
   }
 
-  // The remote host has closed the socket
-  if (got == 0) {
-    // edhall: we used to call close() here, but our caller may want to deal
-    // with the socket fd and we'll close() in our destructor in any case.
-    return 0;
-  }
-
-  // Pack data into string
   return got;
 }
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/src/thrift/transport/TSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h
index 427b91f..09a64e3 100644
--- a/lib/cpp/src/thrift/transport/TSocket.h
+++ b/lib/cpp/src/thrift/transport/TSocket.h
@@ -61,7 +61,7 @@ public:
    * @param host An IP address or hostname to connect to
    * @param port The port to connect on
    */
-  TSocket(std::string host, int port);
+  TSocket(const std::string& host, int port);
 
   /**
    * Constructs a new Unix domain socket.
@@ -69,7 +69,7 @@ public:
    *
    * @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift"
    */
-  TSocket(std::string path);
+  TSocket(const std::string& path);
 
   /**
    * Destroyes the socket object, closing it if necessary.
@@ -102,6 +102,13 @@ public:
 
   /**
    * Reads from the underlying socket.
+   * \returns the number of bytes read or 0 indicates EOF
+   * \throws TTransportException of types:
+   *           INTERRUPTED means the socket was interrupted
+   *                       out of a blocking call
+   *           NOT_OPEN means the socket has been closed
+   *           TIMED_OUT means the receive timeout expired
+   *           UNKNOWN means something unexpected happened
    */
   virtual uint32_t read(uint8_t* buf, uint32_t len);
 
@@ -242,11 +249,18 @@ public:
   virtual const std::string getOrigin();
 
   /**
-   * Constructor to create socket from raw UNIX handle.
+   * Constructor to create socket from file descriptor.
    */
   TSocket(THRIFT_SOCKET socket);
 
   /**
+   * Constructor to create socket from file descriptor that
+   * can be interrupted safely.
+   */
+  TSocket(THRIFT_SOCKET socket,
+          boost::shared_ptr<THRIFT_SOCKET> interruptListener);
+
+  /**
    * Set a cache of the peer address (used when trivially available: e.g.
    * accept() or connect()). Only caches IPV4 and IPV6; unset for others.
    */
@@ -274,9 +288,15 @@ protected:
   /** UNIX domain socket path */
   std::string path_;
 
-  /** Underlying UNIX socket handle */
+  /** Underlying socket handle */
   THRIFT_SOCKET socket_;
 
+  /**
+   * A shared socket pointer that will interrupt a blocking read if data
+   * becomes available on it
+   */
+  boost::shared_ptr<THRIFT_SOCKET> interruptListener_;
+
   /** Connect timeout in ms */
   int connTimeout_;
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index 721053a..83ebe9e 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -20,7 +20,7 @@
 
 # Find required packages
 set(Boost_USE_STATIC_LIBS ON) # Force the use of static boost test framework
-find_package(Boost 1.53.0 REQUIRED COMPONENTS unit_test_framework)
+find_package(Boost 1.53.0 REQUIRED COMPONENTS chrono system thread unit_test_framework)
 include_directories(SYSTEM "${Boost_INCLUDE_DIRS}")
 
 #Make sure gen-cpp files can be included
@@ -50,6 +50,8 @@ target_link_libraries(testgencpp thrift)
 set(testgencpp_cob_SOURCES
     gen-cpp/ChildService.cpp
     gen-cpp/ChildService.h
+    gen-cpp/EmptyService.cpp
+    gen-cpp/EmptyService.h
     gen-cpp/ParentService.cpp
     gen-cpp/ParentService.h
     gen-cpp/proc_types.cpp
@@ -71,6 +73,7 @@ set(UnitTest_SOURCES
     ToStringTest.cpp
     TypedefTest.cpp
     TServerSocketTest.cpp
+    TServerTransportTest.cpp
 )
 
 if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS)
@@ -81,6 +84,26 @@ add_executable(UnitTests ${UnitTest_SOURCES})
 target_link_libraries(UnitTests testgencpp thrift ${Boost_LIBRARIES})
 add_test(NAME UnitTests COMMAND UnitTests)
 
+# Socket interrupt test: -lrt is only added where librt exists (not MSVC, not Apple)
+add_executable(TSocketInterruptTest TSocketInterruptTest.cpp)
+target_link_libraries(TSocketInterruptTest
+    testgencpp
+    ${Boost_LIBRARIES}
+)
+if (NOT MSVC AND NOT APPLE)
+target_link_libraries(TSocketInterruptTest -lrt)
+endif ()
+add_test(NAME TSocketInterruptTest COMMAND TSocketInterruptTest)
+
+add_executable(TServerIntegrationTest TServerIntegrationTest.cpp)
+target_link_libraries(TServerIntegrationTest
+    testgencpp_cob
+    ${Boost_LIBRARIES}
+)
+if (NOT MSVC AND NOT APPLE)
+target_link_libraries(TServerIntegrationTest -lrt)
+endif ()
+add_test(NAME TServerIntegrationTest COMMAND TServerIntegrationTest)
 
 if(WITH_ZLIB)
 add_executable(TransportTest TransportTest.cpp)
@@ -255,7 +278,7 @@ endif()
 #
 
 
-add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h
+add_custom_command(OUTPUT gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp  gen-cpp/EmptyService.h
     COMMAND thrift-compiler --gen cpp:dense ${PROJECT_SOURCE_DIR}/test/DebugProtoTest.thrift
 )
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/test/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 46ff911..0cd1c67 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -25,6 +25,7 @@ BUILT_SOURCES = gen-cpp/DebugProtoTest_types.h \
                 gen-cpp/ThriftTest_types.h \
                 gen-cpp/TypedefTest_types.h \
                 gen-cpp/ChildService.h \
+                gen-cpp/EmptyService.h \
                 gen-cpp/ParentService.h \
                 gen-cpp/proc_types.h
 
@@ -50,6 +51,8 @@ nodist_libtestgencpp_la_SOURCES = \
 nodist_libprocessortest_la_SOURCES = \
 	gen-cpp/ChildService.cpp \
 	gen-cpp/ChildService.h \
+	gen-cpp/EmptyService.cpp \
+	gen-cpp/EmptyService.h \
 	gen-cpp/ParentService.cpp \
 	gen-cpp/ParentService.h \
 	gen-cpp/proc_types.cpp \
@@ -79,6 +82,8 @@ check_PROGRAMS = \
 	SpecializationTest \
 	AllProtocolsTest \
 	TransportTest \
+	TSocketInterruptTest \
+	TServerIntegrationTest \
 	ZlibTest \
 	TFileTransportTest \
 	link_test \
@@ -107,17 +112,38 @@ UnitTests_SOURCES = \
 	Base64Test.cpp \
 	ToStringTest.cpp \
 	TypedefTest.cpp \
-        TServerSocketTest.cpp
+	TServerSocketTest.cpp \
+	TServerTransportTest.cpp
 
 if !WITH_BOOSTTHREADS
 UnitTests_SOURCES += \
-        RWMutexStarveTest.cpp
+    RWMutexStarveTest.cpp
 endif
 
 UnitTests_LDADD = \
   libtestgencpp.la \
   $(BOOST_TEST_LDADD)
 
+TSocketInterruptTest_SOURCES = \
+	TSocketInterruptTest.cpp
+
+TSocketInterruptTest_LDADD = \
+  libtestgencpp.la \
+  $(BOOST_TEST_LDADD) \
+  $(BOOST_CHRONO_LDADD) \
+  $(BOOST_SYSTEM_LDADD) \
+  $(BOOST_THREAD_LDADD)
+
+TServerIntegrationTest_SOURCES = \
+	TServerIntegrationTest.cpp
+
+TServerIntegrationTest_LDADD = \
+  libtestgencpp.la \
+  libprocessortest.la \
+  $(BOOST_TEST_LDADD) \
+  $(BOOST_SYSTEM_LDADD) \
+  $(BOOST_THREAD_LDADD)
+
 TransportTest_SOURCES = \
 	TransportTest.cpp
 
@@ -273,7 +299,7 @@ OpenSSLManualInitTest_LDADD = \
 #
 THRIFT = $(top_builddir)/compiler/cpp/thrift
 
-gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h: $(top_srcdir)/test/DebugProtoTest.thrift
+gen-cpp/DebugProtoTest_types.cpp gen-cpp/DebugProtoTest_types.h gen-cpp/EmptyService.cpp gen-cpp/EmptyService.h: $(top_srcdir)/test/DebugProtoTest.thrift
 	$(THRIFT) --gen cpp:dense $<
 
 gen-cpp/EnumTest_types.cpp gen-cpp/EnumTest_types.h: $(top_srcdir)/test/EnumTest.thrift

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/test/TServerIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
new file mode 100644
index 0000000..9edeb19
--- /dev/null
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TServerIntegrationTest
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <thrift/server/TThreadedServer.h>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/transport/TServerSocket.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TTransport.h>
+#include "gen-cpp/ParentService.h"
+#include "TestPortFixture.h"
+#include <vector>
+
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Synchronized;
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TThreadedServer;
+using apache::thrift::test::ParentServiceClient;
+using apache::thrift::test::ParentServiceIf;
+using apache::thrift::test::ParentServiceProcessor;
+
+/**
+ * Lets the fixture wait for server progress; preServe runs after listen() succeeds.
+ */
+class TServerReadyEventHandler : public TServerEventHandler, public Monitor
+{
+public:
+  TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
+  virtual ~TServerReadyEventHandler() {}
+  // Flag the server as listening and notify() under the monitor lock.
+  virtual void preServe() {
+    Synchronized lock(*this);
+    isListening_ = true;
+    notify();
+  }
+  // Count one more created context and notify(); no context object is returned.
+  virtual void* createContext(boost::shared_ptr<TProtocol> /* input */,
+                              boost::shared_ptr<TProtocol> /* output */) {
+    Synchronized lock(*this);
+    accepted_ += 1;
+    notify();
+    return NULL;
+  }
+  // Plain reads: the callers in this file take the monitor lock themselves.
+  bool isListening() const { return isListening_; }
+  uint64_t acceptedCount() const { return accepted_; }
+private:
+  bool isListening_;
+  uint64_t accepted_;
+};
+
+// Bare-bones ParentServiceIf implementation for the integration tests. The
+// counter and string list are only touched while holding mutex_.
+class ParentHandler : virtual public ParentServiceIf {
+public:
+  ParentHandler() : generation_(0) {}
+
+  // Advances the generation and returns the new value.
+  int32_t incrementGeneration() {
+    Guard lock(mutex_);
+    generation_ += 1;
+    return generation_;
+  }
+
+  int32_t getGeneration() {
+    Guard lock(mutex_);
+    return generation_;
+  }
+
+  // Appends s to the stored strings.
+  void addString(const std::string& s) {
+    Guard lock(mutex_);
+    strings_.insert(strings_.end(), s);
+  }
+
+  // Copies every stored string into _return.
+  void getStrings(std::vector<std::string>& _return) {
+    Guard lock(mutex_);
+    _return.assign(strings_.begin(), strings_.end());
+  }
+
+  // The remaining service calls are empty stubs.
+  void getDataWait(std::string& /* _return */, int32_t /* length */) {}
+  void onewayWait() {}
+  void exceptionWait(const std::string& /* message */) {}
+  void unexpectedExceptionWait(const std::string& /* message */) {}
+
+protected:
+  Mutex mutex_;
+  int32_t generation_;
+  std::vector<std::string> strings_;
+};
+
+// Owns a TThreadedServer for ParentService plus the thread that runs serve().
+class TServerIntegrationTestFixture : public TestPortFixture
+{
+public:
+  TServerIntegrationTestFixture() :
+      pServer(new TThreadedServer(
+                    boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
+                            boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
+                    boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
+                    boost::shared_ptr<TTransportFactory>(new TTransportFactory),
+                    boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
+      pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler))
+  {
+    pServer->setServerEventHandler(pEventHandler);
+  }
+
+  // Runs serve() on a new thread and blocks until listen() completes, so clients can connect.
+  void startServer() {
+    pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+    Synchronized sync(*(pEventHandler.get()));
+    while (!pEventHandler->isListening()) {
+        pEventHandler->wait();
+    }
+    BOOST_MESSAGE("server is listening");
+  }
+
+  // Waits until the event handler has counted at least numAccepted connections.
+  void blockUntilAccepted(uint64_t numAccepted) {
+    Synchronized sync(*(pEventHandler.get()));
+    while (pEventHandler->acceptedCount() < numAccepted) {
+        pEventHandler->wait();
+    }
+    BOOST_MESSAGE(boost::format("server has accepted %1%") % numAccepted);
+  }
+
+  // Stops the server and joins its thread. A no-op when the server was never
+  // started or is already stopped: tests call this and so does the destructor.
+  void stopServer() {
+    if (!pServerThread) return;
+    pServer->stop();
+    BOOST_MESSAGE("server stop completed");
+    pServerThread->join();
+    BOOST_MESSAGE("server thread joined");
+    pServerThread.reset();   // marks the server as stopped for later calls
+  }
+  ~TServerIntegrationTestFixture() { stopServer(); }
+
+  void delayClose(boost::shared_ptr<TTransport> toClose) {   // body of the helper threads
+    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+    toClose->close();
+  }
+
+  boost::shared_ptr<TThreadedServer> pServer;
+  boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
+  boost::shared_ptr<boost::thread> pServerThread;
+};
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+
+BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+{
+    // Sanity check: one client connects, makes a single call and disconnects
+    // before the server is stopped.
+    startServer();
+    boost::shared_ptr<TSocket> transport(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
+    ParentServiceClient client(protocol);
+    transport->open();
+    client.incrementGeneration();
+    transport->close();
+    stopServer();
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
+{
+    // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
+
+    startServer();
+
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    pClientSock1->open();
+
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    pClientSock2->open();
+
+    // Ensure both connections have been accepted before stopping
+    blockUntilAccepted(2);
+
+    // Stop while both clients are still open; the stop is expected to disconnect them.
+    // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
+    stopServer();
+
+    // extra proof the server end disconnected the clients: read() returns 0 at EOF
+    uint8_t buf[1];
+    BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1));   // 0 = disconnected
+    BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1));   // 0 = disconnected
+    pClientSock1->close();
+    pClientSock2->close();
+}
+
+BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
+{
+    // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
+    // disconnect.
+    boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
+            setInterruptableChildren(false);    // returns to pre-THRIFT-2441 behavior
+    startServer();
+
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    pClientSock1->open();
+
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    pClientSock2->open();
+
+    // Ensure they have been accepted
+    blockUntilAccepted(2);
+
+    boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
+    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+
+    // Once the clients disconnect the server will stop
+    stopServer();
+
+    // The helpers close both sockets; join them so neither outlives this test
+    t1.join();
+    t2.join();
+}
+BOOST_AUTO_TEST_SUITE_END()

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/test/TServerSocketTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TServerSocketTest.cpp b/lib/cpp/test/TServerSocketTest.cpp
index eee7c26..65f99f9 100644
--- a/lib/cpp/test/TServerSocketTest.cpp
+++ b/lib/cpp/test/TServerSocketTest.cpp
@@ -22,6 +22,7 @@
 #include <thrift/transport/TServerSocket.h>
 #include "TestPortFixture.h"
 #include "TTransportCheckThrow.h"
+#include <iostream>
 
 using apache::thrift::transport::TServerSocket;
 using apache::thrift::transport::TSocket;
@@ -30,24 +31,18 @@ using apache::thrift::transport::TTransportException;
 
 BOOST_FIXTURE_TEST_SUITE ( TServerSocketTest, TestPortFixture )
 
-class TestTServerSocket : public TServerSocket
-{
-  public:
-    TestTServerSocket(const std::string& address, int port) : TServerSocket(address, port) { }
-    using TServerSocket::acceptImpl;
-};
-
 BOOST_AUTO_TEST_CASE( test_bind_to_address )
 {
-    TestTServerSocket sock1("localhost", m_serverPort);
+    TServerSocket sock1("localhost", m_serverPort);
     sock1.listen();
     TSocket clientSock("localhost", m_serverPort);
     clientSock.open();
-    boost::shared_ptr<TTransport> accepted = sock1.acceptImpl();
+    boost::shared_ptr<TTransport> accepted = sock1.accept();
     accepted->close();
     sock1.close();
 
-    TServerSocket sock2("this.is.truly.an.unrecognizable.address.", m_serverPort);
+    std::cout << "An error message from getaddrinfo on the console is expected:" << std::endl;
+    TServerSocket sock2("257.258.259.260", m_serverPort);
     BOOST_CHECK_THROW(sock2.listen(), TTransportException);
     sock2.close();
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/test/TServerTransportTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TServerTransportTest.cpp b/lib/cpp/test/TServerTransportTest.cpp
new file mode 100644
index 0000000..09b2c59
--- /dev/null
+++ b/lib/cpp/test/TServerTransportTest.cpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/test/auto_unit_test.hpp>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TServerTransport.h>
+#include "TestPortFixture.h"
+
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_AUTO_TEST_SUITE ( TServerTransportTest )
+
+// Trivial concrete TTransport; TestTServerTransport::acceptImpl() hands these out.
+// It adds nothing to the base class.
+class TestTTransport : public TTransport {};
+
+class TestTServerTransport : public TServerTransport  // accept() stub driven by valid_
+{
+public:
+    TestTServerTransport() : valid_(true) {}
+    void close() {}
+    bool valid_;  // false makes acceptImpl() hand back an empty pointer
+protected:
+    boost::shared_ptr<TTransport> acceptImpl() {
+        if (!valid_) { return boost::shared_ptr<TTransport>(); }
+        return boost::shared_ptr<TTransport>(new TestTTransport);
+    }
+};
+
+BOOST_AUTO_TEST_CASE( test_positive_accept )
+{
+    TestTServerTransport uut;   // valid_ starts out true
+    BOOST_CHECK(uut.accept());  // so accept() is expected to yield a non-empty pointer
+}
+
+BOOST_AUTO_TEST_CASE( test_negative_accept )
+{
+    TestTServerTransport uut;
+    uut.valid_ = false;   // acceptImpl() now returns an empty pointer
+    BOOST_CHECK_THROW(uut.accept(), TTransportException);  // which accept() is expected to reject
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+

http://git-wip-us.apache.org/repos/asf/thrift/blob/1684c429/lib/cpp/test/TSocketInterruptTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TSocketInterruptTest.cpp b/lib/cpp/test/TSocketInterruptTest.cpp
new file mode 100644
index 0000000..4f6b2bc
--- /dev/null
+++ b/lib/cpp/test/TSocketInterruptTest.cpp
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define BOOST_TEST_MODULE TSocketInterruptTest
+#include <boost/test/auto_unit_test.hpp>
+
+#include <boost/bind.hpp>
+#include <boost/chrono/duration.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/thread/thread.hpp>
+#include <thrift/transport/TSocket.h>
+#include <thrift/transport/TServerSocket.h>
+#include "TestPortFixture.h"
+
+using apache::thrift::transport::TServerSocket;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+
+BOOST_FIXTURE_TEST_SUITE ( TSocketInterruptTest, TestPortFixture )
+
+void readerWorker(boost::shared_ptr<TTransport> tt, uint32_t expectedResult)
+{   // thread body: a single read of up to 4 bytes must return expectedResult
+    uint8_t buf[4];
+    BOOST_CHECK_EQUAL(expectedResult, tt->read(buf, 4));
+}
+
+// Thread body: a read on tt is expected to be cut short with an INTERRUPTED
+// TTransportException; a read that returns normally is reported as an error.
+void readerWorkerMustThrow(boost::shared_ptr<TTransport> tt)
+{
+    try {
+        uint8_t scratch[4];
+        tt->read(scratch, 4);
+        BOOST_ERROR("should not have gotten here");
+    } catch (const TTransportException& tx) {
+        // any other exception type code fails the check
+        BOOST_CHECK_EQUAL(TTransportException::INTERRUPTED, tx.getType());
+    }
+}
+
+BOOST_AUTO_TEST_CASE( test_interruptable_child_read )
+{
+    TServerSocket listener("localhost", m_serverPort);
+    listener.listen();
+    TSocket client("localhost", m_serverPort);
+    client.open();
+    boost::shared_ptr<TTransport> child(listener.accept());
+    boost::thread readThread(boost::bind(&readerWorkerMustThrow, child));
+    // give the worker 50 ms to get into its read before interrupting it
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    listener.interruptChildren();
+    BOOST_CHECK_MESSAGE(readThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren did not interrupt child read");
+    client.close();
+    child->close();
+    listener.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_non_interruptable_child_read )
+{
+    TServerSocket listener("localhost", m_serverPort);
+    // opt out of interruptable children, i.e. the pre-THRIFT-2441 behavior
+    listener.setInterruptableChildren(false);
+    listener.listen();
+    TSocket client("localhost", m_serverPort);
+    client.open();
+    boost::shared_ptr<TTransport> child(listener.accept());
+    boost::thread readThread(boost::bind(&readerWorker, child, 0));
+    // give the worker 50 ms to get into its read, then try to interrupt it
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    listener.interruptChildren();
+    BOOST_CHECK_MESSAGE(!readThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren interrupted child read");
+    // the read only ends once the client disconnects (worker expects 0 bytes)
+    client.close();
+    readThread.join();
+    child->close();
+    listener.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_cannot_change_after_listen )
+{
+    TServerSocket sock1("localhost", m_serverPort);
+    sock1.listen();   // from here on the interruptable-children setting is expected to be locked
+    BOOST_CHECK_THROW(sock1.setInterruptableChildren(false), std::logic_error);
+    sock1.close();
+}
+
+void peekerWorker(boost::shared_ptr<TTransport> tt, bool expectedResult)
+{   // thread body: checks the result of one peek() on tt against expectedResult
+    BOOST_CHECK_EQUAL(expectedResult, tt->peek());
+}
+
+BOOST_AUTO_TEST_CASE( test_interruptable_child_peek )
+{
+    TServerSocket listener("localhost", m_serverPort);
+    listener.listen();
+    TSocket client("localhost", m_serverPort);
+    client.open();
+    boost::shared_ptr<TTransport> child(listener.accept());
+    // the worker expects peek() to come back false once the child is interrupted
+    boost::thread peekThread(boost::bind(&peekerWorker, child, false));
+    // give the worker 50 ms to get into peek() before interrupting it
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    listener.interruptChildren();
+    BOOST_CHECK_MESSAGE(peekThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren did not interrupt child peek");
+    client.close();
+    child->close();
+    listener.close();
+}
+
+BOOST_AUTO_TEST_CASE( test_non_interruptable_child_peek )
+{
+    TServerSocket listener("localhost", m_serverPort);
+    // opt out of interruptable children, i.e. the pre-THRIFT-2441 behavior
+    listener.setInterruptableChildren(false);
+    listener.listen();
+    TSocket client("localhost", m_serverPort);
+    client.open();
+    boost::shared_ptr<TTransport> child(listener.accept());
+    // the worker expects peek() to come back false when the remote side closes
+    boost::thread peekThread(boost::bind(&peekerWorker, child, false));
+    // give the worker 50 ms to get into peek(), then try to interrupt it
+    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
+    listener.interruptChildren();
+    BOOST_CHECK_MESSAGE(!peekThread.try_join_for(boost::chrono::milliseconds(200)),
+        "server socket interruptChildren interrupted child peek");
+    // only the client disconnecting lets the worker finish
+    client.close();
+    peekThread.join();
+    child->close();
+    listener.close();
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+