You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ro...@apache.org on 2015/04/30 19:59:44 UTC
thrift git commit: THRIFT-3084 add optional concurrent client limit
enforcement to lib/cpp threaded servers
Repository: thrift
Updated Branches:
refs/heads/master 4bf9399ca -> 79c9911b8
THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers
Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/79c9911b
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/79c9911b
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/79c9911b
Branch: refs/heads/master
Commit: 79c9911b8780d1f9d7c2c17623d269f0671d1723
Parents: 4bf9399
Author: Jim King <ji...@simplivity.com>
Authored: Thu Apr 30 07:10:08 2015 -0400
Committer: Roger Meier <ro...@apache.org>
Committed: Thu Apr 30 19:48:15 2015 +0200
----------------------------------------------------------------------
lib/c_glib/test/testthrifttestclient.cpp | 2 +
lib/cpp/CMakeLists.txt | 6 +-
lib/cpp/Makefile.am | 8 +-
lib/cpp/src/thrift/protocol/TDenseProtocol.cpp | 1 -
lib/cpp/src/thrift/server/TServerFramework.cpp | 78 ++++++++-
lib/cpp/src/thrift/server/TServerFramework.h | 60 +++++++
lib/cpp/src/thrift/server/TSimpleServer.cpp | 23 ++-
lib/cpp/src/thrift/server/TSimpleServer.h | 3 +
lib/cpp/src/thrift/server/TThreadPoolServer.cpp | 4 +
lib/cpp/src/thrift/server/TThreadPoolServer.h | 14 +-
lib/cpp/src/thrift/server/TThreadedServer.cpp | 23 +--
lib/cpp/src/thrift/server/TThreadedServer.h | 3 -
lib/cpp/test/Makefile.am | 2 +-
lib/cpp/test/TServerIntegrationTest.cpp | 174 +++++++++++++++----
lib/cpp/test/ZlibTest.cpp | 1 -
15 files changed, 328 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/c_glib/test/testthrifttestclient.cpp
----------------------------------------------------------------------
diff --git a/lib/c_glib/test/testthrifttestclient.cpp b/lib/c_glib/test/testthrifttestclient.cpp
index 4f7bc08..d387396 100755
--- a/lib/c_glib/test/testthrifttestclient.cpp
+++ b/lib/c_glib/test/testthrifttestclient.cpp
@@ -317,6 +317,8 @@ class TestHandler : public ThriftTestIf {
// C CLIENT
extern "C" {
+#undef THRIFT_SOCKET /* from lib/cpp */
+
#include "t_test_thrift_test.h"
#include "t_test_thrift_test_types.h"
#include <thrift/c_glib/transport/thrift_socket.h>
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index 8ea0546..b444c35 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -35,9 +35,11 @@ set( thriftcpp_SOURCES
src/thrift/Thrift.cpp
src/thrift/TApplicationException.cpp
src/thrift/VirtualProfiling.cpp
+ src/thrift/async/TAsyncChannel.cpp
src/thrift/concurrency/ThreadManager.cpp
src/thrift/concurrency/TimerManager.cpp
src/thrift/concurrency/Util.cpp
+ src/thrift/processor/PeekProcessor.cpp
src/thrift/protocol/TDebugProtocol.cpp
src/thrift/protocol/TDenseProtocol.cpp
src/thrift/protocol/TJSONProtocol.cpp
@@ -60,8 +62,6 @@ set( thriftcpp_SOURCES
src/thrift/server/TSimpleServer.cpp
src/thrift/server/TThreadPoolServer.cpp
src/thrift/server/TThreadedServer.cpp
- src/thrift/async/TAsyncChannel.cpp
- src/thrift/processor/PeekProcessor.cpp
)
# This files don't work on Windows CE as there is no pipe support
@@ -185,6 +185,8 @@ if(MSVC)
add_definitions("-DUNICODE -D_UNICODE")
endif()
+add_definitions("-D__STDC_LIMIT_MACROS")
+
# Install the headers
install(DIRECTORY "src/thrift" DESTINATION "${INCLUDE_INSTALL_DIR}"
FILES_MATCHING PATTERN "*.h" PATTERN "*.tcc")
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 28ff7c8..0de8dc7 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -57,7 +57,7 @@ pkgconfig_DATA += thrift-qt5.pc
endif
AM_CXXFLAGS = -Wall -Wextra -pedantic
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src -D__STDC_LIMIT_MACROS
AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS)
# Define the source files for the module
@@ -65,9 +65,11 @@ AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS)
libthrift_la_SOURCES = src/thrift/Thrift.cpp \
src/thrift/TApplicationException.cpp \
src/thrift/VirtualProfiling.cpp \
+ src/thrift/async/TAsyncChannel.cpp \
src/thrift/concurrency/ThreadManager.cpp \
src/thrift/concurrency/TimerManager.cpp \
src/thrift/concurrency/Util.cpp \
+ src/thrift/processor/PeekProcessor.cpp \
src/thrift/protocol/TDebugProtocol.cpp \
src/thrift/protocol/TDenseProtocol.cpp \
src/thrift/protocol/TJSONProtocol.cpp \
@@ -94,9 +96,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \
src/thrift/server/TServerFramework.cpp \
src/thrift/server/TSimpleServer.cpp \
src/thrift/server/TThreadPoolServer.cpp \
- src/thrift/server/TThreadedServer.cpp \
- src/thrift/async/TAsyncChannel.cpp \
- src/thrift/processor/PeekProcessor.cpp
+ src/thrift/server/TThreadedServer.cpp
if WITH_BOOSTTHREADS
libthrift_la_SOURCES += src/thrift/concurrency/BoostThreadFactory.cpp \
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
index 583b630..259c68e 100644
--- a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
+++ b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp
@@ -87,7 +87,6 @@ Optional fields are a little tricky also. We write a zero byte if they are
absent and prefix them with an 0x01 byte if they are present
*/
-#define __STDC_LIMIT_MACROS
#include <stdint.h>
#include <thrift/protocol/TDenseProtocol.h>
#include <thrift/TReflectionLocal.h>
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TServerFramework.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index 8adb29a..36dab5b 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -18,12 +18,15 @@
*/
#include <boost/bind.hpp>
+#include <stdexcept>
+#include <stdint.h>
#include <thrift/server/TServerFramework.h>
namespace apache {
namespace thrift {
namespace server {
+using apache::thrift::concurrency::Synchronized;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TTransportException;
@@ -39,14 +42,20 @@ TServerFramework::TServerFramework(
const shared_ptr<TServerTransport>& serverTransport,
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
- : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {}
+ : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::TServerFramework(
const shared_ptr<TProcessor>& processor,
const shared_ptr<TServerTransport>& serverTransport,
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
- : TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+ : TServer(processor, serverTransport, transportFactory, protocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::TServerFramework(
const shared_ptr<TProcessorFactory>& processorFactory,
@@ -57,7 +66,10 @@ TServerFramework::TServerFramework(
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServer(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::TServerFramework(
const shared_ptr<TProcessor>& processor,
@@ -68,7 +80,10 @@ TServerFramework::TServerFramework(
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServer(processor, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory),
+ clients_(0),
+ hwm_(0),
+ limit_(INT64_MAX) {}
TServerFramework::~TServerFramework() {}
@@ -111,6 +126,16 @@ void TServerFramework::serve() {
inputTransport.reset();
client.reset();
+ // If we have reached the limit on the number of concurrent
+ // clients allowed, wait for one or more clients to drain before
+ // accepting another.
+ {
+ Synchronized sync(mon_);
+ while (clients_ >= limit_) {
+ mon_.wait();
+ }
+ }
+
client = serverTransport_->accept();
inputTransport = inputTransportFactory_->getTransport(client);
@@ -118,11 +143,12 @@ void TServerFramework::serve() {
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
- onClientConnected(
+ newlyConnectedClient(
shared_ptr<TConnectedClient>(
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
inputProtocol, outputProtocol, eventHandler_, client),
bind(&TServerFramework::disposeConnectedClient, this, _1)));
+
} catch (TTransportException& ttx) {
releaseOneDescriptor("inputTransport", inputTransport);
releaseOneDescriptor("outputTransport", outputTransport);
@@ -147,12 +173,54 @@ void TServerFramework::serve() {
releaseOneDescriptor("serverTransport", serverTransport_);
}
+int64_t TServerFramework::getConcurrentClientLimit() const {
+ Synchronized sync(mon_);
+ return limit_;
+}
+
+int64_t TServerFramework::getConcurrentClientCount() const {
+ Synchronized sync(mon_);
+ return clients_;
+}
+
+int64_t TServerFramework::getConcurrentClientCountHWM() const {
+ Synchronized sync(mon_);
+ return hwm_;
+}
+
+void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
+ if (newLimit < 1) {
+ throw std::invalid_argument("newLimit must be greater than zero");
+ }
+ Synchronized sync(mon_);
+ limit_ = newLimit;
+ if (limit_ - clients_ > 0) {
+ mon_.notify();
+ }
+}
+
void TServerFramework::stop() {
serverTransport_->interrupt();
serverTransport_->interruptChildren();
}
+void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
+ onClientConnected(pClient);
+
+ // Count a concurrent client added.
+ Synchronized sync(mon_);
+ ++clients_;
+ hwm_ = std::max(hwm_, clients_);
+}
+
void TServerFramework::disposeConnectedClient(TConnectedClient *pClient) {
+ {
+ // Count a concurrent client removed.
+ Synchronized sync(mon_);
+ if (limit_ - --clients_ > 0) {
+ mon_.notify();
+ }
+ }
onClientDisconnected(pClient);
delete pClient;
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TServerFramework.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h
index 67d5420..3f16dd1 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.h
+++ b/lib/cpp/src/thrift/server/TServerFramework.h
@@ -21,7 +21,9 @@
#define _THRIFT_SERVER_TSERVERFRAMEWORK_H_ 1
#include <boost/shared_ptr.hpp>
+#include <stdint.h>
#include <thrift/TProcessor.h>
+#include <thrift/concurrency/Monitor.h>
#include <thrift/server/TConnectedClient.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/TServerTransport.h>
@@ -89,6 +91,36 @@ public:
*/
virtual void stop();
+ /**
+ * Get the concurrent client limit.
+ * \returns the concurrent client limit
+ */
+ virtual int64_t getConcurrentClientLimit() const;
+
+ /**
+ * Get the number of currently connected clients.
+ * \returns the number of currently connected clients
+ */
+ virtual int64_t getConcurrentClientCount() const;
+
+ /**
+ * Get the highest number of concurrent clients.
+ * \returns the highest number of concurrent clients
+ */
+ virtual int64_t getConcurrentClientCountHWM() const;
+
+ /**
+ * Set the concurrent client limit. This can be changed while
+ * the server is serving; however, it will not necessarily be
+ * enforced until the next client is accepted and added. If the
+ * limit is lowered below the number of connected clients, no
+ * action is taken to disconnect the clients.
+ * The default value used if this is not called is INT64_MAX.
+ * \param[in] newLimit the new limit of concurrent clients
+ * \throws std::invalid_argument if newLimit is less than 1
+ */
+ virtual void setConcurrentClientLimit(int64_t newLimit);
+
protected:
/**
* A client has connected. The implementation is responsible for storing
@@ -102,6 +134,7 @@ protected:
/**
* A client has disconnected.
+ * The server no longer tracks the client.
* The client TTransport has already been closed.
* The implementation must not delete the pointer.
*
@@ -111,10 +144,37 @@ protected:
private:
/**
+ * Common handling for new connected clients. Calls onClientConnected
+ * and then counts the client as connected; serve() blocks before its
+ * next accept while the concurrent client limit has been reached.
+ */
+ void newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient);
+
+ /**
* Smart pointer client deletion.
* Calls onClientDisconnected and then deletes pClient.
*/
void disposeConnectedClient(TConnectedClient *pClient);
+
+ /**
+ * Monitor for limiting the number of concurrent clients.
+ */
+ apache::thrift::concurrency::Monitor mon_;
+
+ /**
+ * The number of concurrent clients.
+ */
+ int64_t clients_;
+
+ /**
+ * The high water mark of concurrent clients.
+ */
+ int64_t hwm_;
+
+ /**
+ * The limit on the number of concurrent clients.
+ */
+ int64_t limit_;
};
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TSimpleServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index a133c0d..adcedc8 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -38,7 +38,9 @@ TSimpleServer::TSimpleServer(
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
: TServerFramework(processorFactory, serverTransport,
- transportFactory, protocolFactory) {}
+ transportFactory, protocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::TSimpleServer(
const shared_ptr<TProcessor>& processor,
@@ -46,7 +48,9 @@ TSimpleServer::TSimpleServer(
const shared_ptr<TTransportFactory>& transportFactory,
const shared_ptr<TProtocolFactory>& protocolFactory)
: TServerFramework(processor, serverTransport,
- transportFactory, protocolFactory) {}
+ transportFactory, protocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::TSimpleServer(
const shared_ptr<TProcessorFactory>& processorFactory,
@@ -57,7 +61,9 @@ TSimpleServer::TSimpleServer(
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServerFramework(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::TSimpleServer(
const shared_ptr<TProcessor>& processor,
@@ -68,7 +74,9 @@ TSimpleServer::TSimpleServer(
const shared_ptr<TProtocolFactory>& outputProtocolFactory)
: TServerFramework(processor, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory) {}
+ inputProtocolFactory, outputProtocolFactory) {
+ TServerFramework::setConcurrentClientLimit(1);
+}
TSimpleServer::~TSimpleServer() {}
@@ -86,6 +94,13 @@ void TSimpleServer::onClientConnected(const shared_ptr<TConnectedClient>& pClien
*/
void TSimpleServer::onClientDisconnected(TConnectedClient *pClient) {}
+/**
+ * This makes little sense for the simple server because it is not capable
+ * of having more than one client at a time, so we hide it.
+ */
+void TSimpleServer::setConcurrentClientLimit(int64_t newLimit) {}
+
+
}
}
} // apache::thrift::server
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TSimpleServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 51b00e4..30d5046 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -62,6 +62,9 @@ public:
protected:
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
+
+private:
+ void setConcurrentClientLimit(int64_t newLimit); // hide
};
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index a5f8c76..5b9b01d 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -112,6 +112,10 @@ void TThreadPoolServer::setTaskExpiration(int64_t value) {
taskExpiration_ = value;
}
+boost::shared_ptr<apache::thrift::concurrency::ThreadManager> TThreadPoolServer::getThreadManager() const {
+ return threadManager_;
+}
+
void TThreadPoolServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
threadManager_->add(pClient, timeout_, taskExpiration_);
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadPoolServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index 29e9aaf..267dbad 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -37,14 +37,16 @@ public:
const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
TThreadPoolServer(
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport,
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
TThreadPoolServer(
const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -53,7 +55,8 @@ public:
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
TThreadPoolServer(
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -62,7 +65,8 @@ public:
const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory,
const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
- const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager);
+ const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager =
+ apache::thrift::concurrency::ThreadManager::newSimpleThreadManager());
virtual ~TThreadPoolServer();
@@ -78,6 +82,8 @@ public:
virtual int64_t getTaskExpiration() const;
virtual void setTaskExpiration(int64_t value);
+ virtual boost::shared_ptr<apache::thrift::concurrency::ThreadManager> getThreadManager() const;
+
protected:
virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
virtual void onClientDisconnected(TConnectedClient *pClient) /* override */;
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadedServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 440cede..b0b22c3 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -89,7 +89,7 @@ void TThreadedServer::serve() {
// Drain all clients - no more will arrive
try {
Synchronized s(clientsMonitor_);
- while (!clients_.empty()) {
+ while (getConcurrentClientCount() > 0) {
clientsMonitor_.wait();
}
} catch (TException& tx) {
@@ -98,27 +98,14 @@ void TThreadedServer::serve() {
}
}
-void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient)
-{
- // Create a thread for this client
- shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
-
- // Insert thread into the set of threads
- {
- Synchronized s(clientsMonitor_);
- clients_.insert(pClient.get());
- }
-
- // Start the thread!
- thread->start();
+void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+ threadFactory_->newThread(pClient)->start();
}
void TThreadedServer::onClientDisconnected(TConnectedClient *pClient) {
- // Remove this task from parent bookkeeping
Synchronized s(clientsMonitor_);
- clients_.erase(pClient);
- if (clients_.empty()) {
- clientsMonitor_.notify();
+ if (getConcurrentClientCount() == 0) {
+ clientsMonitor_.notify();
}
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadedServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index 7b66f1d..21b6a28 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -29,8 +29,6 @@ namespace apache {
namespace thrift {
namespace server {
-#define THRIFT_DEFAULT_THREAD_FACTORY
-
/**
* Manage clients using a thread pool.
*/
@@ -86,7 +84,6 @@ protected:
boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
apache::thrift::concurrency::Monitor clientsMonitor_;
- std::set<TConnectedClient*> clients_;
};
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 0cd1c67..3470abb 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -323,7 +323,7 @@ gen-cpp/SecondService.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/ThriftTest.cp
gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift
$(THRIFT) --gen cpp:templates,cob_style $<
-AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src
+AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -D__STDC_LIMIT_MACROS
AM_LDFLAGS = $(BOOST_LDFLAGS)
AM_CXXFLAGS = -Wall -Wextra -pedantic
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/TServerIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 9edeb19..73bcdba 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -20,9 +20,12 @@
#define BOOST_TEST_MODULE TServerIntegrationTest
#include <boost/test/auto_unit_test.hpp>
#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
#include <boost/format.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
@@ -44,12 +47,17 @@ using apache::thrift::transport::TServerSocket;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
using apache::thrift::transport::TTransportFactory;
+using apache::thrift::server::TServer;
using apache::thrift::server::TServerEventHandler;
+using apache::thrift::server::TSimpleServer;
+using apache::thrift::server::TThreadPoolServer;
using apache::thrift::server::TThreadedServer;
using apache::thrift::test::ParentServiceClient;
using apache::thrift::test::ParentServiceIf;
using apache::thrift::test::ParentServiceProcessor;
+using boost::posix_time::milliseconds;
/**
* preServe runs after listen() is successful, when we can connect
@@ -81,7 +89,10 @@ private:
uint64_t accepted_;
};
-class ParentHandler : virtual public ParentServiceIf {
+/**
+ * Reusing another generated test, just something to serve up
+ */
+class ParentHandler : public ParentServiceIf {
public:
ParentHandler() : generation_(0) {}
@@ -123,11 +134,17 @@ protected:
std::vector<std::string> strings_;
};
+void autoSocketCloser(TSocket *pSock) {
+ pSock->close();
+ delete pSock;
+}
+
+template<class TServerType>
class TServerIntegrationTestFixture : public TestPortFixture
{
public:
TServerIntegrationTestFixture() :
- pServer(new TThreadedServer(
+ pServer(new TServerType(
boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor(
boost::shared_ptr<ParentServiceIf>(new ParentHandler))),
boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)),
@@ -139,7 +156,7 @@ public:
}
void startServer() {
- pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get())));
+ pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get())));
// block until listen() completes so clients will be able to connect
Synchronized sync(*(pEventHandler.get()));
@@ -160,52 +177,117 @@ public:
}
void stopServer() {
- pServer->stop();
- BOOST_MESSAGE("server stop completed");
- pServerThread->join();
- BOOST_MESSAGE("server thread joined");
+ if (pServerThread) {
+ pServer->stop();
+ BOOST_MESSAGE("server stop completed");
+
+ pServerThread->join();
+ BOOST_MESSAGE("server thread joined");
+ pServerThread.reset();
+ }
}
~TServerIntegrationTestFixture() {
stopServer();
}
- void delayClose(boost::shared_ptr<TTransport> toClose) {
- boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
+ boost::this_thread::sleep(after);
toClose->close();
}
- boost::shared_ptr<TThreadedServer> pServer;
+ void baseline(int64_t numToMake, int64_t expectedHWM) {
+ startServer();
+ std::vector<boost::shared_ptr<TSocket> > holdSockets;
+ std::vector<boost::shared_ptr<boost::thread> > holdThreads;
+
+ for (int64_t i = 0; i < numToMake; ++i) {
+ boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ holdSockets.push_back(pClientSock);
+ boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
+ ParentServiceClient client(pClientProtocol);
+ pClientSock->open();
+ client.incrementGeneration();
+ holdThreads.push_back(
+ boost::shared_ptr<boost::thread>(
+ new boost::thread(
+ boost::bind(&TServerIntegrationTestFixture::delayClose, this,
+ pClientSock, milliseconds(100 * numToMake)))));
+ }
+
+ BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
+ stopServer();
+ BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) {
+ pThread->join();
+ }
+ holdThreads.clear();
+ holdSockets.clear();
+ }
+
+ boost::shared_ptr<TServerType> pServer;
boost::shared_ptr<TServerReadyEventHandler> pEventHandler;
boost::shared_ptr<boost::thread> pServerThread;
};
-BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture )
+BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture )
-BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close)
+BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>)
{
- // this test establishes some basic sanity
+ baseline(3, 1);
+}
- startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
- boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1));
- ParentServiceClient client1(pClientProtocol1);
- pClientSock1->open();
- client1.incrementGeneration();
- pClientSock1->close();
- stopServer();
+BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>)
+{
+ baseline(10, 10);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>)
+{
+ pServer->setConcurrentClientLimit(4);
+ baseline(10, 4);
+}
+
+BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+
+ // thread manager has 4 threads as a default
+ // thread manager however is a bad way to limit concurrent clients
+ // as accept() will be called to grab a 5th client socket, in this case
+ // and then the thread manager will block adding the task that manages
+ // that client.
+ baseline(10, 5);
}
+BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>)
+{
+ pServer->getThreadManager()->threadFactory(
+ boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
+ new apache::thrift::concurrency::PlatformThreadFactory));
+ pServer->getThreadManager()->start();
+ pServer->setConcurrentClientLimit(4);
+
+ baseline(10, 4);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+
+BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> )
+
BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
{
// This tests THRIFT-2441 new behavior: stopping the server disconnects clients
startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock1->open();
- boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock2->open();
// Ensure they have been accepted
@@ -219,8 +301,6 @@ BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected)
uint8_t buf[1];
BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
- pClientSock1->close();
- pClientSock2->close();
}
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
@@ -230,24 +310,56 @@ BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)
boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())->
setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
+
startServer();
- boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock1->open();
- boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort));
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
pClientSock2->open();
// Ensure they have been accepted
blockUntilAccepted(2);
- boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1));
- boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2));
+ boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250)));
+ boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
// Once the clients disconnect the server will stop
stopServer();
+ t1.join();
+ t2.join();
+}
+
+BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)
+{
+ startServer();
+
+ BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
+ pServer->setConcurrentClientLimit(2);
+ BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
+
+ boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ pClientSock1->open();
+ blockUntilAccepted(1);
+ BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
+
+ boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ pClientSock2->open();
+ blockUntilAccepted(2);
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+
+ // a third client cannot connect until one of the other two closes
+ boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250)));
+ boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser);
+ pClientSock2->open();
+ blockUntilAccepted(2);
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
+ BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
- pClientSock1->close();
- pClientSock2->close();
+ stopServer();
+ t2.join();
}
+
BOOST_AUTO_TEST_SUITE_END()
http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/ZlibTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index bafacf9..465e12d 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-#define __STDC_LIMIT_MACROS
#define __STDC_FORMAT_MACROS
#ifndef _GNU_SOURCE