You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ro...@apache.org on 2015/04/26 22:25:47 UTC

thrift git commit: THRIFT-3081 consolidate client processing loop in Simple, Threaded, and Thread Pool servers

Repository: thrift
Updated Branches:
  refs/heads/master 811d279d5 -> 5ec805b22


THRIFT-3081 consolidate client processing loop in Simple, Threaded, and Thread Pool servers


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/5ec805b2
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/5ec805b2
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/5ec805b2

Branch: refs/heads/master
Commit: 5ec805b22b81001b1b785cd7f85eb8647fde60df
Parents: 811d279
Author: Jim King <ji...@simplivity.com>
Authored: Sun Apr 26 07:52:40 2015 -0400
Committer: Roger Meier <ro...@apache.org>
Committed: Sun Apr 26 20:58:17 2015 +0200

----------------------------------------------------------------------
 lib/cpp/CMakeLists.txt                          |   1 +
 lib/cpp/Makefile.am                             |   1 +
 lib/cpp/src/thrift/server/TConnectedClient.cpp  | 121 +++++++++++++++++
 lib/cpp/src/thrift/server/TConnectedClient.h    | 116 +++++++++++++++++
 lib/cpp/src/thrift/server/TSimpleServer.cpp     |  56 +-------
 lib/cpp/src/thrift/server/TSimpleServer.h       |   2 +-
 lib/cpp/src/thrift/server/TThreadPoolServer.cpp |  84 ++----------
 lib/cpp/src/thrift/server/TThreadPoolServer.h   |  13 +-
 lib/cpp/src/thrift/server/TThreadedServer.cpp   | 129 ++++---------------
 lib/cpp/src/thrift/server/TThreadedServer.h     |  96 ++++++--------
 10 files changed, 333 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index a965593..c11fc56 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -54,6 +54,7 @@ set( thriftcpp_SOURCES
    src/thrift/transport/TServerSocket.cpp
    src/thrift/transport/TTransportUtils.cpp
    src/thrift/transport/TBufferTransports.cpp
+   src/thrift/server/TConnectedClient.cpp
    src/thrift/server/TServer.cpp
    src/thrift/server/TSimpleServer.cpp
    src/thrift/server/TThreadPoolServer.cpp

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index e6a6015..cb30bda 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -89,6 +89,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \
                        src/thrift/transport/TSSLServerSocket.cpp \
                        src/thrift/transport/TTransportUtils.cpp \
                        src/thrift/transport/TBufferTransports.cpp \
+                       src/thrift/server/TConnectedClient.cpp \
                        src/thrift/server/TServer.cpp \
                        src/thrift/server/TSimpleServer.cpp \
                        src/thrift/server/TThreadPoolServer.cpp \

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TConnectedClient.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TConnectedClient.cpp b/lib/cpp/src/thrift/server/TConnectedClient.cpp
new file mode 100644
index 0000000..630c28e
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TConnectedClient.cpp
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/server/TConnectedClient.h>
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::server::TServerEventHandler;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
+using std::string;
+
+TConnectedClient::TConnectedClient(const string& serverType,
+                                   const shared_ptr<TProcessor>& processor,
+                                   const shared_ptr<TProtocol>& inputProtocol,
+                                   const shared_ptr<TProtocol>& outputProtocol,
+                                   const shared_ptr<TServerEventHandler>& eventHandler,
+                                   const shared_ptr<TTransport>& client)
+                        
+  : serverType_(serverType),
+    processor_(processor),
+    inputProtocol_(inputProtocol),
+    outputProtocol_(outputProtocol),
+    eventHandler_(eventHandler),
+    client_(client),
+    opaqueContext_(0) {}
+
+TConnectedClient::~TConnectedClient() {}
+
+void TConnectedClient::run() {
+  if (eventHandler_) {
+    opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_);
+  }
+
+  for (;;) {
+    if (eventHandler_) {
+      eventHandler_->processContext(opaqueContext_, client_);
+    }
+
+    try {
+      if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) {
+        break;
+      }
+    } catch (const TTransportException& ttx) {
+      if (ttx.getType() == TTransportException::TIMED_OUT) {
+        // Receive timeout - continue processing.
+        continue;
+      } else if (ttx.getType() == TTransportException::END_OF_FILE ||
+                 ttx.getType() == TTransportException::INTERRUPTED) {
+        // Client disconnected or was interrupted.  No logging needed.  Done.
+        break;
+      } else {
+        // All other transport exceptions are logged.
+        // State of connection is unknown.  Done.
+        string errStr = (serverType_ + " client died: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+        break;
+      }
+    } catch (const TException& tex) {
+      // Some protocols throw this after they send an error response to the client.
+      // They should be changed to return true instead; if they want to log,
+      // they should do the logging themselves.
+      string errStr = (serverType_ + " processing exception: ") + tex.what();
+      GlobalOutput(errStr.c_str());
+      // Continue processing
+    }
+  }
+
+  cleanup();
+}
+
+void TConnectedClient::cleanup()
+{
+  if (eventHandler_) {
+    eventHandler_->deleteContext(opaqueContext_, inputProtocol_, outputProtocol_);
+  }
+
+  try {
+    inputProtocol_->getTransport()->close();
+  } catch (const TTransportException& ttx) {
+    string errStr = string(serverType_ + " input close failed: ") + ttx.what();
+    GlobalOutput(errStr.c_str());
+  }
+  try {
+    outputProtocol_->getTransport()->close();
+  } catch (const TTransportException& ttx) {
+    string errStr = string(serverType_ + " output close failed: ") + ttx.what();
+    GlobalOutput(errStr.c_str());
+  }
+  try {
+    client_->close();
+  } catch (const TTransportException& ttx) {
+    string errStr = string(serverType_ + " client close failed: ") + ttx.what();
+    GlobalOutput(errStr.c_str());
+  }
+}
+
+}
+}
+} // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TConnectedClient.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TConnectedClient.h b/lib/cpp/src/thrift/server/TConnectedClient.h
new file mode 100644
index 0000000..6304398
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TConnectedClient.h
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_
+#define _THRIFT_SERVER_TCONNECTEDCLIENT_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <thrift/TProcessor.h>
+#include <thrift/protocol/TProtocol.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/TTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+/**
+ * This represents a client connected to a TServer.  The
+ * processing loop for a client must provide some required
+ * functionality common to all implementations so it is
+ * encapsulated here.
+ */
+
+class TConnectedClient : public apache::thrift::concurrency::Runnable
+{
+  public:
+    /**
+     * Constructor.
+     *
+     * @param[in] serverType     the server type as a string, used
+     *                           for logging output.
+     * @param[in] processor      the TProcessor
+     * @param[in] inputProtocol  the input TProtocol
+     * @param[in] outputProtocol the output TProtocol
+     * @param[in] eventHandler   the server event handler
+     * @param[in] client         the TTransport representing the client
+     */
+    TConnectedClient(
+            const std::string& serverType,
+            const boost::shared_ptr<apache::thrift::TProcessor>& processor,
+            const boost::shared_ptr<apache::thrift::protocol::TProtocol>& inputProtocol,
+            const boost::shared_ptr<apache::thrift::protocol::TProtocol>& outputProtocol,
+            const boost::shared_ptr<apache::thrift::server::TServerEventHandler>& eventHandler,
+            const boost::shared_ptr<apache::thrift::transport::TTransport>& client);
+
+    /**
+     * Destructor.
+     */
+    virtual ~TConnectedClient();
+
+    /**
+     * Drive the client until it is done.
+     * The client processing loop is:
+     *
+     * [optional] call eventHandler->createContext once
+     * [optional] call eventHandler->processContext per request
+     *            call processor->process per request
+     *              handle expected transport exceptions:
+     *                TIMED_OUT means a receive timeout; keep processing
+     *                END_OF_FILE means the client is gone
+     *                INTERRUPTED means the client was interrupted
+     *                            by TServerTransport::interruptChildren()
+     *              log any other transport exception and stop processing
+     *              log any other TException and keep processing
+     *            cleanup()
+     */
+    virtual void run() /* override */;
+
+  protected:
+    /**
+     * Cleanup after a client.  This happens if the client disconnects,
+     * or if the server is stopped, or if an exception occurs.
+     *
+     * The cleanup processing is:
+     * [optional] call eventHandler->deleteContext once
+     *            close the inputProtocol's TTransport
+     *            close the outputProtocol's TTransport
+     *            close the client
+     */
+    virtual void cleanup();
+
+  private:
+    std::string serverType_;
+    boost::shared_ptr<apache::thrift::TProcessor> processor_;
+    boost::shared_ptr<apache::thrift::protocol::TProtocol> inputProtocol_;
+    boost::shared_ptr<apache::thrift::protocol::TProtocol> outputProtocol_;
+    boost::shared_ptr<apache::thrift::server::TServerEventHandler> eventHandler_;
+    boost::shared_ptr<apache::thrift::transport::TTransport> client_;
+
+    /**
+     * Context acquired from the eventHandler_ if one exists.
+     */
+    void *opaqueContext_;
+};
+
+}
+}
+}
+
+#endif // #ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TSimpleServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp
index 19f44ac..b63c45e 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.cpp
+++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <thrift/server/TConnectedClient.h>
 #include <thrift/server/TSimpleServer.h>
 #include <thrift/transport/TTransportException.h>
 #include <string>
