You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ro...@apache.org on 2015/04/30 19:59:44 UTC

thrift git commit: THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers

Repository: thrift
Updated Branches:
  refs/heads/master 4bf9399ca -> 79c9911b8


THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/79c9911b
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/79c9911b
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/79c9911b

Branch: refs/heads/master
Commit: 79c9911b8780d1f9d7c2c17623d269f0671d1723
Parents: 4bf9399
Author: Jim King <ji...@simplivity.com>
Authored: Thu Apr 30 07:10:08 2015 -0400
Committer: Roger Meier <ro...@apache.org>
Committed: Thu Apr 30 19:48:15 2015 +0200

----------------------------------------------------------------------
 lib/c_glib/test/testthrifttestclient.cpp        |   2 +
 lib/cpp/CMakeLists.txt                          |   6 +-
 lib/cpp/Makefile.am                             |   8 +-
 lib/cpp/src/thrift/protocol/TDenseProtocol.cpp  |   1 -
 lib/cpp/src/thrift/server/TServerFramework.cpp  |  78 ++++++++-
 lib/cpp/src/thrift/server/TServerFramework.h    |  60 +++++++
 lib/cpp/src/thrift/server/TSimpleServer.cpp     |  23 ++-
 lib/cpp/src/thrift/server/TSimpleServer.h       |   3 +
 lib/cpp/src/thrift/server/TThreadPoolServer.cpp |   4 +
 lib/cpp/src/thrift/server/TThreadPoolServer.h   |  14 +-
 lib/cpp/src/thrift/server/TThreadedServer.cpp   |  23 +--
 lib/cpp/src/thrift/server/TThreadedServer.h     |   3 -
 lib/cpp/test/Makefile.am                        |   2 +-
 lib/cpp/test/TServerIntegrationTest.cpp         | 174 +++++++++++++++----
 lib/cpp/test/ZlibTest.cpp                       |   1 -
 15 files changed, 328 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/c_glib/test/testthrifttestclient.cpp
