You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:09:39 UTC

svn commit: r1005128 - in /incubator/thrift/trunk: compiler/cpp/src/generate/t_cpp_generator.cc lib/cpp/src/TProcessor.h lib/cpp/src/async/TAsyncProcessor.h

Author: dreiss
Date: Wed Oct  6 17:09:39 2010
New Revision: 1005128

URL: http://svn.apache.org/viewvc?rev=1005128&view=rev
Log:
THRIFT-928. cpp: Make clients call writeEnd on their transports before flush

Changing the order of these calls makes more sense from the perspective
of logical operations.  It also simplifies the upcoming stats collection
code.  No clients should be affected.

Modified:
    incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc
    incubator/thrift/trunk/lib/cpp/src/TProcessor.h
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncProcessor.h

Modified: incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc?rev=1005128&r1=1005127&r2=1005128&view=diff
==============================================================================
--- incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc (original)
+++ incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc Wed Oct  6 17:09:39 2010
@@ -1970,8 +1970,8 @@ void t_cpp_generator::generate_service_c
         indent() << "args.write(oprot_);" << endl <<
         endl <<
         indent() << "oprot_->writeMessageEnd();" << endl <<
-        indent() << "oprot_->getTransport()->flush();" << endl <<
-        indent() << "oprot_->getTransport()->writeEnd();" << endl;
+        indent() << "oprot_->getTransport()->writeEnd();" << endl <<
+        indent() << "oprot_->getTransport()->flush();" << endl;
 
       scope_down(f_service_);
       f_service_ << endl;
@@ -2142,8 +2142,8 @@ void t_cpp_generator::generate_service_p
                         : ", const " + type_name((*f_iter)->get_returntype()) + "& _return");
       // XXX Don't declare throw if it doesn't exist
       f_header_ <<
-        "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl <<
-        "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl;
+        "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ");" << endl <<
+        "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw);" << endl;
     }
   }
   indent_down();
@@ -2209,8 +2209,8 @@ void t_cpp_generator::generate_service_p
     indent() << "  oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
     indent() << "  x.write(oprot);" << endl <<
     indent() << "  oprot->writeMessageEnd();" << endl <<
-    indent() << "  oprot->getTransport()->flush();" << endl <<
     indent() << "  oprot->getTransport()->writeEnd();" << endl <<
+    indent() << "  oprot->getTransport()->flush();" << endl <<
     indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl <<
     indent() << "}" << endl <<
     endl <<
@@ -2241,8 +2241,8 @@ void t_cpp_generator::generate_service_p
       indent() << "  oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
       indent() << "  x.write(oprot);" << endl <<
       indent() << "  oprot->writeMessageEnd();" << endl <<
-      indent() << "  oprot->getTransport()->flush();" << endl <<
       indent() << "  oprot->getTransport()->writeEnd();" << endl <<
+      indent() << "  oprot->getTransport()->flush();" << endl <<
       indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl;
   } else {
     f_service_ <<
@@ -2344,17 +2344,7 @@ void t_cpp_generator::generate_process_f
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl <<
-      indent() << "// A glorified finally block since ctx is a void*" << endl <<
-      indent() << "class ContextFreer {" << endl <<
-      indent() << "  public:" << endl <<
-      indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
-      indent() << "      handler_(handler), context_(context) {}" << endl <<
-      indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
-      indent() << "  private:" << endl <<
-      indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
-      indent() << "    void* context_;" << endl <<
-      indent() << "};" << endl <<
-      indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+      indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl << endl <<
@@ -2442,8 +2432,8 @@ void t_cpp_generator::generate_process_f
         indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
         indent() << "x.write(oprot);" << endl <<
         indent() << "oprot->writeMessageEnd();" << endl <<
-        indent() << "oprot->getTransport()->flush();" << endl <<
-        indent() << "oprot->getTransport()->writeEnd();" << endl;
+        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl;
   }
   f_service_ << indent() << "return;" << endl;
   indent_down();
@@ -2470,8 +2460,8 @@ void t_cpp_generator::generate_process_f
       indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
       indent() << "result.write(oprot);" << endl <<
       indent() << "oprot->writeMessageEnd();" << endl <<
-      indent() << "oprot->getTransport()->flush();" << endl <<
-      indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+      indent() << "oprot->getTransport()->writeEnd();" << endl <<
+      indent() << "oprot->getTransport()->flush();" << endl << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl;
@@ -2491,22 +2481,43 @@ void t_cpp_generator::generate_process_f
 
     f_service_ <<
       indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
+      indent() << "void* ctx = NULL;" << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
+      indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
       indent() << "try {" << endl;
     indent_up();
     f_service_ <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
       indent() << "args.read(iprot);" << endl <<
       indent() << "iprot->readMessageEnd();" << endl <<
-      indent() << "iprot->getTransport()->readEnd();" << endl;
+      indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+      indent() << "}" << endl;
     scope_down(f_service_);
 
     // TODO(dreiss): Handle TExceptions?  Expose to server?
     f_service_ <<
       indent() << "catch (const std::exception& exn) {" << endl <<
+      indent() << "  if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "    eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "  }" << endl <<
       indent() << "  return cob(false);" << endl <<
       indent() << "}" << endl;
 
