You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jk...@apache.org on 2017/02/28 04:50:15 UTC

thrift git commit: THRIFT-3891 TNonblockingServer configured with more than one IO threads does not always return from serve() upon stop() Client: C++ Patch: additional changes by jking@apache.org to improve the test and stop clean in all cases

Repository: thrift
Updated Branches:
  refs/heads/master 5038466e5 -> 36d1b0dea


THRIFT-3891 TNonblockingServer configured with more than one IO threads does not always return from serve() upon stop()
Client: C++
Patch: additional changes by jking@apache.org to improve the test and stop clean in all cases

This closes #1080
This closes #1196


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/36d1b0de
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/36d1b0de
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/36d1b0de

Branch: refs/heads/master
Commit: 36d1b0dea566c0dea06e321421e32a6cad0abb32
Parents: 5038466
Author: Buğra Gedik <bg...@gmail.com>
Authored: Sun Sep 4 17:18:15 2016 +0900
Committer: James E. King, III <jk...@apache.org>
Committed: Mon Feb 27 23:44:35 2017 -0500

----------------------------------------------------------------------
 .../src/thrift/server/TNonblockingServer.cpp    | 54 ++++++++++----------
 lib/cpp/test/TNonblockingServerTest.cpp         | 37 +++++++++++++-
 2 files changed, 61 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/36d1b0de/lib/cpp/src/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 649910f..d4418bd 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -510,7 +510,7 @@ void TNonblockingServer::TConnection::workSocket() {
 
     // If there is no data to send, then let us move on
     if (writeBufferPos_ == writeBufferSize_) {
-      GlobalOutput("WARNING: Send state with no data to send\n");
+      GlobalOutput("WARNING: Send state with no data to send");
       transition();
       return;
     }
@@ -765,11 +765,9 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
   }
 
   // Delete a previously existing event
-  if (eventFlags_ != 0) {
-    if (event_del(&event_) == -1) {
-      GlobalOutput("TConnection::setFlags event_del");
-      return;
-    }
+  if (eventFlags_ && event_del(&event_) == -1) {
+    GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
+    return;
   }
 
   // Update in memory structure
@@ -812,7 +810,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
 
   // Add the event
   if (event_add(&event_, 0) == -1) {
-    GlobalOutput("TConnection::setFlags(): could not event_add");
+    GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
   }
 }
 
