You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by br...@apache.org on 2011/09/01 20:06:20 UTC

svn commit: r1164190 - in /thrift/trunk/lib/cpp/src: TProcessor.h server/TNonblockingServer.cpp server/TServer.h server/TSimpleServer.cpp server/TThreadPoolServer.cpp server/TThreadedServer.cpp

Author: bryanduxbury
Date: Thu Sep  1 18:06:20 2011
New Revision: 1164190

URL: http://svn.apache.org/viewvc?rev=1164190&view=rev
Log:
THRIFT-1314. cpp: add TProcessorFactory

Patch: Adam Simpkins

Modified:
    thrift/trunk/lib/cpp/src/TProcessor.h
    thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
    thrift/trunk/lib/cpp/src/server/TServer.h
    thrift/trunk/lib/cpp/src/server/TSimpleServer.cpp
    thrift/trunk/lib/cpp/src/server/TThreadPoolServer.cpp
    thrift/trunk/lib/cpp/src/server/TThreadedServer.cpp

Modified: thrift/trunk/lib/cpp/src/TProcessor.h
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/TProcessor.h?rev=1164190&r1=1164189&r2=1164190&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/TProcessor.h (original)
+++ thrift/trunk/lib/cpp/src/TProcessor.h Thu Sep  1 18:06:20 2011
@@ -189,6 +189,45 @@ class ReleaseHandler {
    boost::shared_ptr<HandlerFactory_> handlerFactory_;
 };
 
+struct TConnectionInfo {
+  // The input and output protocols
+  boost::shared_ptr<protocol::TProtocol> input;
+  boost::shared_ptr<protocol::TProtocol> output;
+  // The underlying transport used for the connection
+  // This is the transport that was returned by TServerTransport::accept(),
+  // and it may be different than the transport pointed to by the input and
+  // output protocols.
+  boost::shared_ptr<transport::TTransport> transport;
+};
+
+class TProcessorFactory {
+ public:
+  virtual ~TProcessorFactory() {}
+
+  /**
+   * Get the TProcessor to use for a particular connection.
+   *
+   * This method is always invoked in the same thread that the connection was
+   * accepted on.  This generally means that this call does not need to be
+   * thread safe, as it will always be invoked from a single thread.
+   */
+  virtual boost::shared_ptr<TProcessor> getProcessor(
+      const TConnectionInfo& connInfo) = 0;
+};
+
+class TSingletonProcessorFactory : public TProcessorFactory {
+ public:
+  TSingletonProcessorFactory(boost::shared_ptr<TProcessor> processor) :
+      processor_(processor) {}
+
+  boost::shared_ptr<TProcessor> getProcessor(const TConnectionInfo&) {
+    return processor_;
+  }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+};
+
 }} // apache::thrift
 
 #endif // #ifndef _THRIFT_TPROCESSOR_H_

Modified: thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp?rev=1164190&r1=1164189&r2=1164190&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp (original)
+++ thrift/trunk/lib/cpp/src/server/TNonblockingServer.cpp Thu Sep  1 18:06:20 2011
@@ -88,6 +88,9 @@ class TNonblockingServer::TConnection {
   /// Server handle
   TNonblockingServer* server_;
 
+  /// TProcessor
+  boost::shared_ptr<TProcessor> processor_;
+
   /// Object wrapping network socket
   boost::shared_ptr<TSocket> tSocket_;
 
@@ -420,6 +423,9 @@ void TNonblockingServer::TConnection::in
   } else {
     connectionContext_ = NULL;
   }
