You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2016/07/08 20:07:13 UTC

thrift git commit: Revert "THRIFT-3768 fix TThreadedServer refactoring issues with client lifetime guarantees"

Repository: thrift
Updated Branches:
  refs/heads/master 0e9fed1e1 -> e5fbedd29


Revert "THRIFT-3768 fix TThreadedServer refactoring issues with client lifetime guarantees"

This reverts commit 0b433de5d5c7454f5410ac7b3d1ac86a07d1beef.


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/e5fbedd2
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/e5fbedd2
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/e5fbedd2

Branch: refs/heads/master
Commit: e5fbedd29f9c4f4889530c26dae37f07a04b2189
Parents: 0e9fed1
Author: Jens Geyer <je...@apache.org>
Authored: Fri Jul 8 22:00:37 2016 +0200
Committer: Jens Geyer <je...@apache.org>
Committed: Fri Jul 8 22:00:37 2016 +0200

----------------------------------------------------------------------
 .../src/thrift/concurrency/ThreadManager.cpp    |  1 +
 lib/cpp/src/thrift/server/TServerFramework.cpp  | 26 ++++---
 lib/cpp/src/thrift/server/TThreadedServer.cpp   | 71 +++++++++++---------
 lib/cpp/src/thrift/server/TThreadedServer.h     | 26 +++----
 4 files changed, 67 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/e5fbedd2/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 8f74fa6..24bfeec 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -544,6 +544,7 @@ public:
 private:
   const size_t workerCount_;
   const size_t pendingTaskCountMax_;
+  Monitor monitor_;
 };
 
 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5fbedd2/lib/cpp/src/thrift/server/TServerFramework.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index ea78bcd..56b6cca 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -221,27 +221,25 @@ void TServerFramework::stop() {
 }
 
 void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) {
-  {
-    // Count a concurrent client added.
-    Synchronized sync(mon_);
-    ++clients_;
-    hwm_ = (std::max)(hwm_, clients_);
-  }
-
   onClientConnected(pClient);
+
+  // Count a concurrent client added.
+  Synchronized sync(mon_);
+  ++clients_;
+  hwm_ = (std::max)(hwm_, clients_);
 }
 
 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
+  {
+    // Count a concurrent client removed.
+    Synchronized sync(mon_);
+    if (limit_ - --clients_ > 0) {
+      mon_.notify();
+    }
+  }
   onClientDisconnected(pClient);
   delete pClient;
-
-  // Count a concurrent client removed.
-  Synchronized sync(mon_);
-  if (limit_ - --clients_ > 0) {
-    mon_.notify();
-  }
 }
-
 }
 }
 } // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5fbedd2/lib/cpp/src/thrift/server/TThreadedServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 36d1fc4..92f5cf8 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -41,10 +41,8 @@ TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorF
                                  const shared_ptr<TTransportFactory>& transportFactory,
                                  const shared_ptr<TProtocolFactory>& protocolFactory,
                                  const shared_ptr<ThreadFactory>& threadFactory)
-  : TThreadPoolServer(processorFactory, serverTransport, transportFactory, protocolFactory,
-          apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
-    threadManager_->threadFactory(threadFactory);
-    threadManager_->start();
+  : TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory),
+    threadFactory_(threadFactory) {
 }
 
 TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
@@ -52,10 +50,8 @@ TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
                                  const shared_ptr<TTransportFactory>& transportFactory,
                                  const shared_ptr<TProtocolFactory>& protocolFactory,
                                  const shared_ptr<ThreadFactory>& threadFactory)
-  : TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory,
-          apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
-    threadManager_->threadFactory(threadFactory);
-    threadManager_->start();
+  : TServerFramework(processor, serverTransport, transportFactory, protocolFactory),
+    threadFactory_(threadFactory) {
 }
 
 TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
@@ -65,15 +61,13 @@ TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorF
                                  const shared_ptr<TProtocolFactory>& inputProtocolFactory,
                                  const shared_ptr<TProtocolFactory>& outputProtocolFactory,
                                  const shared_ptr<ThreadFactory>& threadFactory)