----------------------------------------------------------------------
diff --git a/lib/c_glib/test/testthrifttestclient.cpp b/lib/c_glib/test/testthrifttestclient.cpp
index 4f7bc08..d387396 100755
--- a/lib/c_glib/test/testthrifttestclient.cpp
+++ b/lib/c_glib/test/testthrifttestclient.cpp
@@ -317,6 +317,8 @@ class TestHandler : public ThriftTestIf {
 // C CLIENT
 extern "C" {
 
+#undef THRIFT_SOCKET /* from lib/cpp */
+
 #include "t_test_thrift_test.h"
 #include "t_test_thrift_test_types.h"
 #include <thrift/c_glib/transport/thrift_socket.h>

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index 8ea0546..b444c35 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -35,9 +35,11 @@ set( thriftcpp_SOURCES
    src/thrift/Thrift.cpp
    src/thrift/TApplicationException.cpp
    src/thrift/VirtualProfiling.cpp
+   src/thrift/async/TAsyncChannel.cpp
    src/thrift/concurrency/ThreadManager.cpp
    src/thrift/concurrency/TimerManager.cpp
    src/thrift/concurrency/Util.cpp
+   src/thrift/processor/PeekProcessor.cpp
    src/thrift/protocol/TDebugProtocol.cpp
    src/thrift/protocol/TDenseProtocol.cpp
    src/thrift/protocol/TJSONProtocol.cpp
@@ -60,8 +62,6 @@ set( thriftcpp_SOURCES
    src/thrift/server/TSimpleServer.cpp
    src/thrift/server/TThreadPoolServer.cpp
    src/thrift/server/TThreadedServer.cpp
-   src/thrift/async/TAsyncChannel.cpp
-   src/thrift/processor/PeekProcessor.cpp
 )
 
 # This files don't work on Windows CE as there is no pipe support
@@ -185,6 +185,8 @@ if(MSVC)
     add_definitions("-DUNICODE -D_UNICODE")
 endif()
 
+add_definitions("-D__STDC_LIMIT_MACROS")
+
 # Install the headers
 install(DIRECTORY "src/thrift" DESTINATION "${INCLUDE_INSTALL_DIR}"
     FILES_MATCHING PATTERN "*.h" PATTERN "*.tcc")

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 28ff7c8..0de8dc7 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -57,7 +57,7 @@ pkgconfig_DATA += thrift-qt5.pc
 endif
 
 AM_CXXFLAGS = -Wall -Wextra -pedantic
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src -D__STDC_LIMIT_MACROS
 AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS)
 
 # Define the source files for the module
@@ -65,9 +65,11 @@ AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS)
 libthrift_la_SOURCES = src/thrift/Thrift.cpp \
                        src/thrift/TApplicationException.cpp \
                        src/thrift/VirtualProfiling.cpp \
+                       src/thrift/async/TAsyncChannel.cpp \
                        src/thrift/concurrency/ThreadManager.cpp \
                        src/thrift/concurrency/TimerManager.cpp \
                        src/thrift/concurrency/Util.cpp \
+                       src/thrift/processor/PeekProcessor.cpp \
                        src/thrift/protocol/TDebugProtocol.cpp \
                        src/thrift/protocol/TDenseProtocol.cpp \
                        src/thrift/protocol/TJSONProtocol.cpp \
@@ -94,9 +96,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \
                        src/thrift/server/TServerFramework.cpp \
                        src/thrift/server/TSimpleServer.cpp \
                        src/thrift/server/TThreadPoolServer.cpp \
-                       src/thrift/server/TThreadedServer.cpp \
-                       src/thrift/async/TAsyncChannel.cpp \
-                       src/thrift/processor/PeekProcessor.cpp
+                       src/thrift/server/TThreadedServer.cpp
 
 if WITH_BOOSTTHREADS
 libthrift_la_SOURCES += src/thrift/concurrency/BoostThreadFactory.cpp \

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
index 583b630..259c68e 100644
--- a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
+++ b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
@@ -87,7 +87,6 @@ Optional fields are a little tricky also.  We write a zero byte if they are
 absent and prefix them with an 0x01 byte if they are present
 */
 
-#define __STDC_LIMIT_MACROS
 #include <stdint.h>
 #include <thrift/protocol/TDenseProtocol.h>
 #include <thrift/TReflectionLocal.h>

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TServerFramework.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index 8adb29a..36dab5b 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -18,12 +18,15 @@
  */
 
 #include <boost/bind.hpp>
+#include <stdexcept>
+#include <stdint.h>
 #include <thrift/server/TServerFramework.h>
 
 namespace apache {
 namespace thrift {
 namespace server {
 
+using apache::thrift::concurrency::Synchronized;
 using apache::thrift::transport::TServerTransport;
 using apache::thrift::transport::TTransport;
 using apache::thrift::transport::TTransportException;
@@ -39,14 +42,20 @@ TServerFramework::TServerFramework(
         const shared_ptr<TServerTransport>& serverTransport,
         const shared_ptr<TTransportFactory>& transportFactory,
         const shared_ptr<TProtocolFactory>& protocolFactory)
-  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {}
+  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+    clients_(0),
+    hwm_(0),
+    limit_(INT64_MAX) {}
 
 TServerFramework::TServerFramework(
         const shared_ptr<TProcessor>& processor,
         const shared_ptr<TServerTransport>& serverTransport,
         const shared_ptr<TTransportFactory>& transportFactory,
         const shared_ptr<TProtocolFactory>& protocolFactory)
-  : TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+  : TServer(processor, serverTransport, transportFactory, protocolFactory),
+    clients_(0),
+    hwm_(0),
+    limit_(INT64_MAX) {}
 
 TServerFramework::TServerFramework(
         const shared_ptr<TProcessorFactory>& processorFactory,
@@ -57,7 +66,10 @@ TServerFramework::TServerFramework(
         const shared_ptr<TProtocolFactory>& outputProtocolFactory)
   : TServer(processorFactory, serverTransport,
             inputTransportFactory, outputTransportFactory,
-            inputProtocolFactory, outputProtocolFactory) {}
+            inputProtocolFactory, outputProtocolFactory),
+    clients_(0),
+    hwm_(0),
+    limit_(INT64_MAX) {}
 
 TServerFramework::TServerFramework(
         const shared_ptr<TProcessor>& processor,
@@ -68,7 +80,10 @@ TServerFramework::TServerFramework(
         const shared_ptr<TProtocolFactory>& outputProtocolFactory)
   : TServer(processor, serverTransport,
             inputTransportFactory, outputTransportFactory,
-            inputProtocolFactory, outputProtocolFactory) {}
+            inputProtocolFactory, outputProtocolFactory),
+    clients_(0),
+    hwm_(0),
+    limit_(INT64_MAX) {}
 
 TServerFramework::~TServerFramework() {}
 
@@ -111,6 +126,16 @@ void TServerFramework::serve() {
       inputTransport.reset();
       client.reset();
 
+      // If we have reached the limit on the number of concurrent
+      // clients allowed, wait for one or more clients to drain before
+      // accepting another.
+      {
+          Synchronized sync(mon_);
+          while (clients_ >= limit_) {
+              mon_.wait();
+          }
+      }
+
       client = serverTransport_->accept();
 
       inputTransport = inputTransportFactory_->getTransport(client);
@@ -118,11 +143,12 @@ void TServerFramework::serve() {
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
-      onClientConnected(
+      newlyConnectedClient(
               shared_ptr<TConnectedClient>(
                       new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
                                            inputProtocol, outputProtocol, eventHandler_, client),
                       bind(&TServerFramework::disposeConnectedClient, this, _1)));
+
     } catch (TTransportException& ttx) {
       releaseOneDescriptor("inputTransport", inputTransport);
       releaseOneDescriptor("outputTransport", outputTransport);
@@ -147,12 +173,54 @@ void TServerFramework::serve() {
   releaseOneDescriptor("serverTransport", serverTransport_);
 }
 
+int64_t TServerFramework::getConcurrentClientLimit() const {
+  Synchronized sync(mon_);
+  return limit_;
+}
+
+int64_t TServerFramework::getConcurrentClientCount() const {
+  Synchronized sync(mon_);
+  return clients_;
+}
+
+int64_t TServerFramework::getConcurrentClientCountHWM() const {
+  Synchronized sync(mon_);
+  return hwm_;
+}
+
+void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
+  if (newLimit < 1) {
+    throw std::invalid_argument("newLimit must be greater than zero");
+  }
+  Synchronized sync(mon_);
+  limit_ = newLimit;
+  if (limit_ - clients_ > 0) {
+    mon_.notify();
+  }
+}
+
 void TServerFramework::stop() {
   serverTransport_->interrupt();
   serverTransport_->interruptChildren();
 }
 
+void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
+  onClientConnected(pClient);
+
+  // Count a concurrent client added.
+  Synchronized sync(mon_);
+  ++clients_;
+  hwm_ = std::max(hwm_, clients_);
+}
+
 void TServerFramework::disposeConnectedClient(TConnectedClient *pClient) {
+  {
+    // Count a concurrent client removed.
+    Synchronized sync(mon_);
+    if (limit_ - --clients_ > 0) {
+      mon_.notify();
+    }
+  }
   onClientDisconnected(pClient);
   delete pClient;
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TServerFramework.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h
index 67d5420..3f16dd1 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.h
+++ b/lib/cpp/src/thrift/server/TServerFramework.h
@@ -21,7 +21,9 @@
 #define _THRIFT_SERVER_TSERVERFRAMEWORK_H_ 1
 
 #include <boost/shared_ptr.hpp>
+#include <stdint.h>
 #include <thrift/TProcessor.h>
+#include <thrift/concurrency/Monitor.h>
 #include <thrift/server/TConnectedClient.h>
 #include <thrift/server/TServer.h>
 #include <thrift/transport/TServerTransport.h>
@@ -89,6 +91,36 @@ public:
    */
   virtual void stop();
 
+  /**
+   * Get the concurrent client limit.
+   * \returns the concurrent client limit
+   */
+  virtual int64_t getConcurrentClientLimit() const;
+
+  /**
+   * Get the number of currently connected clients.
+   * \returns the number of currently connected clients
+   */
+  virtual int64_t getConcurrentClientCount() const;
+
+  /**
+   * Get the highest number of concurrent clients.
+   * \returns the highest number of concurrent clients
+   */
+  virtual int64_t getConcurrentClientCountHWM() const;
+
+  /**
+   * Set the concurrent client limit.  This can be changed while
+   * the server is serving; however, it will not necessarily be
+   * enforced until the next client is accepted and added.  If the
+   * limit is lowered below the number of connected clients, no
+   * action is taken to disconnect the clients.
+   * The default value used if this is not called is INT64_MAX.
+   * \param[in]  newLimit  the new limit of concurrent clients
+   * \throws std::invalid_argument if newLimit is less than 1
+   */
+  virtual void setConcurrentClientLimit(int64_t newLimit);
+
 protected:
   /**
    * A client has connected.  The implementation is responsible for storing
@@ -102,6 +134,7 @@ protected:
 
   /**
    * A client has disconnected.
+   * The server no longer tracks the client.
    * The client TTransport has already been closed.
    * The implementation must not delete the pointer.
    *
@@ -111,10 +144,37 @@ protected:
 
 private:
   /**
+   * Common handling for newly connected clients.  Calls onClientConnected
+   * and then counts the client toward the concurrent client limit, which
+   * serve() enforces by blocking before it accepts another client.
+   */
+  void newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient);
+
+  /**
    * Smart pointer client deletion.
    * Calls onClientDisconnected and then deletes pClient.
    */
   void disposeConnectedClient(TConnectedClient *pClient);
+
+  /**
+   * Monitor for limiting the number of concurrent clients.
+   */
+  apache::thrift::concurrency::Monitor mon_;
+
+  /**
+   * The number of concurrent clients.
+   */
+  int64_t clients_;
+
+  /**
+   * The high water mark of concurrent clients.
+   */
+  int64_t hwm_;
+
+  /**
+   * The limit on the number of concurrent clients.
+   */
+  int64_t limit_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TSimpleServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index a133c0d..adcedc8 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -38,7 +38,9 @@ TSimpleServer::TSimpleServer(
         const shared_ptr<TTransportFactory>& transportFactory,
         const shared_ptr<TProtocolFactory>& protocolFactory)
   : TServerFramework(processorFactory, serverTransport,
-                     transportFactory, protocolFactory) {}
+                     transportFactory, protocolFactory) {
+  TServerFramework::setConcurrentClientLimit(1);
+}
 
 TSimpleServer::TSimpleServer(
         const shared_ptr<TProcessor>& processor,
@@ -46,7 +48,9 @@ TSimpleServer::TSimpleServer(
         const shared_ptr<TTransportFactory>& transportFactory,
         const shared_ptr<TProtocolFactory>& protocolFactory)
   : TServerFramework(processor, serverTransport,
-                     transportFactory, protocolFactory) {}
+                     transportFactory, protocolFactory) {
+  TServerFramework::setConcurrentClientLimit(1);
+}
 
 TSimpleServer::TSimpleServer(
         const shared_ptr<TProcessorFactory>& processorFactory,
@@ -57,7 +61,9 @@ TSimpleServer::TSimpleServer(
         const shared_ptr<TProtocolFactory>& outputProtocolFactory)
   : TServerFramework(processorFactory, serverTransport,
           inputTransportFactory, outputTransportFactory,
-          inputProtocolFactory, outputProtocolFactory) {}
+          inputProtocolFactory, outputProtocolFactory) {
+  TServerFramework::setConcurrentClientLimit(1);
+}
 
 TSimpleServer::TSimpleServer(
         const shared_ptr<TProcessor>& processor,
@@ -68,7 +74,9 @@ TSimpleServer::TSimpleServer(
         const shared_ptr<TProtocolFactory>& outputProtocolFactory)
   : TServerFramework(processor, serverTransport,
           inputTransportFactory, outputTransportFactory,
-          inputProtocolFactory, outputProtocolFactory) {}
+          inputProtocolFactory, outputProtocolFactory) {
+  TServerFramework::setConcurrentClientLimit(1);
+}
 
 TSimpleServer::~TSimpleServer() {}
 
@@ -86,6 +94,13 @@ void TSimpleServer::onClientConnected(const shared_ptr<TConnectedClient>& pClien
  */
 void TSimpleServer::onClientDisconnected(TConnectedClient *pClient) {}
 
+/**
+ * This makes little sense to the simple server because it is not capable
+ * of having more than one client at a time, so we hide it.
+ */
+void TSimpleServer::setConcurrentClientLimit(int64_t newLimit) {}
+
+
 }
 }
 } // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TSimpleServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 51b00e4..30d5046 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -62,6 +62,9 @@ public:
 protected:
   virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
   virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
+
+private:
+  void setConcurrentClientLimit(int64_t newLimit);  // hide
 };
 
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index a5f8c76..5b9b01d 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -112,6 +112,10 @@ void TThreadPoolServer::setTaskExpiration(int64_t value) {
   taskExpiration_ = value;
 }
 
