You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:09:34 UTC

svn commit: r1005126 - in /incubator/thrift/trunk: compiler/cpp/src/generate/t_cpp_generator.cc lib/cpp/src/TProcessor.h test/cpp/src/TestServer.cpp

Author: dreiss
Date: Wed Oct  6 17:09:33 2010
New Revision: 1005126

URL: http://svn.apache.org/viewvc?rev=1005126&view=rev
Log:
THRIFT-928. cpp: Processor-level event callbacks

- Add a TProcessorEventHandler callback interface.
- Add methods to TProcessor to hold an instance of the interface.
- Add code to the compiler to make the processor call callbacks at key points.
- Add an optional processor event handler to the test server.

Modified:
    incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc
    incubator/thrift/trunk/lib/cpp/src/TProcessor.h
    incubator/thrift/trunk/test/cpp/src/TestServer.cpp

Modified: incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc?rev=1005126&r1=1005125&r2=1005126&view=diff
==============================================================================
--- incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc (original)
+++ incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc Wed Oct  6 17:09:33 2010
@@ -2059,10 +2059,31 @@ void t_cpp_generator::generate_process_f
   string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
 
   f_service_ <<
+    indent() << "void* ctx = NULL;" << endl <<
+    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+    indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+    indent() << "}" << endl <<
+    indent() << "// A glorified finally block since ctx is a void*" << endl <<
+    indent() << "class ContextFreer {" << endl <<
+    indent() << "  public:" << endl <<
+    indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
+    indent() << "      handler_(handler), context_(context) {}" << endl <<
+    indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
+    indent() << "  private:" << endl <<
+    indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
+    indent() << "    void* context_;" << endl <<
+    indent() << "};" << endl <<
+    indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+    indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+    indent() << "}" << endl << endl <<
     indent() << argsname << " args;" << endl <<
     indent() << "args.read(iprot);" << endl <<
     indent() << "iprot->readMessageEnd();" << endl <<
-    indent() << "iprot->getTransport()->readEnd();" << endl <<
+    indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
+    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+    indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+    indent() << "}" << endl <<
     endl;
 
   t_struct* xs = tfunction->get_xceptions();
@@ -2135,38 +2156,52 @@ void t_cpp_generator::generate_process_f
 
   f_service_ << " catch (const std::exception& e) {" << endl;
 
+  indent_up();
+  f_service_ <<
+    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+    indent() << "  eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+    indent() << "}" << endl;
+
   if (!tfunction->is_oneway()) {
-    indent_up();
     f_service_ <<
+      endl <<
       indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
       indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
       indent() << "x.write(oprot);" << endl <<
       indent() << "oprot->writeMessageEnd();" << endl <<
       indent() << "oprot->getTransport()->flush();" << endl <<
-      indent() << "oprot->getTransport()->writeEnd();" << endl <<
-      indent() << "return;" << endl;
-    indent_down();
+      indent() << "oprot->getTransport()->writeEnd();" << endl;
   }
-  f_service_ << indent() << "}" << endl;
+  f_service_ << indent() << "return;" << endl;
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
 
   // Shortcut out here for oneway functions
   if (tfunction->is_oneway()) {
     f_service_ <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl << endl <<
       indent() << "return;" << endl;
-    indent_down();
-    f_service_ << "}" << endl <<
-      endl;
+    // Close function
+    scope_down(f_service_);
+    f_service_ << endl;
     return;
   }
 
   // Serialize the result into a struct
   f_service_ <<
-    endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl << endl <<
     indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
     indent() << "result.write(oprot);" << endl <<
     indent() << "oprot->writeMessageEnd();" << endl <<
     indent() << "oprot->getTransport()->flush();" << endl <<
-    indent() << "oprot->getTransport()->writeEnd();" << endl;
+    indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+    indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+    indent() << "}" << endl;
 
   // Close function
   scope_down(f_service_);

