You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jk...@apache.org on 2017/02/28 04:50:15 UTC
thrift git commit: THRIFT-3891 TNonblockingServer configured with
more than one IO threads does not always return from serve() upon stop()
Client: C++ Patch: additional changes by jking@apache.org to improve the test
and stop clean in all cases
Repository: thrift
Updated Branches:
refs/heads/master 5038466e5 -> 36d1b0dea
THRIFT-3891 TNonblockingServer configured with more than one IO threads does not always return from serve() upon stop()
Client: C++
Patch: additional changes by jking@apache.org to improve the test and stop clean in all cases
This closes #1080
This closes #1196
Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/36d1b0de
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/36d1b0de
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/36d1b0de
Branch: refs/heads/master
Commit: 36d1b0dea566c0dea06e321421e32a6cad0abb32
Parents: 5038466
Author: Buğra Gedik <bg...@gmail.com>
Authored: Sun Sep 4 17:18:15 2016 +0900
Committer: James E. King, III <jk...@apache.org>
Committed: Mon Feb 27 23:44:35 2017 -0500
----------------------------------------------------------------------
.../src/thrift/server/TNonblockingServer.cpp | 54 ++++++++++----------
lib/cpp/test/TNonblockingServerTest.cpp | 37 +++++++++++++-
2 files changed, 61 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/thrift/blob/36d1b0de/lib/cpp/src/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 649910f..d4418bd 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -510,7 +510,7 @@ void TNonblockingServer::TConnection::workSocket() {
// If there is no data to send, then let us move on
if (writeBufferPos_ == writeBufferSize_) {
- GlobalOutput("WARNING: Send state with no data to send\n");
+ GlobalOutput("WARNING: Send state with no data to send");
transition();
return;
}
@@ -765,11 +765,9 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
}
// Delete a previously existing event
- if (eventFlags_ != 0) {
- if (event_del(&event_) == -1) {
- GlobalOutput("TConnection::setFlags event_del");
- return;
- }
+ if (eventFlags_ && event_del(&event_) == -1) {
+ GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
+ return;
}
// Update in memory structure
@@ -812,7 +810,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
// Add the event
if (event_add(&event_, 0) == -1) {
- GlobalOutput("TConnection::setFlags(): could not event_add");
+ GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
}
}
@@ -820,9 +818,9 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
* Closes a connection
*/
void TNonblockingServer::TConnection::close() {
- // Delete the registered libevent
- if (event_del(&event_) == -1) {
+ if (eventFlags_ && event_del(&event_) == -1) {
GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
+ return;
}
if (serverEventHandler_) {
@@ -1066,7 +1064,7 @@ void TNonblockingServer::createAndListenOnSocket() {
if (res->ai_family == AF_INET6) {
int zero = 0;
if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
- GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+ GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY", THRIFT_GET_SOCKET_ERROR);
}
}
#endif // #ifdef IPV6_V6ONLY
@@ -1486,6 +1484,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
if (nBytes == kSize) {
if (connection == NULL) {
// this is the command to stop our thread, exit the handler!
+ ioThread->breakLoop(false);
return;
}
connection->transition();
@@ -1496,6 +1495,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
return;
} else if (nBytes == 0) {
GlobalOutput.printf("notifyHandler: Notify socket closed!");
+ ioThread->breakLoop(false);
// exit the loop
break;
} else { // nBytes < 0
@@ -1520,19 +1520,15 @@ void TNonblockingIOThread::breakLoop(bool error) {
::abort();
}
- // sets a flag so that the loop exits on the next event
- event_base_loopbreak(eventBase_);
-
- // event_base_loopbreak() only causes the loop to exit the next time
- // it wakes up. We need to force it to wake up, in case there are
- // no real events it needs to process.
- //
// If we're running in the same thread, we can't use the notify(0)
// mechanism to stop the thread, but happily if we're running in the
// same thread, this means the thread can't be blocking in the event
// loop either.
if (!Thread::is_current(threadId_)) {
notify(NULL);
+ } else {
+ // cause the loop to stop ASAP - even if it has things to do in it
+ event_base_loopbreak(eventBase_);
}
}
@@ -1566,24 +1562,26 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
}
void TNonblockingIOThread::run() {
- if (eventBase_ == NULL)
+ if (eventBase_ == NULL) {
registerEvents();
-
- GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
-
+ }
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
}
- // Run libevent engine, never returns, invokes calls to eventHandler
- event_base_loop(eventBase_, 0);
+ if (eventBase_ != NULL)
+ {
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
+ // Run libevent engine, never returns, invokes calls to eventHandler
+ event_base_loop(eventBase_, 0);
- if (useHighPriority_) {
- setCurrentThreadHighPriority(false);
- }
+ if (useHighPriority_) {
+ setCurrentThreadHighPriority(false);
+ }
- // cleans up our registered events
- cleanupEvents();
+ // cleans up our registered events
+ cleanupEvents();
+ }
GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
http://git-wip-us.apache.org/repos/asf/thrift/blob/36d1b0de/lib/cpp/test/TNonblockingServerTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index 48ea913..e933d6b 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -21,6 +21,7 @@
#include <boost/test/unit_test.hpp>
#include <boost/smart_ptr.hpp>
+#include "thrift/concurrency/Monitor.h"
#include "thrift/concurrency/Thread.h"
#include "thrift/server/TNonblockingServer.h"
@@ -29,6 +30,10 @@
#include <event.h>
using namespace apache::thrift;
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::server::TServerEventHandler;
struct Handler : public test::ParentServiceIf {
void addString(const std::string& s) { strings_.push_back(s); }
@@ -46,11 +51,31 @@ struct Handler : public test::ParentServiceIf {
class Fixture {
private:
+ struct ListenEventHandler : public TServerEventHandler {
+ public:
+ ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {}
+
+ void preServe() /* override */ {
+ Guard g(listenMonitor_.mutex());
+ ready_ = true;
+ listenMonitor_.notify();
+ }
+
+ Monitor listenMonitor_;
+ bool ready_;
+ };
+
struct Runner : public apache::thrift::concurrency::Runnable {
int port;
boost::shared_ptr<event_base> userEventBase;
boost::shared_ptr<TProcessor> processor;
boost::shared_ptr<server::TNonblockingServer> server;
+ boost::shared_ptr<ListenEventHandler> listenHandler;
+ Mutex mutex_;
+
+ Runner() {
+ listenHandler.reset(new ListenEventHandler(&mutex_));
+ }
virtual void run() {
// When binding to explicit port, allow retrying to workaround bind failures on ports in use
@@ -58,10 +83,18 @@ private:
startServer(retryCount);
}
+ void readyBarrier() {
+ // block until server is listening and ready to accept connections
+ Guard g(mutex_);
+ while (!listenHandler->ready_) {
+ listenHandler->listenMonitor_.wait();
+ }
+ }
private:
void startServer(int retry_count) {
try {
server.reset(new server::TNonblockingServer(processor, port));
+ server->setServerEventHandler(listenHandler);
if (userEventBase) {
server->registerEvents(userEventBase.get());
}
@@ -112,8 +145,8 @@ protected:
false));
thread = threadFactory->newThread(runner);
thread->start();
- // wait 100 ms for the server to begin listening
- THRIFT_SLEEP_USEC(100000);
+ runner->readyBarrier();
+
server = runner->server;
return runner->port;
}