@@ -103,58 +104,9 @@ void TSimpleServer::serve() {
       break;
     }
 
-    // Get the processor
-    shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
-
-    void* connectionContext = NULL;
-    if (eventHandler_) {
-      connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
-    }
-    try {
-      for (;;) {
-        if (eventHandler_) {
-          eventHandler_->processContext(connectionContext, client);
-        }
-        if (!processor->process(inputProtocol, outputProtocol, connectionContext) ||
-            // Peek ahead, is the remote side closed?
-            !inputProtocol->getTransport()->peek()) {
-          break;
-        }
-      }
-    } catch (const TTransportException& ttx) {
-      if (ttx.getType() != TTransportException::END_OF_FILE &&
-          ttx.getType() != TTransportException::INTERRUPTED)
-      {
-        string errStr = string("TSimpleServer client died: ") + ttx.what();
-        GlobalOutput(errStr.c_str());
-      }
-    } catch (const std::exception& x) {
-      GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what());
-    } catch (...) {
-      GlobalOutput("TSimpleServer uncaught exception.");
-    }
-    if (eventHandler_) {
-      eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol);
-    }
-
-    try {
-      inputTransport->close();
-    } catch (const TTransportException& ttx) {
-      string errStr = string("TSimpleServer input close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
-    try {
-      outputTransport->close();
-    } catch (const TTransportException& ttx) {
-      string errStr = string("TSimpleServer output close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
-    try {
-      client->close();
-    } catch (const TTransportException& ttx) {
-      string errStr = string("TSimpleServer client close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
+    TConnectedClient("TSimpleServer",
+            getProcessor(inputProtocol, outputProtocol, client),
+            inputProtocol, outputProtocol, eventHandler_, client).run();
   }
 
   if (stop_) {

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TSimpleServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h
index 941f12b..7b8677d 100644
--- a/lib/cpp/src/thrift/server/TSimpleServer.h
+++ b/lib/cpp/src/thrift/server/TSimpleServer.h
@@ -94,7 +94,7 @@ public:
   void serve();
 
   /**
-   * Interrupt serve() so that it meets post-conditions.
+   * Interrupt serve() so that it meets post-conditions and returns.
    */
   void stop();
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index 58cfe3e..f8ed6cf 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -19,12 +19,14 @@
 
 #include <thrift/thrift-config.h>
 
+#include <thrift/server/TConnectedClient.h>
 #include <thrift/server/TThreadPoolServer.h>
 #include <thrift/transport/TTransportException.h>
 #include <thrift/concurrency/Thread.h>
 #include <thrift/concurrency/ThreadManager.h>
 #include <string>
 #include <iostream>
+#include <boost/make_shared.hpp>
 
 namespace apache {
 namespace thrift {
@@ -37,78 +39,6 @@ using namespace apache::thrift::concurrency;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
 
-class TThreadPoolServer::Task : public Runnable {
-
-public:
-  Task(TThreadPoolServer& server,
-       shared_ptr<TProcessor> processor,
-       shared_ptr<TProtocol> input,
-       shared_ptr<TProtocol> output,
-       shared_ptr<TTransport> transport)
-    : server_(server),
-      processor_(processor),
-      input_(input),
-      output_(output),
-      transport_(transport) {}
-
-  ~Task() {}
-
-  void run() {
-    boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
-    void* connectionContext = NULL;
-    if (eventHandler) {
-      connectionContext = eventHandler->createContext(input_, output_);
-    }
-    try {
-      for (;;) {
-        if (eventHandler) {
-          eventHandler->processContext(connectionContext, transport_);
-        }
-        if (!processor_->process(input_, output_, connectionContext)
-            || !input_->getTransport()->peek()) {
-          break;
-        }
-      }
-    } catch (const TTransportException& ttx) {
-      if (ttx.getType() != TTransportException::END_OF_FILE &&
-          ttx.getType() != TTransportException::INTERRUPTED) {
-        string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what();
-        GlobalOutput(errStr.c_str());
-      }
-    } catch (const std::exception& x) {
-      GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what());
-    } catch (...) {
-      GlobalOutput(
-          "TThreadPoolServer, unexpected exception in "
-          "TThreadPoolServer::Task::run()");
-    }
-
-    if (eventHandler) {
-      eventHandler->deleteContext(connectionContext, input_, output_);
-    }
-
-    try {
-      input_->getTransport()->close();
-    } catch (TTransportException& ttx) {
-      string errStr = string("TThreadPoolServer input close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
-    try {
-      output_->getTransport()->close();
-    } catch (TTransportException& ttx) {
-      string errStr = string("TThreadPoolServer output close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
-  }
-
-private:
-  TServer& server_;
-  shared_ptr<TProcessor> processor_;
-  shared_ptr<TProtocol> input_;
-  shared_ptr<TProtocol> output_;
-  shared_ptr<TTransport> transport_;
-};
-
 TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::serve() {
@@ -146,9 +76,13 @@ void TThreadPoolServer::serve() {
       shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
 
       // Add to threadmanager pool
-      shared_ptr<TThreadPoolServer::Task> task(
-          new TThreadPoolServer::Task(*this, processor, inputProtocol, outputProtocol, client));
-      threadManager_->add(task, timeout_, taskExpiration_);
+      threadManager_->add(
+              boost::make_shared<TConnectedClient>(
+                      "TThreadPoolServer",
+                      getProcessor(inputProtocol, outputProtocol, client),
+                      inputProtocol, outputProtocol, eventHandler_, client),
+              timeout_,
+              taskExpiration_);
 
     } catch (TTransportException& ttx) {
       if (inputTransport) {

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadPoolServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index 1696700..2f93463 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -37,8 +37,6 @@ using apache::thrift::transport::TTransportFactory;
 
 class TThreadPoolServer : public TServer {
 public:
-  class Task;
-
   template <typename ProcessorFactory>
   TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
                     const boost::shared_ptr<TServerTransport>& serverTransport,
@@ -107,8 +105,19 @@ public:
 
   virtual ~TThreadPoolServer();
 
+  /**
+   * Process all connections that arrive using a thread pool.
+   * Call stop() on another thread to interrupt processing and
+   * return control to the caller.
+   * Post-conditions (return guarantees):
+   *   The serverTransport will be closed.
+   *   There will be no connected clients.
+   */
   virtual void serve();
 
+  /**
+   * Interrupt serve() so that it meets post-conditions and returns.
+   */
   virtual void stop();
 
   virtual int64_t getTimeout() const;

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadedServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 118c9cb..4dcdb44 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+#include <boost/bind.hpp>
+#include <thrift/server/TConnectedClient.h>
 #include <thrift/server/TThreadedServer.h>
 #include <thrift/transport/TTransportException.h>
 #include <thrift/concurrency/PlatformThreadFactory.h>
@@ -39,94 +41,6 @@ using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
 using namespace apache::thrift::concurrency;
 
-class TThreadedServer::Task : public Runnable {
-
-public:
-  Task(TThreadedServer& server,
-       shared_ptr<TProcessor> processor,
-       shared_ptr<TProtocol> input,
-       shared_ptr<TProtocol> output,
-       shared_ptr<TTransport> transport)
-    : server_(server),
-      processor_(processor),
-      input_(input),
-      output_(output),
-      transport_(transport) {}
-
-  ~Task() {}
-
-  void run() {
-    boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
-    void* connectionContext = NULL;
-    if (eventHandler) {
-      connectionContext = eventHandler->createContext(input_, output_);
-    }
-    try {
-      for (;;) {
-        if (eventHandler) {
-          eventHandler->processContext(connectionContext, transport_);
-        }
-        if (!processor_->process(input_, output_, connectionContext)
-            || !input_->getTransport()->peek()) {
-          break;
-        }
-      }
-    } catch (const TTransportException& ttx) {
-      if (ttx.getType() != TTransportException::END_OF_FILE &&
-          ttx.getType() != TTransportException::INTERRUPTED) {
-        string errStr = string("TThreadedServer client died: ") + ttx.what();
-        GlobalOutput(errStr.c_str());
-      }
-    } catch (const std::exception& x) {
-      GlobalOutput.printf("TThreadedServer exception: %s: %s", typeid(x).name(), x.what());
-    } catch (...) {
-      GlobalOutput("TThreadedServer uncaught exception.");
-    }
-    if (eventHandler) {
-      eventHandler->deleteContext(connectionContext, input_, output_);
-    }
-
-    try {
-      input_->getTransport()->close();
-    } catch (TTransportException& ttx) {
-      string errStr = string("TThreadedServer input close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
-    try {
-      output_->getTransport()->close();
-    } catch (TTransportException& ttx) {
-      string errStr = string("TThreadedServer output close failed: ") + ttx.what();
-      GlobalOutput(errStr.c_str());
-    }
-
-    // Remove this task from parent bookkeeping
-    {
-      Synchronized s(server_.tasksMonitor_);
-      server_.tasks_.erase(this);
-      if (server_.tasks_.empty()) {
-        server_.tasksMonitor_.notify();
-      }
-    }
-  }
-
-private:
-  TThreadedServer& server_;
-  friend class TThreadedServer;
-
-  shared_ptr<TProcessor> processor_;
-  shared_ptr<TProtocol> input_;
-  shared_ptr<TProtocol> output_;
-  shared_ptr<TTransport> transport_;
-};
-
-void TThreadedServer::init() {
-  stop_ = false;
-
-  if (!threadFactory_) {
-    threadFactory_.reset(new PlatformThreadFactory);
-  }
-}
-
 TThreadedServer::~TThreadedServer() {}
 
 void TThreadedServer::serve() {
@@ -162,21 +76,19 @@ void TThreadedServer::serve() {
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
-      shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client);
-
-      TThreadedServer::Task* task
-          = new TThreadedServer::Task(*this, processor, inputProtocol, outputProtocol, client);
+      shared_ptr<TConnectedClient> pClient(
+              new TConnectedClient("TThreadedServer",
+                      getProcessor(inputProtocol, outputProtocol, client),
+                      inputProtocol, outputProtocol, eventHandler_, client),
+              boost::bind(&TThreadedServer::disposeClient, this, _1));
 
-      // Create a task
-      shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
-
-      // Create a thread for this task
-      shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
+      // Create a thread for this client
+      shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient));
 
       // Insert thread into the set of threads
       {
-        Synchronized s(tasksMonitor_);
-        tasks_.insert(task);
+        Synchronized s(clientsMonitor_);
+        clients_.insert(pClient.get());
       }
 
       // Start the thread!
@@ -235,9 +147,9 @@ void TThreadedServer::serve() {
       GlobalOutput(errStr.c_str());
     }
     try {
-      Synchronized s(tasksMonitor_);
-      while (!tasks_.empty()) {
-        tasksMonitor_.wait();
+      Synchronized s(clientsMonitor_);
+      while (!clients_.empty()) {
+          clientsMonitor_.wait();
       }
     } catch (TException& tx) {
       string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what();
@@ -254,6 +166,19 @@ void TThreadedServer::stop() {
 	serverTransport_->interruptChildren();
   }
 }
+
+void TThreadedServer::disposeClient(TConnectedClient *pClient) {
+  // Remove this client from the server's bookkeeping
+  {
+    Synchronized s(clientsMonitor_);
+    clients_.erase(pClient);
+    if (clients_.empty()) {
+        clientsMonitor_.notify();
+    }
+  }
+  delete pClient;
+}
+
 }
 }
 } // apache::thrift::server

http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadedServer.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index b9b24fe..5d510d6 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -20,9 +20,11 @@
 #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_
 #define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1
 
+#include <set>
 #include <thrift/server/TServer.h>
 #include <thrift/transport/TServerTransport.h>
 #include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
 #include <thrift/concurrency/Thread.h>
 
 #include <boost/shared_ptr.hpp>
@@ -35,19 +37,22 @@ using apache::thrift::TProcessor;
 using apache::thrift::transport::TServerTransport;
 using apache::thrift::transport::TTransportFactory;
 using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::PlatformThreadFactory;
 using apache::thrift::concurrency::ThreadFactory;
 
-class TThreadedServer : public TServer {
+class TConnectedClient;
 
+class TThreadedServer : public TServer {
 public:
-  class Task;
-
   template <typename ProcessorFactory>
   TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
                   const boost::shared_ptr<TServerTransport>& serverTransport,
                   const boost::shared_ptr<TTransportFactory>& transportFactory,
                   const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-                  THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
+                  THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
+    : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+      threadFactory_(new PlatformThreadFactory),
+      stop_(false) {}
 
   template <typename ProcessorFactory>
   TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
@@ -55,14 +60,20 @@ public:
                   const boost::shared_ptr<TTransportFactory>& transportFactory,
                   const boost::shared_ptr<TProtocolFactory>& protocolFactory,
                   const boost::shared_ptr<ThreadFactory>& threadFactory,
-                  THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
+                  THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory))
+    : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+      threadFactory_(threadFactory),
+      stop_(false) {}
 
   template <typename Processor>
   TThreadedServer(const boost::shared_ptr<Processor>& processor,
                   const boost::shared_ptr<TServerTransport>& serverTransport,
                   const boost::shared_ptr<TTransportFactory>& transportFactory,
                   const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-                  THRIFT_OVERLOAD_IF(Processor, TProcessor));
+                  THRIFT_OVERLOAD_IF(Processor, TProcessor))
+    : TServer(processor, serverTransport, transportFactory, protocolFactory),
+      threadFactory_(new PlatformThreadFactory),
+      stop_(false) {}
 
   template <typename Processor>
   TThreadedServer(const boost::shared_ptr<Processor>& processor,
@@ -70,66 +81,43 @@ public:
                   const boost::shared_ptr<TTransportFactory>& transportFactory,
                   const boost::shared_ptr<TProtocolFactory>& protocolFactory,
                   const boost::shared_ptr<ThreadFactory>& threadFactory,
-                  THRIFT_OVERLOAD_IF(Processor, TProcessor));
+                  THRIFT_OVERLOAD_IF(Processor, TProcessor))
+    : TServer(processor, serverTransport, transportFactory, protocolFactory),
+      threadFactory_(threadFactory),
+      stop_(false) {}
 
   virtual ~TThreadedServer();
 
+  /**
+   * Process all connections that arrive, each on their own
+   * dedicated thread.  There is no limit to the number of
+   * threads or connections (see THRIFT-3084).
+   * Call stop() on another thread to interrupt processing and
+   * return control to the caller.
+   * Post-conditions (return guarantees):
+   *   The serverTransport will be closed.
+   *   There will be no connected clients.
+   */
   virtual void serve();
-  void stop();
+
+  /**
+   * Interrupt serve() so that it meets post-conditions and returns.
+   */
+  virtual void stop();
 
 protected:
-  void init();
+  /**
+   * shared_ptr deleter: removes pClient from clients_, notifies if empty, deletes it.
+   */
+  virtual void disposeClient(TConnectedClient *pClient);
 
   boost::shared_ptr<ThreadFactory> threadFactory_;
   volatile bool stop_;
 
-  Monitor tasksMonitor_;
-  std::set<Task*> tasks_;
+  Monitor clientsMonitor_;
+  std::set<TConnectedClient*> clients_;
 };
 
-template <typename ProcessorFactory>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
-                                 const boost::shared_ptr<TServerTransport>& serverTransport,
-                                 const boost::shared_ptr<TTransportFactory>& transportFactory,
-                                 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-                                 THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
-  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {
-  init();
-}
-
-template <typename ProcessorFactory>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
-                                 const boost::shared_ptr<TServerTransport>& serverTransport,
-                                 const boost::shared_ptr<TTransportFactory>& transportFactory,
-                                 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-                                 const boost::shared_ptr<ThreadFactory>& threadFactory,
-                                 THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
-  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
-    threadFactory_(threadFactory) {
-  init();
-}
-
-template <typename Processor>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor,
-                                 const boost::shared_ptr<TServerTransport>& serverTransport,
-                                 const boost::shared_ptr<TTransportFactory>& transportFactory,
-                                 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-                                 THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
-  : TServer(processor, serverTransport, transportFactory, protocolFactory) {
-  init();
-}
-
-template <typename Processor>
-TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor,
-                                 const boost::shared_ptr<TServerTransport>& serverTransport,
-                                 const boost::shared_ptr<TTransportFactory>& transportFactory,
-                                 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-                                 const boost::shared_ptr<ThreadFactory>& threadFactory,
-                                 THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
-  : TServer(processor, serverTransport, transportFactory, protocolFactory),
-    threadFactory_(threadFactory) {
-  init();
-}
 }
 }
 } // apache::thrift::server