Modified: incubator/thrift/trunk/lib/cpp/src/TProcessor.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/TProcessor.h?rev=1005126&r1=1005125&r2=1005126&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/TProcessor.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/TProcessor.h Wed Oct  6 17:09:33 2010
@@ -27,6 +27,65 @@
 namespace apache { namespace thrift {
 
 /**
+ * Virtual interface class that can handle events from the processor. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+class TProcessorEventHandler {
+ public:
+
+  virtual ~TProcessorEventHandler() {}
+
+  /**
+   * Called once per function invocation, before any other callback.
+   * Expected to return some sort of context object (NULL by default).
+   * The return value is passed to all other callbacks for that
+   * function invocation, and finally to freeContext().
+   */
+  virtual void* getContext(const char* fn_name) { return NULL; }
+
+  /**
+   * Expected to free resources associated with a context from getContext().
+   */
+  virtual void freeContext(void* ctx, const char* fn_name) { }
+
+  /**
+   * Called before reading arguments.
+   */
+  virtual void preRead(void* ctx, const char* fn_name) {}
+
+  /**
+   * Called between reading arguments and calling the handler.
+   */
+  virtual void postRead(void* ctx, const char* fn_name) {}
+
+  /**
+   * Called between calling the handler and writing the response.
+   */
+  virtual void preWrite(void* ctx, const char* fn_name) {}
+
+  /**
+   * Called after writing the response.
+   */
+  virtual void postWrite(void* ctx, const char* fn_name) {}
+
+  /**
+   * Called when a oneway (async) function call completes successfully.
+   */
+  virtual void asyncComplete(void* ctx, const char* fn_name) {}
+
+  /**
+   * Called if the handler throws an undeclared exception (caught as std::exception).
+   */
+  virtual void handlerError(void* ctx, const char* fn_name) {}
+
+ protected:
+  TProcessorEventHandler() {}  // protected: only constructible through a subclass
+};
+
+/**
  * A processor is a generic object that acts upon two streams of data, one
  * an input and the other an output. The definition of this object is loose,
  * though the typical case is for some sort of server that either generates
@@ -44,8 +103,18 @@ class TProcessor {
     return process(io, io);
   }
 
+  boost::shared_ptr<TProcessorEventHandler> getEventHandler() {  // returns the current handler; empty pointer if none was set
+    return eventHandler_;
+  }
+
+  void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {  // replaces the stored handler; the processor shares ownership
+    eventHandler_ = eventHandler;
+  }
+
  protected:
   TProcessor() {}
+
+  boost::shared_ptr<TProcessorEventHandler> eventHandler_;
 };
 
 }} // apache::thrift

Modified: incubator/thrift/trunk/test/cpp/src/TestServer.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/test/cpp/src/TestServer.cpp?rev=1005126&r1=1005125&r2=1005126&view=diff
==============================================================================
--- incubator/thrift/trunk/test/cpp/src/TestServer.cpp (original)
+++ incubator/thrift/trunk/test/cpp/src/TestServer.cpp Wed Oct  6 17:09:33 2010
@@ -287,6 +287,39 @@ class TestHandler : public ThriftTestIf 
   }
 };
 
+
+class TestProcessorEventHandler : public TProcessorEventHandler {  // test handler: prints every processor event to stdout
+  virtual void* getContext(const char* fn_name) {
+    return new std::string(fn_name);  // context is a heap-allocated copy of the function name
+  }
+  virtual void freeContext(void* ctx, const char* fn_name) {
+    delete static_cast<std::string*>(ctx);  // ctx is the std::string allocated by getContext()
+  }
+  virtual void preRead(void* ctx, const char* fn_name) {
+    communicate("preRead", ctx, fn_name);
+  }
+  virtual void postRead(void* ctx, const char* fn_name) {
+    communicate("postRead", ctx, fn_name);
+  }
+  virtual void preWrite(void* ctx, const char* fn_name) {
+    communicate("preWrite", ctx, fn_name);
+  }
+  virtual void postWrite(void* ctx, const char* fn_name) {
+    communicate("postWrite", ctx, fn_name);
+  }
+  virtual void asyncComplete(void* ctx, const char* fn_name) {
+    communicate("asyncComplete", ctx, fn_name);
+  }
+  virtual void handlerError(void* ctx, const char* fn_name) {
+    communicate("handlerError", ctx, fn_name);
+  }
+
+  void communicate(const char* event, void* ctx, const char* fn_name) {  // prints "<event>: <context string> = <fn_name>"
+    std::cout << event << ": " << *static_cast<std::string*>(ctx) << " = " << fn_name << std::endl;
+  }
+};
+
+
 int main(int argc, char **argv) {
 
   int port = 9090;
@@ -297,7 +330,7 @@ int main(int argc, char **argv) {
   ostringstream usage;
 
   usage <<
-    argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>]" << endl <<
+    argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--processor-events]" << endl <<
 
     "\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\".  Default is " << serverType << endl <<
 
@@ -365,6 +398,12 @@ int main(int argc, char **argv) {
 
   shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler));
 
+
+  if (!args["processor-events"].empty()) {
+    testProcessor->setEventHandler(shared_ptr<TProcessorEventHandler>(
+          new TestProcessorEventHandler()));
+  }
+
   // Transport
   shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));