+    if (tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->onewayComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl;
+    }
     // TODO(dreiss): Figure out a strategy for exceptions in async handlers.
     f_service_ <<
+      indent() << "freer.unregister();" << endl <<
       indent() << "iface_->" << tfunction->get_name() << "(";
     indent_up(); indent_up();
     if (tfunction->is_oneway()) {
@@ -2524,13 +2535,13 @@ void t_cpp_generator::generate_process_f
       f_service_ <<
         indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
                  << "return_" << tfunction->get_name()
-                 << ", this, cob, seqid, oprot" << ret_placeholder << ")";
+                 << ", this, cob, seqid, oprot, ctx" << ret_placeholder << ")";
       if (!xceptions.empty()) {
         f_service_
-                   << ',' << endl <<
+              << ',' << endl <<
           indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
-                   << "throw_" << tfunction->get_name()
-                   << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)";
+              << "throw_" << tfunction->get_name()
+              << ", this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1)";
       }
     }
 
@@ -2552,7 +2563,7 @@ void t_cpp_generator::generate_process_f
                         : ", const " + type_name(tfunction->get_returntype()) + "& _return");
       f_service_ <<
         "void " << tservice->get_name() << "AsyncProcessor::" <<
-        "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl;
+        "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ')' << endl;
       scope_up(f_service_);
       f_service_ <<
         indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl;
@@ -2566,11 +2577,21 @@ void t_cpp_generator::generate_process_f
       // Serialize the result into a struct
       f_service_ <<
         endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl <<
+        indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl << endl <<
         indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
         indent() << "result.write(oprot);" << endl <<
         indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl <<
         indent() << "oprot->getTransport()->flush();" << endl <<
-        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+        indent() << "}" << endl <<
         indent() << "return cob(true);" << endl;
       scope_down(f_service_);
       f_service_ << endl;
@@ -2580,7 +2601,7 @@ void t_cpp_generator::generate_process_f
     if (!tfunction->is_oneway() && !xceptions.empty()) {
       f_service_ <<
         "void " << tservice->get_name() << "AsyncProcessor::" <<
-        "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl;
+        "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw)" << endl;
       scope_up(f_service_);
       f_service_ <<
         indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl <<
@@ -2600,17 +2621,27 @@ void t_cpp_generator::generate_process_f
           indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
         scope_down(f_service_);
       }
+      // TODO(dreiss): Handle the case where an undeclared exception is thrown?
 
       // Serialize the result into a struct
       f_service_ <<
         endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl <<
+        indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl << endl <<
         indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
         indent() << "result.write(oprot);" << endl <<
         indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl <<
         indent() << "oprot->getTransport()->flush();" << endl <<
-        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+        indent() << "}" << endl <<
         indent() << "return cob(true);" << endl;
-
       scope_down(f_service_);
       f_service_ << endl;
     } // for each function

Modified: incubator/thrift/trunk/lib/cpp/src/TProcessor.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/TProcessor.h?rev=1005128&r1=1005127&r2=1005128&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/TProcessor.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/TProcessor.h Wed Oct  6 17:09:39 2010
@@ -86,6 +86,21 @@ class TProcessorEventHandler {
 };
 
 /**
+ * A helper class used by the generated code to free each context.
+ */
+class TProcessorContextFreer {
+ public:
+  TProcessorContextFreer(TProcessorEventHandler* handler, void* context, const char* method) :
+    handler_(handler), context_(context), method_(method) {}
+  ~TProcessorContextFreer() { if (handler_ != NULL) handler_->freeContext(context_, method_); }
+  void unregister() { handler_ = NULL; }
+ private:
+  apache::thrift::TProcessorEventHandler* handler_;
+  void* context_;
+  const char* method_;
+};
+
+/**
  * A processor is a generic object that acts upon two streams of data, one
  * an input and the other an output. The definition of this object is loose,
  * though the typical case is for some sort of server that either generates

Modified: incubator/thrift/trunk/lib/cpp/src/async/TAsyncProcessor.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/async/TAsyncProcessor.h?rev=1005128&r1=1005127&r2=1005128&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/async/TAsyncProcessor.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/async/TAsyncProcessor.h Wed Oct  6 17:09:39 2010
@@ -31,6 +31,9 @@ namespace apache { namespace thrift { na
  * Async version of a TProcessor.  It is not expected to complete by the time
  * the call to process returns.  Instead, it calls a cob to signal completion.
  */
+
+class TEventServer; // forward declaration
+
 class TAsyncProcessor {
  public:
   virtual ~TAsyncProcessor() {}
@@ -44,8 +47,27 @@ class TAsyncProcessor {
     return process(_return, io, io);
   }
 
+  boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
+    return eventHandler_;
+  }
+
+  void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+  const TEventServer* getAsyncServer() {
+    return asyncServer_;
+  }
  protected:
   TAsyncProcessor() {}
+
+  boost::shared_ptr<TProcessorEventHandler> eventHandler_;
+  const TEventServer* asyncServer_;
+ private:
+  friend class TEventServer;
+  void setAsyncServer(const TEventServer* server) {
+    asyncServer_ = server;
+  }
 };
 
 }}} // apache::thrift::async