+boost::shared_ptr<apache::thrift::concurrency::ThreadManager> TThreadPoolServer::getThreadManager() const {
+  return threadManager_;
+}
+
 void TThreadPoolServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
   threadManager_->add(pClient, timeout_, taskExpiration_);
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadPoolServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index 29e9aaf..267dbad 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -37,14 +37,16 @@ public:
           const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
           const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
           const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
-          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+                  apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
 
   TThreadPoolServer(
           const boost::shared_ptr<apache::thrift::TProcessor>& processor,
           const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
           const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
           const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
-          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+                  apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
 
   TThreadPoolServer(
           const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -53,7 +55,8 @@ public:
           const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
           const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
           const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
-          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+                  apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
 
   TThreadPoolServer(
           const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -62,7 +65,8 @@ public:
           const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
           const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
           const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
-          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+          const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+                  apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
 
   virtual ~TThreadPoolServer();
 
@@ -78,6 +82,8 @@ public:
   virtual int64_t getTaskExpiration() const;
   virtual void setTaskExpiration(int64_t value);
 
+  virtual boost::shared_ptr<apache::thrift::concurrency::ThreadManager> getThreadManager() const;
+
 protected:
   virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
   virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadedServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 440cede..b0b22c3 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -89,7 +89,7 @@ void TThreadedServer::serve() {
   // Drain all clients - no more will arrive
   try {
     Synchronized s(clientsMonitor_);
-    while (!clients_.empty()) {
+    while (getConcurrentClientCount() > 0) {
         clientsMonitor_.wait();
     }
   } catch (TException& tx) {
@@ -98,27 +98,14 @@ void TThreadedServer::serve() {
   }
 }
 
-void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient)
-{
-  // Create a thread for this client
-  shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
-
-  // Insert thread into the set of threads
-  {
-    Synchronized s(clientsMonitor_);
-    clients_.insert(pClient.get());
-  }
-
-  // Start the thread!
-  thread->start();
+void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+  threadFactory_->newThread(pClient)->start();
 }
 
 void TThreadedServer::onClientDisconnected(TConnectedClient *pClient) {
-  // Remove this task from parent bookkeeping
   Synchronized s(clientsMonitor_);
-  clients_.erase(pClient);
-  if (clients_.empty()) {
-      clientsMonitor_.notify();
+  if (getConcurrentClientCount() == 0) {
+    clientsMonitor_.notify();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadedServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 7b66f1d..21b6a28 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -29,8 +29,6 @@ namespace apache {
 namespace thrift {
 namespace server {
 
-#define THRIFT_DEFAULT_THREAD_FACTORY
-
 /**
  * Manage clients using a thread pool.
  */
@@ -86,7 +84,6 @@ protected:
 
   boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
   apache::thrift::concurrency::Monitor clientsMonitor_;
-  std::set<TConnectedClient*> clients_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 0cd1c67..3470abb 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -323,7 +323,7 @@ gen-cpp/SecondService.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/ThriftTest.cp
 gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift
 	$(THRIFT) --gen cpp:templates,cob_style $<
 
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -D__STDC_LIMIT_MACROS
 AM_LDFLAGS = $(BOOST_LDFLAGS)
 AM_CXXFLAGS = -Wall -Wextra -pedantic
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/TServerIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 9edeb19..73bcdba 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -20,9 +20,12 @@
 #define BOOST_TEST_MODULE TServerIntegrationTest
 #include <boost/test/auto_unit_test.hpp>
 #include <boost/bind.hpp>
+#include <boost/foreach.hpp>
 #include <boost/format.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
 #include <thrift/server/TThreadedServer.h>
 #include <thrift/protocol/TBinaryProtocol.h>
 #include <thrift/transport/TServerSocket.h>
@@ -44,12 +47,17 @@ using apache::thrift::transport::TServerSocket;
 using apache::thrift::transport::TServerTransport;
 using apache::thrift::transport::TSocket;
 using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
 using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServer;
 using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TSimpleServer;
+using apache::thrift::server::TThreadPoolServer;
 using apache::thrift::server::TThreadedServer;
 using apache::thrift::test::ParentServiceClient;
 using apache::thrift::test::ParentServiceIf;
 using apache::thrift::test::ParentServiceProcessor;
+using boost::posix_time::milliseconds;
 
 /**
  * preServe runs after listen() is successful, when we can connect
@@ -81,7 +89,10 @@ private:
   uint64_t accepted_;
 };
 
-class ParentHandler : virtual public ParentServiceIf {
+/**
+ * Reusing another generated test, just something to serve up
+ */
+class ParentHandler : public ParentServiceIf {
 public:
   ParentHandler() : generation_(0) {}
 
@@ -123,11 +134,17 @@ protected:
   std::vector<std::string> strings_;
 };
 
+void autoSocketCloser(TSocket *pSock) {
+  pSock->close();
+  delete pSock;
+}
+
+template<class TServerType>
 class TServerIntegrationTestFixture : public TestPortFixture
 {
 public:
   TServerIntegrationTestFixture() :
-      pServer(new TThreadedServer(
+      pServer(new TServerType(
                     boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
                             boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
                     boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
@@ -139,7 +156,7 @@ public:
   }
 
   void startServer() {
-    pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+    pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
 
     // block until listen() completes so clients will be able to connect
     Synchronized sync(*(pEventHandler.get()));
@@ -160,52 +177,117 @@ public:
   }
 
   void stopServer() {
-    pServer->stop();
-    BOOST_MESSAGE("server stop completed");
-    pServerThread->join();
-    BOOST_MESSAGE("server thread joined");
+    if (pServerThread) {
+      pServer->stop();
+      BOOST_MESSAGE("server stop completed");
+
+      pServerThread->join();
+      BOOST_MESSAGE("server thread joined");
+      pServerThread.reset();
+    }
   }
 
   ~TServerIntegrationTestFixture() {
     stopServer();
   }
 
-  void delayClose(boost::shared_ptr<TTransport> toClose) {
-    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+  void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+    boost::this_thread::sleep(after);
     toClose->close();
   }
 
-  boost::shared_ptr<TThreadedServer> pServer;
+  void baseline(int64_t numToMake, int64_t expectedHWM) {  // connects numToMake clients one after another, then checks the server's concurrent-client high-water mark against expectedHWM
+    startServer();
+    std::vector<boost::shared_ptr<TSocket> > holdSockets;  // keeps every client socket referenced until the end of the test
+    std::vector<boost::shared_ptr<boost::thread> > holdThreads;  // one delayed-close thread per client, joined below
+
+    for (int64_t i = 0; i < numToMake; ++i) {
+        boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser);
+        holdSockets.push_back(pClientSock);
+        boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
+        ParentServiceClient client(pClientProtocol);
+        pClientSock->open();
+        client.incrementGeneration();  // issue one call through the client before moving on to the next connection
+        holdThreads.push_back(
+                boost::shared_ptr<boost::thread>(
+                        new boost::thread(
+                                boost::bind(&TServerIntegrationTestFixture::delayClose, this,
+                                            pClientSock, milliseconds(100 * numToMake)))));  // this socket is closed by delayClose after 100 ms * numToMake
+    }
+
+    BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());  // read the high-water mark before the server is stopped
+    stopServer();
+    BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
+        pThread->join();  // wait for every delayed close to finish before the sockets are released
+    }
+    holdThreads.clear();
+    holdSockets.clear();
+  }
+
+  boost::shared_ptr<TServerType> pServer;
   boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
   boost::shared_ptr<boost::thread> pServerThread;
 };
 
-BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture )
 
-BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>)
 {
-    // this test establishes some basic sanity
+    baseline(3, 1);
+}
 
-    startServer();
-    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
-    boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
-    ParentServiceClient client1(pClientProtocol1);
-    pClientSock1->open();
-    client1.incrementGeneration();
-    pClientSock1->close();
-    stopServer();
+BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>)
+{
+    baseline(10, 10);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>)
+{
+    pServer->setConcurrentClientLimit(4);
+    baseline(10, 4);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+    pServer->getThreadManager()->threadFactory(
+            boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+                    new apache::thrift::concurrency::PlatformThreadFactory));
+    pServer->getThreadManager()->start();
+
+    // thread factory has 4 threads as a default
+    // thread factory however is a bad way to limit concurrent clients
+    // as accept() will be called to grab a 5th client socket, in this case
+    // and then the thread factory will block adding the thread to manage
+    // that client.
+    baseline(10, 5);
 }
 
+BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+    pServer->getThreadManager()->threadFactory(
+            boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+                    new apache::thrift::concurrency::PlatformThreadFactory));
+    pServer->getThreadManager()->start();
+    pServer->setConcurrentClientLimit(4);
+
+    baseline(10, 4);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> )
+
 BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
 {
     // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
 
     startServer();
 
-    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock1->open();
 
-    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock2->open();
 
     // Ensure they have been accepted
@@ -219,8 +301,6 @@ BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
     uint8_t buf[1];
     BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1));   // 0 = disconnected
     BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1));   // 0 = disconnected
-    pClientSock1->close();
-    pClientSock2->close();
 }
 
 BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