-  : TThreadPoolServer(processorFactory,
-                      serverTransport,
-                      inputTransportFactory,
-                      outputTransportFactory,
-                      inputProtocolFactory,
-                      outputProtocolFactory,
-                      apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
-    threadManager_->threadFactory(threadFactory);
-    threadManager_->start();
+  : TServerFramework(processorFactory,
+                     serverTransport,
+                     inputTransportFactory,
+                     outputTransportFactory,
+                     inputProtocolFactory,
+                     outputProtocolFactory),
+    threadFactory_(threadFactory) {
 }
 
 TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
@@ -83,29 +77,44 @@ TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
                                  const shared_ptr<TProtocolFactory>& inputProtocolFactory,
                                  const shared_ptr<TProtocolFactory>& outputProtocolFactory,
                                  const shared_ptr<ThreadFactory>& threadFactory)
-  : TThreadPoolServer(processor,
-                      serverTransport,
-                      inputTransportFactory,
-                      outputTransportFactory,
-                      inputProtocolFactory,
-                      outputProtocolFactory,
-                      apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(0, 0)) {
-    threadManager_->threadFactory(threadFactory);
-    threadManager_->start();
+  : TServerFramework(processor,
+                     serverTransport,
+                     inputTransportFactory,
+                     outputTransportFactory,
+                     inputProtocolFactory,
+                     outputProtocolFactory),
+    threadFactory_(threadFactory) {
 }
 
 TThreadedServer::~TThreadedServer() {
 }
 
-void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
-  if (!threadManager_->idleWorkerCount())
-  {
-    threadManager_->addWorker();
+void TThreadedServer::serve() {
+  TServerFramework::serve();
+
+  // Drain all clients - no more will arrive
+  try {
+    Synchronized s(clientsMonitor_);
+    while (getConcurrentClientCount() > 0) {
+      clientsMonitor_.wait();
+    }
+  } catch (TException& tx) {
+    string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
+    GlobalOutput(errStr.c_str());
   }
+}
 
-  TThreadPoolServer::onClientConnected(pClient);
+void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
+  threadFactory_->newThread(pClient)->start();
 }
 
+void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
+  THRIFT_UNUSED_VARIABLE(pClient);
+  Synchronized s(clientsMonitor_);
+  if (getConcurrentClientCount() == 0) {
+    clientsMonitor_.notify();
+  }
+}
 }
 }
 } // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/e5fbedd2/lib/cpp/src/thrift/server/TThreadedServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index a5b005a..cdacfd7 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,26 +20,19 @@
 #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
 #define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
 
+#include <thrift/concurrency/Monitor.h>
 #include <thrift/concurrency/PlatformThreadFactory.h>
 #include <thrift/concurrency/Thread.h>
-#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/server/TServerFramework.h>
 
 namespace apache {
 namespace thrift {
 namespace server {
 
 /**
- * Manage clients using threads.  Once the refactoring for THRIFT-3083 took place it became
- * obvious that the differences between the two threaded server types was becoming insignificant.
- * Therefore to satisfy THRIFT-3096 and fix THRIFT-3768, TThreadedServer is simply a wrapper
- * around TThreadedPoolServer now.  If backwards compatibility was not a concern, it would have
- * been removed.
- *
- * The default thread pool size is
+ * Manage clients using one newly started thread per connected client.
  */
-
-/* [[deprecated]] */
-class TThreadedServer : public TThreadPoolServer {
+class TThreadedServer : public TServerFramework {
 public:
   TThreadedServer(
       const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -83,10 +76,19 @@ public:
 
   virtual ~TThreadedServer();
 
+  /**
+   * Post-conditions (return guarantees):
+   *   There will be no clients connected.
+   */
+  virtual void serve();
+
 protected:
   virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */;
-};
+  virtual void onClientDisconnected(TConnectedClient* pClient) /* override */;
 
+  boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_;
+  apache::thrift::concurrency::Monitor clientsMonitor_;
+};
 }
 }
 } // apache::thrift::server