+
+  // Get the processor
+  processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
 }
 
 void TNonblockingServer::TConnection::workSocket() {
@@ -572,7 +578,7 @@ void TNonblockingServer::TConnection::tr
 
       // Create task and dispatch to the thread manager
       boost::shared_ptr<Runnable> task =
-        boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+        boost::shared_ptr<Runnable>(new Task(processor_,
                                              inputProtocol_,
                                              outputProtocol_,
                                              this));
@@ -595,8 +601,8 @@ void TNonblockingServer::TConnection::tr
     } else {
       try {
         // Invoke the processor
-        server_->getProcessor()->process(inputProtocol_, outputProtocol_,
-                                         connectionContext_);
+        processor_->process(inputProtocol_, outputProtocol_,
+                            connectionContext_);
       } catch (const TTransportException &ttx) {
         GlobalOutput.printf("TNonblockingServer transport error in "
                             "process(): %s", ttx.what());

Modified: thrift/trunk/lib/cpp/src/server/TServer.h
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TServer.h?rev=1164190&r1=1164189&r2=1164190&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TServer.h (original)
+++ thrift/trunk/lib/cpp/src/server/TServer.h Thu Sep  1 18:06:20 2011
@@ -112,8 +112,8 @@ class TServer : public concurrency::Runn
     serve();
   }
 
-  boost::shared_ptr<TProcessor> getProcessor() {
-    return processor_;
+  boost::shared_ptr<TProcessorFactory> getProcessorFactory() {
+    return processorFactory_;
   }
 
   boost::shared_ptr<TServerTransport> getServerTransport() {
@@ -142,7 +142,7 @@ class TServer : public concurrency::Runn
 
 protected:
   TServer(boost::shared_ptr<TProcessor> processor):
-    processor_(processor) {
+    processorFactory_(new TSingletonProcessorFactory(processor)) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
@@ -151,7 +151,7 @@ protected:
 
   TServer(boost::shared_ptr<TProcessor> processor,
           boost::shared_ptr<TServerTransport> serverTransport):
-    processor_(processor),
+    processorFactory_(new TSingletonProcessorFactory(processor)),
     serverTransport_(serverTransport) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
@@ -163,7 +163,7 @@ protected:
           boost::shared_ptr<TServerTransport> serverTransport,
           boost::shared_ptr<TTransportFactory> transportFactory,
           boost::shared_ptr<TProtocolFactory> protocolFactory):
-    processor_(processor),
+    processorFactory_(new TSingletonProcessorFactory(processor)),
     serverTransport_(serverTransport),
     inputTransportFactory_(transportFactory),
     outputTransportFactory_(transportFactory),
@@ -176,16 +176,33 @@ protected:
           boost::shared_ptr<TTransportFactory> outputTransportFactory,
           boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
           boost::shared_ptr<TProtocolFactory> outputProtocolFactory):
-    processor_(processor),
+    processorFactory_(new TSingletonProcessorFactory(processor)),
     serverTransport_(serverTransport),
     inputTransportFactory_(inputTransportFactory),
     outputTransportFactory_(outputTransportFactory),
     inputProtocolFactory_(inputProtocolFactory),
     outputProtocolFactory_(outputProtocolFactory) {}
 
+  /**
+   * Get a TProcessor to handle calls on a particular connection.
+   *
+   * This method should only be called once per connection (never once per
+   * call).  This allows the TProcessorFactory to return a different processor
+   * for each connection if it desires.
+   */
+  boost::shared_ptr<TProcessor> getProcessor(
+      boost::shared_ptr<TProtocol> inputProtocol,
+      boost::shared_ptr<TProtocol> outputProtocol,
+      boost::shared_ptr<TTransport> transport) {
+    TConnectionInfo connInfo;
+    connInfo.input = inputProtocol;
+    connInfo.output = outputProtocol;
+    connInfo.transport = transport;
+    return processorFactory_->getProcessor(connInfo);
+  }
 
   // Class variables
-  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProcessorFactory> processorFactory_;
   boost::shared_ptr<TServerTransport> serverTransport_;
 
   boost::shared_ptr<TTransportFactory> inputTransportFactory_;

Modified: thrift/trunk/lib/cpp/src/server/TSimpleServer.cpp
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TSimpleServer.cpp?rev=1164190&r1=1164189&r2=1164190&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TSimpleServer.cpp (original)
+++ thrift/trunk/lib/cpp/src/server/TSimpleServer.cpp Thu Sep  1 18:06:20 2011
@@ -87,6 +87,10 @@ void TSimpleServer::serve() {
       break;
     }
 
+    // Get the processor
+    shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+                                                    outputProtocol, client);
+
     void* connectionContext = NULL;
     if (eventHandler_ != NULL) {
       connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol);
@@ -96,8 +100,9 @@ void TSimpleServer::serve() {
         if (eventHandler_ != NULL) {
           eventHandler_->processContext(connectionContext, client);
         }
-        if (!processor_->process(inputProtocol, outputProtocol, connectionContext) ||
-            // Peek ahead, is the remote side closed?
+        if (!processor->process(inputProtocol, outputProtocol,
+                                connectionContext) ||
+          // Peek ahead, is the remote side closed?
             !inputProtocol->getTransport()->peek()) {
           break;
         }

Modified: thrift/trunk/lib/cpp/src/server/TThreadPoolServer.cpp
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TThreadPoolServer.cpp?rev=1164190&r1=1164189&r2=1164190&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TThreadPoolServer.cpp (original)
+++ thrift/trunk/lib/cpp/src/server/TThreadPoolServer.cpp Thu Sep  1 18:06:20 2011
@@ -170,8 +170,13 @@ void TThreadPoolServer::serve() {
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
+      shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+                                                      outputProtocol, client);
+
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_);
+      shared_ptr<TThreadPoolServer::Task> task(new TThreadPoolServer::Task(
+            *this, processor, inputProtocol, outputProtocol, client));
+      threadManager_->add(task, timeout_);
 
     } catch (TTransportException& ttx) {
       if (inputTransport != NULL) { inputTransport->close(); }

Modified: thrift/trunk/lib/cpp/src/server/TThreadedServer.cpp
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/cpp/src/server/TThreadedServer.cpp?rev=1164190&r1=1164189&r2=1164190&view=diff
==============================================================================
--- thrift/trunk/lib/cpp/src/server/TThreadedServer.cpp (original)
+++ thrift/trunk/lib/cpp/src/server/TThreadedServer.cpp Thu Sep  1 18:06:20 2011
@@ -180,8 +180,11 @@ void TThreadedServer::serve() {
       inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
       outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
+      shared_ptr<TProcessor> processor = getProcessor(inputProtocol,
+                                                      outputProtocol, client);
+
       TThreadedServer::Task* task = new TThreadedServer::Task(*this,
-                                                              processor_,
+                                                              processor,
                                                               inputProtocol,
                                                               outputProtocol,
                                                               client);