@@ -230,24 +310,56 @@ BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
 
     boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
             setInterruptableChildren(false);    // returns to pre-THRIFT-2441 behavior
+
     startServer();
 
-    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock1->open();
 
-    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
     pClientSock2->open();
 
     // Ensure they have been accepted
     blockUntilAccepted(2);
 
-    boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
-    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+    boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
+    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
 
     // Once the clients disconnect the server will stop
     stopServer();
+    t1.join();
+    t2.join();
+}
+
+BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)  // checks the limit/count/HWM accessors while clients connect against a limit of 2
+{
+    startServer();
+
+    BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
+    pServer->setConcurrentClientLimit(2);
+    BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
+
+    boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
+    pClientSock1->open();
+    blockUntilAccepted(1);
+    BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
+
+    boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
+    pClientSock2->open();
+    blockUntilAccepted(2);
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+
+    // a third client cannot connect until one of the other two closes
+    boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
+    boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser);
+    pClientSock3->open();  // open the third socket; pClientSock2 was already opened above, so re-opening it never exercised a third client
+    blockUntilAccepted(2);
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+    BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
 
-    pClientSock1->close();
-    pClientSock2->close();
+    stopServer();
+    t2.join();  // delayClose runs on its own thread; join it before the sockets go out of scope
 }
+
 BOOST_AUTO_TEST_SUITE_END()

http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/ZlibTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index bafacf9..465e12d 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-#define __STDC_LIMIT_MACROS
 #define __STDC_FORMAT_MACROS
 
 #ifndef _GNU_SOURCE