@@ -820,9 +818,9 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  * Closes a connection
  */
 void TNonblockingServer::TConnection::close() {
-  // Delete the registered libevent
-  if (event_del(&event_) == -1) {
+  if (eventFlags_ && event_del(&event_) == -1) {
     GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
+    return;
   }
 
   if (serverEventHandler_) {
@@ -1066,7 +1064,7 @@ void TNonblockingServer::createAndListenOnSocket() {
   if (res->ai_family == AF_INET6) {
     int zero = 0;
     if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
-      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+      GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY", THRIFT_GET_SOCKET_ERROR);
     }
   }
 #endif // #ifdef IPV6_V6ONLY
@@ -1486,6 +1484,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
     if (nBytes == kSize) {
       if (connection == NULL) {
         // this is the command to stop our thread, exit the handler!
+        ioThread->breakLoop(false);
         return;
       }
       connection->transition();
@@ -1496,6 +1495,7 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void*
       return;
     } else if (nBytes == 0) {
       GlobalOutput.printf("notifyHandler: Notify socket closed!");
+      ioThread->breakLoop(false);
       // exit the loop
       break;
     } else { // nBytes < 0
@@ -1520,19 +1520,15 @@ void TNonblockingIOThread::breakLoop(bool error) {
     ::abort();
   }
 
-  // sets a flag so that the loop exits on the next event
-  event_base_loopbreak(eventBase_);
-
-  // event_base_loopbreak() only causes the loop to exit the next time
-  // it wakes up.  We need to force it to wake up, in case there are
-  // no real events it needs to process.
-  //
   // If we're running in the same thread, we can't use the notify(0)
   // mechanism to stop the thread, but happily if we're running in the
   // same thread, this means the thread can't be blocking in the event
   // loop either.
   if (!Thread::is_current(threadId_)) {
     notify(NULL);
+  } else {
+    // cause the loop to stop ASAP - even if it has things to do in it
+    event_base_loopbreak(eventBase_);
   }
 }
 
@@ -1566,24 +1562,26 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
 }
 
 void TNonblockingIOThread::run() {
-  if (eventBase_ == NULL)
+  if (eventBase_ == NULL) {
     registerEvents();
-
-  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
-
+  }
   if (useHighPriority_) {
     setCurrentThreadHighPriority(true);
   }
 
-  // Run libevent engine, never returns, invokes calls to eventHandler
-  event_base_loop(eventBase_, 0);
+  if (eventBase_ != NULL)
+  {
+    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
+    // Run libevent engine, never returns, invokes calls to eventHandler
+    event_base_loop(eventBase_, 0);
 
-  if (useHighPriority_) {
-    setCurrentThreadHighPriority(false);
-  }
+    if (useHighPriority_) {
+      setCurrentThreadHighPriority(false);
+    }
 
-  // cleans up our registered events
-  cleanupEvents();
+    // cleans up our registered events
+    cleanupEvents();
+  }
 
   GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/36d1b0de/lib/cpp/test/TNonblockingServerTest.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index 48ea913..e933d6b 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -21,6 +21,7 @@
 #include <boost/test/unit_test.hpp>
 #include <boost/smart_ptr.hpp>
 
+#include "thrift/concurrency/Monitor.h"
 #include "thrift/concurrency/Thread.h"
 #include "thrift/server/TNonblockingServer.h"
 
@@ -29,6 +30,10 @@
 #include <event.h>
 
 using namespace apache::thrift;
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::server::TServerEventHandler;
 
 struct Handler : public test::ParentServiceIf {
   void addString(const std::string& s) { strings_.push_back(s); }
@@ -46,11 +51,31 @@ struct Handler : public test::ParentServiceIf {
 
 class Fixture {
 private:
+  struct ListenEventHandler : public TServerEventHandler {
+    public:
+      ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {}
+
+      void preServe() /* override */ {
+        Guard g(listenMonitor_.mutex());
+        ready_ = true;
+        listenMonitor_.notify();
+      }
+
+      Monitor listenMonitor_;
+      bool ready_;
+  };
+
   struct Runner : public apache::thrift::concurrency::Runnable {
     int port;
     boost::shared_ptr<event_base> userEventBase;
     boost::shared_ptr<TProcessor> processor;
     boost::shared_ptr<server::TNonblockingServer> server;
+    boost::shared_ptr<ListenEventHandler> listenHandler;
+    Mutex mutex_;
+
+    Runner() {
+      listenHandler.reset(new ListenEventHandler(&mutex_));
+    }
 
     virtual void run() {
       // When binding to explicit port, allow retrying to workaround bind failures on ports in use
@@ -58,10 +83,18 @@ private:
       startServer(retryCount);
     }
 
+    void readyBarrier() {
+      // block until server is listening and ready to accept connections
+      Guard g(mutex_);
+      while (!listenHandler->ready_) {
+        listenHandler->listenMonitor_.wait();
+      }
+    }
   private:
     void startServer(int retry_count) {
       try {
         server.reset(new server::TNonblockingServer(processor, port));
+        server->setServerEventHandler(listenHandler);
         if (userEventBase) {
           server->registerEvents(userEventBase.get());
         }
@@ -112,8 +145,8 @@ protected:
             false));
     thread = threadFactory->newThread(runner);
     thread->start();
-    // wait 100 ms for the server to begin listening
-    THRIFT_SLEEP_USEC(100000);
+    runner->readyBarrier();
+
     server = runner->server;
     return runner->port;
   }