You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:09:38 UTC

svn commit: r1005127 [1/2] - in /incubator/thrift/trunk: ./ compiler/cpp/src/generate/ contrib/async-test/ lib/cpp/ lib/cpp/src/ lib/cpp/src/async/

Author: dreiss
Date: Wed Oct  6 17:09:37 2010
New Revision: 1005127

URL: http://svn.apache.org/viewvc?rev=1005127&view=rev
Log:
THRIFT-923. cpp: Implement a fully nonblocking server and client

There are three major parts of this:
1/ New callback-style interfaces for a few key Thrift components:
   TAsyncProcessor for servers and TAsyncChannel for clients.
2/ Concrete implementations of TAsyncChannel and a server for
   TAsyncProcessor based on evhttp.
3/ Async-style code generation for C++

Added:
    incubator/thrift/trunk/contrib/async-test/
    incubator/thrift/trunk/contrib/async-test/Makefile
    incubator/thrift/trunk/contrib/async-test/aggr.thrift
    incubator/thrift/trunk/contrib/async-test/test-leaf.py   (with props)
    incubator/thrift/trunk/contrib/async-test/test-server.cpp
    incubator/thrift/trunk/lib/cpp/src/async/
    incubator/thrift/trunk/lib/cpp/src/async/SimpleCallback.h
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncBufferProcessor.h
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncChannel.cpp
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncChannel.h
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncProcessor.h
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
    incubator/thrift/trunk/lib/cpp/src/async/TAsyncProtocolProcessor.h
    incubator/thrift/trunk/lib/cpp/src/async/TEvhttpClientChannel.cpp
    incubator/thrift/trunk/lib/cpp/src/async/TEvhttpClientChannel.h
    incubator/thrift/trunk/lib/cpp/src/async/TEvhttpServer.cpp
    incubator/thrift/trunk/lib/cpp/src/async/TEvhttpServer.h
Modified:
    incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc
    incubator/thrift/trunk/configure.ac
    incubator/thrift/trunk/lib/cpp/Makefile.am
    incubator/thrift/trunk/lib/cpp/src/TProcessor.h
    incubator/thrift/trunk/lib/cpp/src/Thrift.h

Modified: incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc?rev=1005127&r1=1005126&r2=1005127&view=diff
==============================================================================
--- incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc (original)
+++ incubator/thrift/trunk/compiler/cpp/src/generate/t_cpp_generator.cc Wed Oct  6 17:09:37 2010
@@ -59,6 +59,9 @@ class t_cpp_generator : public t_oop_gen
     iter = parsed_options.find("include_prefix");
     use_include_prefix_ = (iter != parsed_options.end());
 
+    iter = parsed_options.find("cob_style");
+    gen_cob_style_ = (iter != parsed_options.end());
+
     out_dir_base_ = "gen-cpp";
   }
 
@@ -100,15 +103,16 @@ class t_cpp_generator : public t_oop_gen
    * Service-level generation functions
    */
 
-  void generate_service_interface (t_service* tservice);
-  void generate_service_null      (t_service* tservice);
+  void generate_service_interface (t_service* tservice, string style);
+  void generate_service_null      (t_service* tservice, string style);
   void generate_service_multiface (t_service* tservice);
   void generate_service_helpers   (t_service* tservice);
-  void generate_service_client    (t_service* tservice);
-  void generate_service_processor (t_service* tservice);
+  void generate_service_client    (t_service* tservice, string style);
+  void generate_service_processor (t_service* tservice, string style);
   void generate_service_skeleton  (t_service* tservice);
-  void generate_process_function  (t_service* tservice, t_function* tfunction);
+  void generate_process_function  (t_service* tservice, t_function* tfunction, string style);
   void generate_function_helpers  (t_service* tservice, t_function* tfunction);
+  void generate_service_async_skeleton (t_service* tservice);
 
   /**
    * Serialization constructs
@@ -166,7 +170,12 @@ class t_cpp_generator : public t_oop_gen
                                           t_list*     tlist,
                                           std::string iter);
 
-  /**
+  void generate_function_call            (ostream& out,
+                                          t_function* tfunction,
+                                          string target,
+                                          string iface,
+                                          string arg_prefix);
+  /*
    * Helper rendering functions
    */
 
@@ -176,8 +185,9 @@ class t_cpp_generator : public t_oop_gen
   std::string type_name(t_type* ttype, bool in_typedef=false, bool arg=false);
   std::string base_type_name(t_base_type::t_base tbase);
   std::string declare_field(t_field* tfield, bool init=false, bool pointer=false, bool constant=false, bool reference=false);
-  std::string function_signature(t_function* tfunction, std::string prefix="", bool name_params=true);
-  std::string argument_list(t_struct* tstruct, bool name_params=true);
+  std::string function_signature(t_function* tfunction, std::string style, std::string prefix="", bool name_params=true);
+  std::string cob_function_signature(t_function* tfunction, std::string prefix="", bool name_params=true);
+  std::string argument_list(t_struct* tstruct, bool name_params=true, bool start_comma=false);
   std::string type_to_enum(t_type* ttype);
   std::string local_reflection_name(const char*, t_type* ttype, bool external=false);
 
@@ -223,6 +233,11 @@ class t_cpp_generator : public t_oop_gen
   bool use_include_prefix_;
 
   /**
+   * True iff we should generate "Continuation OBject"-style classes as well.
+   */
+  bool gen_cob_style_;
+
+  /**
    * Strings for namespace, computed once up front then used directly
    */
 
@@ -1172,7 +1187,7 @@ void t_cpp_generator::generate_struct_wr
       type_to_enum((*f_iter)->get_type()) << ", " <<
       (*f_iter)->get_key() << ");" << endl;
     // Write field contents
-    if (pointers) {
+    if (pointers && !(*f_iter)->get_type()->is_xception()) {
       generate_serialize_field(out, *f_iter, "(*(this->", "))");
     } else {
       generate_serialize_field(out, *f_iter, "this->");
@@ -1294,8 +1309,23 @@ void t_cpp_generator::generate_service(t
   f_header_ <<
     "#ifndef " << svcname << "_H" << endl <<
     "#define " << svcname << "_H" << endl <<
-    endl <<
-    "#include <TProcessor.h>" << endl <<
+    endl;
+  if (gen_cob_style_) {
+    f_header_ <<
+      "#include <tr1/functional>" << endl <<
+      // TODO(dreiss): Libify the base client so we don't have to include this.
+      "#include <transport/TTransportUtils.h>" << endl <<
+      "namespace apache { namespace thrift { namespace async {" << endl <<
+      "class TAsyncChannel;" << endl <<
+      "}}}" << endl;
+  }
+  f_header_ <<
+    "#include <TProcessor.h>" << endl;
+  if (gen_cob_style_) {
+    f_header_ <<
+      "#include <async/TAsyncProcessor.h>" << endl;
+  }
+  f_header_ <<
     "#include \"" << get_include_prefix(*get_program()) << program_name_ <<
     "_types.h\"" << endl;
 
@@ -1317,21 +1347,34 @@ void t_cpp_generator::generate_service(t
   f_service_ <<
     autogen_comment();
   f_service_ <<
-    "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" <<
-    endl <<
-    endl <<
+    "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl;
+  if (gen_cob_style_) {
+    f_service_ <<
+      "#include \"async/TAsyncChannel.h\"" << endl;
+  }
+  f_service_ << endl <<
     ns_open_ << endl <<
     endl;
 
   // Generate all the components
-  generate_service_interface(tservice);
-  generate_service_null(tservice);
+  generate_service_interface(tservice, "");
+  generate_service_null(tservice, "");
   generate_service_helpers(tservice);
-  generate_service_client(tservice);
-  generate_service_processor(tservice);
+  generate_service_client(tservice, "");
+  generate_service_processor(tservice, "");
   generate_service_multiface(tservice);
   generate_service_skeleton(tservice);
 
+  // Generate all the cob components
+  if (gen_cob_style_) {
+    generate_service_interface(tservice, "CobCl");
+    generate_service_interface(tservice, "CobSv");
+    generate_service_null(tservice, "CobSv");
+    generate_service_client(tservice, "Cob");
+    generate_service_processor(tservice, "Cob");
+    generate_service_async_skeleton(tservice);
+  }
+
   // Close the namespace
   f_service_ <<
     ns_close_ << endl <<
@@ -1360,6 +1403,7 @@ void t_cpp_generator::generate_service_h
     t_struct* ts = (*f_iter)->get_arglist();
     string name_orig = ts->get_name();
 
+    // TODO(dreiss): Why is this stuff not in generate_function_helpers?
     ts->set_name(tservice->get_name() + "_" + (*f_iter)->get_name() + "_args");
     generate_struct_definition(f_header_, ts, false);
     generate_struct_reader(f_service_, ts);
@@ -1378,23 +1422,29 @@ void t_cpp_generator::generate_service_h
  *
  * @param tservice The service to generate a header definition for
  */
-void t_cpp_generator::generate_service_interface(t_service* tservice) {
+void t_cpp_generator::generate_service_interface(t_service* tservice, string style) {
+
+  if (style == "CobCl") {
+    // Forward declare the client.
+    indent(f_header_) << "class " << service_name_ << "CobClient;" << endl << endl;
+  }
+
   string extends = "";
   if (tservice->get_extends() != NULL) {
-    extends = " : virtual public " + type_name(tservice->get_extends()) + "If";
+    extends = " : virtual public " + type_name(tservice->get_extends()) + style + "If";
   }
   f_header_ <<
-    "class " << service_name_ << "If" << extends << " {" << endl <<
+    "class " << service_name_ << style << "If" << extends << " {" << endl <<
     " public:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "virtual ~" << service_name_ << "If() {}" << endl;
+    indent() << "virtual ~" << service_name_ << style << "If" << "() {}" << endl;
 
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     f_header_ <<
-      indent() << "virtual " << function_signature(*f_iter) << " = 0;" << endl;
+      indent() << "virtual " << function_signature(*f_iter, style) << " = 0;" << endl;
   }
   indent_down();
   f_header_ <<
@@ -1406,36 +1456,49 @@ void t_cpp_generator::generate_service_i
  *
  * @param tservice The service to generate a header definition for
  */
-void t_cpp_generator::generate_service_null(t_service* tservice) {
+void t_cpp_generator::generate_service_null(t_service* tservice, string style) {
   string extends = "";
   if (tservice->get_extends() != NULL) {
-    extends = " , virtual public " + type_name(tservice->get_extends()) + "Null";
+    extends = " , virtual public " + type_name(tservice->get_extends()) + style + "Null";
   }
   f_header_ <<
-    "class " << service_name_ << "Null : virtual public " << service_name_ << "If" << extends << " {" << endl <<
+    "class " << service_name_ << style << "Null : virtual public " << service_name_ << style << "If" << extends << " {" << endl <<
     " public:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "virtual ~" << service_name_ << "Null() {}" << endl;
+    indent() << "virtual ~" << service_name_ << style << "Null() {}" << endl;
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     f_header_ <<
-      indent() << function_signature(*f_iter, "", false) << " {" << endl;
+      indent() << function_signature(*f_iter, style, "", false) << " {" << endl;
     indent_up();
+
     t_type* returntype = (*f_iter)->get_returntype();
-    if (returntype->is_void()) {
-      f_header_ <<
-        indent() << "return;" << endl;
-    } else if (is_complex_type(returntype)) {
-      f_header_ <<
-        indent() << "return;" << endl;
+    t_field returnfield(returntype, "_return");
+
+    if (style == "") {
+      if (returntype->is_void() || is_complex_type(returntype)) {
+        f_header_ << indent() << "return;" << endl;
+      } else {
+        f_header_ <<
+          indent() << declare_field(&returnfield, true) << endl <<
+          indent() << "return _return;" << endl;
+      }
+    } else if (style == "CobSv") {
+      if (returntype->is_void()) {
+        f_header_ << indent() << "return cob();" << endl;
     } else {
       t_field returnfield(returntype, "_return");
       f_header_ <<
         indent() << declare_field(&returnfield, true) << endl <<
-        indent() << "return _return;" << endl;
+        indent() << "return cob(_return);" << endl;
+    }
+
+    } else {
+      throw "UNKNOWN STYLE";
     }
+
     indent_down();
     f_header_ <<
       indent() << "}" << endl;
@@ -1445,6 +1508,109 @@ void t_cpp_generator::generate_service_n
     "};" << endl << endl;
 }
 
+void t_cpp_generator::generate_function_call(ostream& out, t_function* tfunction, string target, string iface, string arg_prefix) {
+  bool first = true;
+  t_type* ret_type = get_true_type(tfunction->get_returntype());
+  out << indent();
+  if (!tfunction->is_oneway() && !ret_type->is_void()) {
+    if (is_complex_type(ret_type)) {
+      first = false;
+      out << iface << "->" << tfunction->get_name() << "(" << target;
+    } else {
+      out << target << " = " << iface << "->" << tfunction->get_name() << "(";
+    }
+  } else {
+    out << iface << "->" << tfunction->get_name() << "(";
+  }
+  const std::vector<t_field*>& fields = tfunction->get_arglist()->get_members();
+  vector<t_field*>::const_iterator f_iter;
+  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+    if (first) {
+      first = false;
+    } else {
+      out << ", ";
+    }
+    out << arg_prefix << (*f_iter)->get_name();
+  }
+  out << ");" << endl;
+}
+
+void t_cpp_generator::generate_service_async_skeleton(t_service* tservice) {
+  string svcname = tservice->get_name();
+
+  // Service implementation file includes
+  string f_skeleton_name = get_out_dir()+svcname+"_async_server.skeleton.cpp";
+
+  string ns = namespace_prefix(tservice->get_program()->get_namespace("cpp"));
+
+  ofstream f_skeleton;
+  f_skeleton.open(f_skeleton_name.c_str());
+  f_skeleton <<
+    "// This autogenerated skeleton file illustrates one way to adapt a synchronous" << endl <<
+    "// interface into an asynchronous interface. You should copy it to another" << endl <<
+    "// filename to avoid overwriting it and rewrite as asynchronous any functions" << endl <<
+    "// that would otherwise introduce unwanted latency." << endl <<
+    endl <<
+    "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl <<
+    "#include <protocol/TBinaryProtocol.h>" << endl <<
+    "#include <async/TEventServer.h>" << endl <<
+    endl <<
+    "using namespace ::apache::thrift;" << endl <<
+    "using namespace ::apache::thrift::protocol;" << endl <<
+    "using namespace ::apache::thrift::transport;" << endl <<
+    "using namespace ::apache::thrift::async;" << endl <<
+    endl <<
+    "using boost::shared_ptr;" << endl <<
+    endl;
+
+  if (!ns.empty()) {
+    f_skeleton <<
+      "using namespace " << string(ns, 0, ns.size()-2) << ";" << endl <<
+      endl;
+  }
+
+  f_skeleton <<
+    "class " << svcname << "AsyncHandler : " <<
+    "public " << svcname << "CobSvIf {" << endl <<
+    " public:" << endl;
+  indent_up();
+  f_skeleton <<
+    indent() << svcname << "AsyncHandler() {" << endl <<
+    indent() << "  syncHandler_ = std::auto_ptr<" << svcname <<
+                "Handler>(new " << svcname << "Handler);" << endl <<
+    indent() << "  // Your initialization goes here" << endl <<
+    indent() << "}" << endl;
+  f_skeleton <<
+    indent() << "virtual ~" << service_name_ << "AsyncHandler();" << endl;
+
+  vector<t_function*> functions = tservice->get_functions();
+  vector<t_function*>::iterator f_iter;
+  for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+    f_skeleton <<
+      endl <<
+      indent() << function_signature(*f_iter, "CobSv", "", true) << " {" << endl;
+    indent_up();
+
+    t_type* returntype = (*f_iter)->get_returntype();
+    t_field returnfield(returntype, "_return");
+
+    string target = returntype->is_void() ? "" : "_return";
+    if (!returntype->is_void()) {
+      f_skeleton <<
+        indent() << declare_field(&returnfield, true) << endl;
+    }
+    generate_function_call(f_skeleton, *f_iter, target, "syncHandler_", "");
+    f_skeleton << indent() << "return cob(" << target << ");" << endl;
+
+    scope_down(f_skeleton);
+  }
+  f_skeleton << endl <<
+    " protected:" << endl <<
+    indent() << "std::auto_ptr<" << svcname << "Handler> syncHandler_;" << endl;
+  indent_down();
+  f_skeleton <<
+    "};" << endl << endl;
+}
 
 /**
  * Generates a multiface, which is a single server that just takes a set
@@ -1531,7 +1697,7 @@ void t_cpp_generator::generate_service_m
     call += ")";
 
     f_header_ <<
-      indent() << function_signature(*f_iter) << " {" << endl;
+      indent() << function_signature(*f_iter, "") << " {" << endl;
     indent_up();
     f_header_ <<
       indent() << "uint32_t sz = ifaces_.size();" << endl <<
@@ -1576,75 +1742,115 @@ void t_cpp_generator::generate_service_m
  *
  * @param tservice The service to generate a server for.
  */
-void t_cpp_generator::generate_service_client(t_service* tservice) {
+void t_cpp_generator::generate_service_client(t_service* tservice, string style) {
+  string ifstyle;
+  if (style == "Cob") {
+    ifstyle = "CobCl";
+  }
+
   string extends = "";
   string extends_client = "";
   if (tservice->get_extends() != NULL) {
     extends = type_name(tservice->get_extends());
-    extends_client = ", public " + extends + "Client";
+    extends_client = ", public " + extends + style + "Client";
   }
 
   // Generate the header portion
   f_header_ <<
-    "class " << service_name_ << "Client : " <<
-    "virtual public " << service_name_ << "If" <<
+    "class " << service_name_ << style << "Client : " <<
+    "virtual public " << service_name_ << ifstyle << "If" <<
     extends_client << " {" << endl <<
     " public:" << endl;
 
   indent_up();
-  f_header_ <<
-    indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl;
-  if (extends.empty()) {
-    f_header_ <<
-      indent() << "  piprot_(prot)," << endl <<
-      indent() << "  poprot_(prot) {" << endl <<
-      indent() << "  iprot_ = prot.get();" << endl <<
-      indent() << "  oprot_ = prot.get();" << endl <<
-      indent() << "}" << endl;
-  } else {
+  if (style != "Cob") {
     f_header_ <<
-      indent() << "  " << extends << "Client(prot, prot) {}" << endl;
-  }
+      indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl;
+    if (extends.empty()) {
+      f_header_ <<
+        indent() << "  piprot_(prot)," << endl <<
+        indent() << "  poprot_(prot) {" << endl <<
+        indent() << "  iprot_ = prot.get();" << endl <<
+        indent() << "  oprot_ = prot.get();" << endl <<
+        indent() << "}" << endl;
+    } else {
+      f_header_ <<
+        indent() << "  " << extends << style << "Client(prot, prot) {}" << endl;
+    }
 
-  f_header_ <<
-    indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl;
-  if (extends.empty()) {
     f_header_ <<
-      indent() << "  piprot_(iprot)," << endl <<
-      indent() << "  poprot_(oprot) {" << endl <<
-      indent() << "  iprot_ = iprot.get();" << endl <<
-      indent() << "  oprot_ = oprot.get();" << endl <<
+      indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl;
+    if (extends.empty()) {
+      if (style == "Cob") {
+        f_header_ <<
+          indent() << "  rpc_ctx_(ctx)," << endl;
+      }
+      f_header_ <<
+        indent() << "  piprot_(iprot)," << endl <<
+        indent() << "  poprot_(oprot) {" << endl <<
+        indent() << "  iprot_ = iprot.get();" << endl <<
+        indent() << "  oprot_ = oprot.get();" << endl <<
+        indent() << "}" << endl;
+    } else {
+      f_header_ <<
+        indent() << "  " << extends << style << "Client(iprot, oprot) {}" << endl;
+    }
+    
+    // Generate getters for the protocols.
+    f_header_ <<
+      indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl <<
+      indent() << "  return piprot_;" << endl <<
       indent() << "}" << endl;
-  } else {
+      
     f_header_ <<
-      indent() << "  " << extends << "Client(iprot, oprot) {}" << endl;
-  }
+      indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl <<
+      indent() << "  return poprot_;" << endl <<
+      indent() << "}" << endl;
 
+  } else /* if (style == "Cob") */ {
   // Generate getters for the protocols.
-  f_header_ <<
-    indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl <<
-    indent() << "  return piprot_;" << endl <<
-    indent() << "}" << endl;
+    f_header_ <<
+      indent() << service_name_ << style << "Client("
+      << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, "
+      << "::apache::thrift::protocol::TProtocolFactory* protocolFactory) :" << endl;
+    if (extends.empty()) {
+      f_header_ <<
+        indent() << "  channel_(channel)," << endl <<
+        indent() << "  itrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl <<
+        indent() << "  otrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl <<
+        indent() << "  piprot_(protocolFactory->getProtocol(itrans_))," << endl <<
+        indent() << "  poprot_(protocolFactory->getProtocol(otrans_)) {" << endl <<
+        indent() << "  iprot_ = piprot_.get();" << endl <<
+        indent() << "  oprot_ = poprot_.get();" << endl <<
+        indent() << "}" << endl;
+    } else {
+      f_header_ <<
+        indent() << "  " << extends << style << "Client(channel, protocolFactory) {}" << endl;
+    }
+  }
 
-  f_header_ <<
-    indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl <<
-    indent() << "  return poprot_;" << endl <<
-    indent() << "}" << endl;
+  if (style == "Cob") {
+    f_header_ <<
+      indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {" << endl <<
+      indent() << "  return channel_;" << endl <<
+      indent() << "}" << endl;
+  }
 
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::const_iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+    indent(f_header_) << function_signature(*f_iter, ifstyle) << ";" << endl;
+    // TODO(dreiss): Use private inheritance to avoid generating thise in cob-style.
     t_function send_function(g_type_void,
-                             string("send_") + (*f_iter)->get_name(),
-                             (*f_iter)->get_arglist());
-    indent(f_header_) << function_signature(*f_iter) << ";" << endl;
-    indent(f_header_) << function_signature(&send_function) << ";" << endl;
+        string("send_") + (*f_iter)->get_name(),
+        (*f_iter)->get_arglist());
+    indent(f_header_) << function_signature(&send_function, "") << ";" << endl;
     if (!(*f_iter)->is_oneway()) {
       t_struct noargs(program_);
       t_function recv_function((*f_iter)->get_returntype(),
-                               string("recv_") + (*f_iter)->get_name(),
-                               &noargs);
-      indent(f_header_) << function_signature(&recv_function) << ";" << endl;
+          string("recv_") + (*f_iter)->get_name(),
+          &noargs);
+      indent(f_header_) << function_signature(&recv_function, "") << ";" << endl;
     }
   }
   indent_down();
@@ -1653,11 +1859,19 @@ void t_cpp_generator::generate_service_c
     f_header_ <<
       " protected:" << endl;
     indent_up();
+
+    if (style == "Cob") {
+      f_header_ <<
+        indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;"  << endl <<
+        indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;"  << endl <<
+        indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;"  << endl;
+    }
     f_header_ <<
       indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;"  << endl <<
       indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;"  << endl <<
       indent() << "::apache::thrift::protocol::TProtocol* iprot_;"  << endl <<
       indent() << "::apache::thrift::protocol::TProtocol* oprot_;"  << endl;
+
     indent_down();
   }
 
@@ -1665,7 +1879,7 @@ void t_cpp_generator::generate_service_c
     "};" << endl <<
     endl;
 
-  string scope = service_name_ + "Client::";
+  string scope = service_name_ + style + "Client::";
 
   // Generate client method implementations
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
@@ -1673,7 +1887,7 @@ void t_cpp_generator::generate_service_c
 
     // Open function
     indent(f_service_) <<
-      function_signature(*f_iter, scope) << endl;
+      function_signature(*f_iter, ifstyle, scope) << endl;
     scope_up(f_service_);
     indent(f_service_) <<
       "send_" << funname << "(";
@@ -1695,155 +1909,171 @@ void t_cpp_generator::generate_service_c
     }
     f_service_ << ");" << endl;
 
-    if (!(*f_iter)->is_oneway()) {
-      f_service_ << indent();
-      if (!(*f_iter)->get_returntype()->is_void()) {
-        if (is_complex_type((*f_iter)->get_returntype())) {
-          f_service_ << "recv_" << funname << "(_return);" << endl;
+    if (style != "Cob") {
+      if (!(*f_iter)->is_oneway()) {
+        f_service_ << indent();
+        if (!(*f_iter)->get_returntype()->is_void()) {
+          if (is_complex_type((*f_iter)->get_returntype())) {
+            f_service_ << "recv_" << funname << "(_return);" << endl;
+          } else {
+            f_service_ << "return recv_" << funname << "();" << endl;
+          }
         } else {
-          f_service_ << "return recv_" << funname << "();" << endl;
+          f_service_ <<
+            "recv_" << funname << "();" << endl;
         }
+      }
+    } else {
+      if (!(*f_iter)->is_oneway()) {
+        f_service_ <<
+          indent() << _this << "channel_->sendAndRecvMessage(" <<
+          "std::tr1::bind(cob, this), " << _this << "otrans_.get(), " <<
+          _this << "itrans_.get());" << endl;
       } else {
         f_service_ <<
-          "recv_" << funname << "();" << endl;
+        indent() << _this << "channel_->sendMessage(" <<
+          "std::tr1::bind(cob, this), " << _this << "otrans_.get());" << endl;
       }
     }
     scope_down(f_service_);
     f_service_ << endl;
 
-    // Function for sending
-    t_function send_function(g_type_void,
-                             string("send_") + (*f_iter)->get_name(),
-                             (*f_iter)->get_arglist());
-
-    // Open the send function
-    indent(f_service_) <<
-      function_signature(&send_function, scope) << endl;
-    scope_up(f_service_);
-
-    // Function arguments and results
-    string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs";
-    string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult";
-
-    // Serialize the request
-    f_service_ <<
-      indent() << "int32_t cseqid = 0;" << endl <<
-      indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl <<
-      endl <<
-      indent() << argsname << " args;" << endl;
-
-    for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
-      f_service_ <<
-        indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl;
-    }
-
-    f_service_ <<
-      indent() << "args.write(oprot_);" << endl <<
-      endl <<
-      indent() << "oprot_->writeMessageEnd();" << endl <<
-      indent() << "oprot_->getTransport()->flush();" << endl <<
-      indent() << "oprot_->getTransport()->writeEnd();" << endl;
-
-    scope_down(f_service_);
-    f_service_ << endl;
+    //if (style != "Cob") // TODO(dreiss): Libify the client and don't generate this for cob-style
+    if (true) {
+      // Function for sending
+      t_function send_function(g_type_void,
+                               string("send_") + (*f_iter)->get_name(),
+                               (*f_iter)->get_arglist());
 
-    // Generate recv function only if not an oneway function
-    if (!(*f_iter)->is_oneway()) {
-      t_struct noargs(program_);
-      t_function recv_function((*f_iter)->get_returntype(),
-                               string("recv_") + (*f_iter)->get_name(),
-                               &noargs);
-      // Open function
+      // Open the send function
       indent(f_service_) <<
-        function_signature(&recv_function, scope) << endl;
+        function_signature(&send_function, "", scope) << endl;
       scope_up(f_service_);
 
+      // Function arguments and results
+      string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs";
+      string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult";
+
+      // Serialize the request
       f_service_ <<
+        indent() << "int32_t cseqid = 0;" << endl <<
+        indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl <<
         endl <<
-        indent() << "int32_t rseqid = 0;" << endl <<
-        indent() << "std::string fname;" << endl <<
-        indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl <<
-        endl <<
-        indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl <<
-        indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl <<
-        indent() << "  ::apache::thrift::TApplicationException x;" << endl <<
-        indent() << "  x.read(iprot_);" << endl <<
-        indent() << "  iprot_->readMessageEnd();" << endl <<
-        indent() << "  iprot_->getTransport()->readEnd();" << endl <<
-        indent() << "  throw x;" << endl <<
-        indent() << "}" << endl <<
-        indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl <<
-        indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
-        indent() << "  iprot_->readMessageEnd();" << endl <<
-        indent() << "  iprot_->getTransport()->readEnd();" << endl <<
-        indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl <<
-        indent() << "}" << endl <<
-        indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl <<
-        indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
-        indent() << "  iprot_->readMessageEnd();" << endl <<
-        indent() << "  iprot_->getTransport()->readEnd();" << endl <<
-        indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl <<
-        indent() << "}" << endl;
+        indent() << argsname << " args;" << endl;
 
-      if (!(*f_iter)->get_returntype()->is_void() &&
-          !is_complex_type((*f_iter)->get_returntype())) {
-        t_field returnfield((*f_iter)->get_returntype(), "_return");
+      for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
         f_service_ <<
-          indent() << declare_field(&returnfield) << endl;
+          indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl;
       }
 
       f_service_ <<
-        indent() << resultname << " result;" << endl;
+        indent() << "args.write(oprot_);" << endl <<
+        endl <<
+        indent() << "oprot_->writeMessageEnd();" << endl <<
+        indent() << "oprot_->getTransport()->flush();" << endl <<
+        indent() << "oprot_->getTransport()->writeEnd();" << endl;
+
+      scope_down(f_service_);
+      f_service_ << endl;
+
+      // Generate recv function only if not a oneway function
+      if (!(*f_iter)->is_oneway()) {
+        t_struct noargs(program_);
+        t_function recv_function((*f_iter)->get_returntype(),
+                                 string("recv_") + (*f_iter)->get_name(),
+                                 &noargs);
+        // Open function
+        indent(f_service_) <<
+          function_signature(&recv_function, "", scope) << endl;
+        scope_up(f_service_);
 
-      if (!(*f_iter)->get_returntype()->is_void()) {
         f_service_ <<
-          indent() << "result.success = &_return;" << endl;
-      }
+          endl <<
+          indent() << "int32_t rseqid = 0;" << endl <<
+          indent() << "std::string fname;" << endl <<
+          indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl <<
+          endl <<
+          indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl <<
+          indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl <<
+          indent() << "  ::apache::thrift::TApplicationException x;" << endl <<
+          indent() << "  x.read(iprot_);" << endl <<
+          indent() << "  iprot_->readMessageEnd();" << endl <<
+          indent() << "  iprot_->getTransport()->readEnd();" << endl <<
+          indent() << "  throw x;" << endl <<
+          indent() << "}" << endl <<
+          indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl <<
+          indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
+          indent() << "  iprot_->readMessageEnd();" << endl <<
+          indent() << "  iprot_->getTransport()->readEnd();" << endl <<
+          indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl <<
+          indent() << "}" << endl <<
+          indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl <<
+          indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
+          indent() << "  iprot_->readMessageEnd();" << endl <<
+          indent() << "  iprot_->getTransport()->readEnd();" << endl <<
+          indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl <<
+          indent() << "}" << endl;
 
-      f_service_ <<
-        indent() << "result.read(iprot_);" << endl <<
-        indent() << "iprot_->readMessageEnd();" << endl <<
-        indent() << "iprot_->getTransport()->readEnd();" << endl <<
-        endl;
+        if (!(*f_iter)->get_returntype()->is_void() &&
+            !is_complex_type((*f_iter)->get_returntype())) {
+          t_field returnfield((*f_iter)->get_returntype(), "_return");
+          f_service_ <<
+            indent() << declare_field(&returnfield) << endl;
+        }
+
+        f_service_ <<
+          indent() << resultname << " result;" << endl;
 
-      // Careful, only look for _result if not a void function
-      if (!(*f_iter)->get_returntype()->is_void()) {
-        if (is_complex_type((*f_iter)->get_returntype())) {
+        if (!(*f_iter)->get_returntype()->is_void()) {
           f_service_ <<
-            indent() << "if (result.__isset.success) {" << endl <<
-            indent() << "  // _return pointer has now been filled" << endl <<
-            indent() << "  return;" << endl <<
-            indent() << "}" << endl;
-        } else {
+            indent() << "result.success = &_return;" << endl;
+        }
+
+        f_service_ <<
+          indent() << "result.read(iprot_);" << endl <<
+          indent() << "iprot_->readMessageEnd();" << endl <<
+          indent() << "iprot_->getTransport()->readEnd();" << endl <<
+          endl;
+
+        // Careful, only look for _result if not a void function
+        if (!(*f_iter)->get_returntype()->is_void()) {
+          if (is_complex_type((*f_iter)->get_returntype())) {
+            f_service_ <<
+              indent() << "if (result.__isset.success) {" << endl <<
+              indent() << "  // _return pointer has now been filled" << endl <<
+              indent() << "  return;" << endl <<
+              indent() << "}" << endl;
+          } else {
+            f_service_ <<
+              indent() << "if (result.__isset.success) {" << endl <<
+              indent() << "  return _return;" << endl <<
+              indent() << "}" << endl;
+          }
+        }
+
+        t_struct* xs = (*f_iter)->get_xceptions();
+        const std::vector<t_field*>& xceptions = xs->get_members();
+        vector<t_field*>::const_iterator x_iter;
+        for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
           f_service_ <<
-            indent() << "if (result.__isset.success) {" << endl <<
-            indent() << "  return _return;" << endl <<
+            indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl <<
+            indent() << "  throw result." << (*x_iter)->get_name() << ";" << endl <<
             indent() << "}" << endl;
         }
-      }
 
-      t_struct* xs = (*f_iter)->get_xceptions();
-      const std::vector<t_field*>& xceptions = xs->get_members();
-      vector<t_field*>::const_iterator x_iter;
-      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
-        f_service_ <<
-          indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl <<
-          indent() << "  throw result." << (*x_iter)->get_name() << ";" << endl <<
-          indent() << "}" << endl;
-      }
+        // We get here for a void function, or when the result set neither success nor a declared exception
+        if ((*f_iter)->get_returntype()->is_void()) {
+          indent(f_service_) <<
+            "return;" << endl;
+        } else {
+          f_service_ <<
+            indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
+        }
 
-      // We only get here if we are a void function
-      if ((*f_iter)->get_returntype()->is_void()) {
-        indent(f_service_) <<
-          "return;" << endl;
-      } else {
-        f_service_ <<
-          indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
+        // Close function
+        scope_down(f_service_);
+        f_service_ << endl;
       }
-
-      // Close function
-      scope_down(f_service_);
-      f_service_ << endl;
     }
   }
 }
@@ -1853,7 +2083,22 @@ void t_cpp_generator::generate_service_c
  *
  * @param tservice The service to generate a server for.
  */
-void t_cpp_generator::generate_service_processor(t_service* tservice) {
+void t_cpp_generator::generate_service_processor(t_service* tservice, string style) {
+  string ifstyle;
+  string pstyle;
+  string finish_cob;
+  string finish_cob_decl;
+  string cob_arg;
+  string ret_type = "bool ";
+  if (style == "Cob") {
+    ifstyle = "CobSv";
+    pstyle = "Async";
+    finish_cob = "std::tr1::function<void(bool ok)> cob, ";
+    finish_cob_decl = "std::tr1::function<void(bool ok)>, ";
+    cob_arg = "cob, ";
+    ret_type = "void ";
+  }
+
   // Generate the dispatch methods
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::iterator f_iter;
@@ -1862,13 +2107,13 @@ void t_cpp_generator::generate_service_p
   string extends_processor = "";
   if (tservice->get_extends() != NULL) {
     extends = type_name(tservice->get_extends());
-    extends_processor = ", public " + extends + "Processor";
+    extends_processor = ", public " + extends + pstyle + "Processor";
   }
 
   // Generate the header portion
   f_header_ <<
-    "class " << service_name_ << "Processor : " <<
-    "virtual public ::apache::thrift::TProcessor" <<
+    "class " << service_name_ << pstyle << "Processor : " <<
+    "virtual public ::apache::thrift::T" << pstyle << "Processor" <<
     extends_processor << " {" << endl;
 
   // Protected data members
@@ -1876,9 +2121,9 @@ void t_cpp_generator::generate_service_p
     " protected:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "boost::shared_ptr<" << service_name_ << "If> iface_;" << endl;
+    indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl;
   f_header_ <<
-    indent() << "virtual bool process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
+    indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
   indent_down();
 
   // Process function declarations
@@ -1886,10 +2131,20 @@ void t_cpp_generator::generate_service_p
     " private:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "std::map<std::string, void (" << service_name_ << "Processor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
+    indent() << "std::map<std::string, void (" << service_name_ << pstyle << "Processor::*)(" << finish_cob_decl << "int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     indent(f_header_) <<
-      "void process_" << (*f_iter)->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+      "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+    if (style == "Cob") {
+      // XXX Factor this out, even if it is a pain.
+      string ret_arg = ((*f_iter)->get_returntype()->is_void()
+                        ? ""
+                        : ", const " + type_name((*f_iter)->get_returntype()) + "& _return");
+      // XXX Don't declare throw if it doesn't exist
+      f_header_ <<
+        "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl <<
+        "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl;
+    }
   }
   indent_down();
 
@@ -1903,6 +2158,7 @@ void t_cpp_generator::generate_service_p
     declare_map += (*f_iter)->get_name();
     declare_map += "\"] = &";
     declare_map += service_name_;
+    declare_map += pstyle;
     declare_map += "Processor::process_";
     declare_map += (*f_iter)->get_name();
     declare_map += ";\n";
@@ -1911,28 +2167,28 @@ void t_cpp_generator::generate_service_p
 
   f_header_ <<
     " public:" << endl <<
-    indent() << service_name_ << "Processor(boost::shared_ptr<" << service_name_ << "If> iface) :" << endl;
+    indent() << service_name_ << pstyle << "Processor(boost::shared_ptr<" << service_name_ << ifstyle << "If> iface) :" << endl;
   if (extends.empty()) {
     f_header_ <<
       indent() << "  iface_(iface) {" << endl;
   } else {
     f_header_ <<
-      indent() << "  " << extends << "Processor(iface)," << endl <<
+      indent() << "  " << extends << pstyle << "Processor(iface)," << endl <<
       indent() << "  iface_(iface) {" << endl;
   }
   f_header_ <<
     declare_map <<
     indent() << "}" << endl <<
     endl <<
-    indent() << "virtual bool process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
-    indent() << "virtual ~" << service_name_ << "Processor() {}" << endl;
+    indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
+    indent() << "virtual ~" << service_name_ << pstyle << "Processor() {}" << endl;
   indent_down();
   f_header_ <<
     "};" << endl << endl;
 
   // Generate the server implementation
   f_service_ <<
-    "bool " << service_name_ << "Processor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl;
+    ret_type << service_name_ << pstyle << "Processor::process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl;
   indent_up();
 
   f_service_ <<
@@ -1955,10 +2211,11 @@ void t_cpp_generator::generate_service_p
     indent() << "  oprot->writeMessageEnd();" << endl <<
     indent() << "  oprot->getTransport()->flush();" << endl <<
     indent() << "  oprot->getTransport()->writeEnd();" << endl <<
-    indent() << "  return true;" << endl <<
+    indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl <<
     indent() << "}" << endl <<
     endl <<
-    indent() << "return process_fn(iprot, oprot, fname, seqid);" <<
+    indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "")
+             << "iprot, oprot, fname, seqid);" <<
     endl;
 
   indent_down();
@@ -1967,12 +2224,12 @@ void t_cpp_generator::generate_service_p
     endl;
 
   f_service_ <<
-    "bool " << service_name_ << "Processor::process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl;
+    ret_type << service_name_ << pstyle << "Processor::process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl;
   indent_up();
 
   // HOT: member function pointer map
   f_service_ <<
-    indent() << "std::map<std::string, void (" << service_name_ << "Processor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
+    indent() << "std::map<std::string, void (" << service_name_ << pstyle << "Processor::*)(" << finish_cob_decl << "int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
     indent() << "pfn = processMap_.find(fname);" << endl <<
     indent() << "if (pfn == processMap_.end()) {" << endl;
   if (extends.empty()) {
@@ -1986,15 +2243,26 @@ void t_cpp_generator::generate_service_p
       indent() << "  oprot->writeMessageEnd();" << endl <<
       indent() << "  oprot->getTransport()->flush();" << endl <<
       indent() << "  oprot->getTransport()->writeEnd();" << endl <<
-      indent() << "  return true;" << endl;
+      indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl;
   } else {
     f_service_ <<
-      indent() << "  return " << extends << "Processor::process_fn(iprot, oprot, fname, seqid);" << endl;
+      indent() << "  return "
+               << extends << pstyle << "Processor::process_fn("
+               << (style == "Cob" ? "cob, " : "")
+               << "iprot, oprot, fname, seqid);" << endl;
   }
   f_service_ <<
     indent() << "}" << endl <<
-    indent() << "(this->*(pfn->second))(seqid, iprot, oprot);" << endl <<
-    indent() << "return true;" << endl;
+    indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl;
+
+  // TODO(dreiss): return pfn ret?
+  if (style == "Cob") {
+    f_service_ <<
+      indent() << "return;" << endl;
+  } else {
+    f_service_ <<
+      indent() << "return true;" << endl;
+  }
 
   indent_down();
   f_service_ <<
@@ -2003,7 +2271,7 @@ void t_cpp_generator::generate_service_p
 
   // Generate the process subfunctions
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
-    generate_process_function(tservice, *f_iter);
+    generate_process_function(tservice, *f_iter, style);
   }
 }
 
@@ -2036,8 +2304,11 @@ void t_cpp_generator::generate_function_
   generate_struct_result_writer(f_service_, &result);
 
   result.set_name(tservice->get_name() + "_" + tfunction->get_name() + "_presult");
-  generate_struct_definition(f_header_, &result, false, true, true, false);
+  generate_struct_definition(f_header_, &result, false, true, true, gen_cob_style_);
   generate_struct_reader(f_service_, &result, true);
+  if (gen_cob_style_) {
+    generate_struct_writer(f_service_, &result, true);
+  }
 
 }
 
@@ -2047,165 +2318,303 @@ void t_cpp_generator::generate_function_
  * @param tfunction The function to write a dispatcher for
  */
 void t_cpp_generator::generate_process_function(t_service* tservice,
-                                                t_function* tfunction) {
-  // Open function
-  f_service_ <<
-    "void " << tservice->get_name() << "Processor::" <<
-    "process_" << tfunction->get_name() <<
-    "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
-  scope_up(f_service_);
-
-  string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args";
-  string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
-
-  f_service_ <<
-    indent() << "void* ctx = NULL;" << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl <<
-    indent() << "// A glorified finally block since ctx is a void*" << endl <<
-    indent() << "class ContextFreer {" << endl <<
-    indent() << "  public:" << endl <<
-    indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
-    indent() << "      handler_(handler), context_(context) {}" << endl <<
-    indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
-    indent() << "  private:" << endl <<
-    indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
-    indent() << "    void* context_;" << endl <<
-    indent() << "};" << endl <<
-    indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl << endl <<
-    indent() << argsname << " args;" << endl <<
-    indent() << "args.read(iprot);" << endl <<
-    indent() << "iprot->readMessageEnd();" << endl <<
-    indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl <<
-    endl;
+                                                t_function* tfunction,
+                                                string style) {
+  t_struct* arg_struct = tfunction->get_arglist();
+  const std::vector<t_field*>& fields = arg_struct->get_members();
+  vector<t_field*>::const_iterator f_iter;
 
   t_struct* xs = tfunction->get_xceptions();
   const std::vector<t_field*>& xceptions = xs->get_members();
   vector<t_field*>::const_iterator x_iter;
 
-  // Declare result
-  if (!tfunction->is_oneway()) {
+  // I tried to do this as one function.  I really did.  But it was too hard.
+  if (style != "Cob") {
+    // Open function
     f_service_ <<
-      indent() << resultname << " result;" << endl;
-  }
+      "void " << tservice->get_name() << "Processor::" <<
+      "process_" << tfunction->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
+    scope_up(f_service_);
 
-  // Try block for functions with exceptions
-  f_service_ <<
-    indent() << "try {" << endl;
-  indent_up();
+    string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args";
+    string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
 
-  // Generate the function call
-  t_struct* arg_struct = tfunction->get_arglist();
-  const std::vector<t_field*>& fields = arg_struct->get_members();
-  vector<t_field*>::const_iterator f_iter;
+    f_service_ <<
+      indent() << "void* ctx = NULL;" << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
+      indent() << "// A glorified finally block since ctx is a void*" << endl <<
+      indent() << "class ContextFreer {" << endl <<
+      indent() << "  public:" << endl <<
+      indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
+      indent() << "      handler_(handler), context_(context) {}" << endl <<
+      indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
+      indent() << "  private:" << endl <<
+      indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
+      indent() << "    void* context_;" << endl <<
+      indent() << "};" << endl <<
+      indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl << endl <<
+      indent() << argsname << " args;" << endl <<
+      indent() << "args.read(iprot);" << endl <<
+      indent() << "iprot->readMessageEnd();" << endl <<
+      indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
+      endl;
 
-  bool first = true;
-  f_service_ << indent();
-  if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
-    if (is_complex_type(tfunction->get_returntype())) {
-      first = false;
-      f_service_ << "iface_->" << tfunction->get_name() << "(result.success";
-    } else {
-      f_service_ << "result.success = iface_->" << tfunction->get_name() << "(";
+    // Declare result
+    if (!tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << resultname << " result;" << endl;
     }
-  } else {
+
+    // Try block for functions with exceptions
     f_service_ <<
-      "iface_->" << tfunction->get_name() << "(";
-  }
-  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
-    if (first) {
-      first = false;
+      indent() << "try {" << endl;
+    indent_up();
+
+    // Generate the function call
+    bool first = true;
+    f_service_ << indent();
+    if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+      if (is_complex_type(tfunction->get_returntype())) {
+        first = false;
+        f_service_ << "iface_->" << tfunction->get_name() << "(result.success";
+      } else {
+        f_service_ << "result.success = iface_->" << tfunction->get_name() << "(";
+      }
     } else {
-      f_service_ << ", ";
+      f_service_ <<
+        "iface_->" << tfunction->get_name() << "(";
     }
-    f_service_ << "args." << (*f_iter)->get_name();
-  }
-  f_service_ << ");" << endl;
+    for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+      if (first) {
+        first = false;
+      } else {
+        f_service_ << ", ";
+      }
+      f_service_ << "args." << (*f_iter)->get_name();
+    }
+    f_service_ << ");" << endl;
 
-  // Set isset on success field
-  if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
-    f_service_ <<
-      indent() << "result.__isset.success = true;" << endl;
-  }
+    // Set isset on success field
+    if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+      f_service_ <<
+        indent() << "result.__isset.success = true;" << endl;
+    }
 
-  indent_down();
-  f_service_ << indent() << "}";
+    indent_down();
+    f_service_ << indent() << "}";
 
-  if (!tfunction->is_oneway()) {
-    for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
-      f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
-      if (!tfunction->is_oneway()) {
-        indent_up();
-        f_service_ <<
-          indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
-          indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
-        indent_down();
-        f_service_ << indent() << "}";
-      } else {
-        f_service_ << "}";
+    if (!tfunction->is_oneway()) {
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
+        if (!tfunction->is_oneway()) {
+          indent_up();
+          f_service_ <<
+            indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
+            indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
+          indent_down();
+          f_service_ << indent() << "}";
+        } else {
+          f_service_ << "}";
+        }
       }
     }
-  }
 
-  f_service_ << " catch (const std::exception& e) {" << endl;
+    f_service_ << " catch (const std::exception& e) {" << endl;
 
-  indent_up();
-  f_service_ <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl;
+    indent_up();
+    f_service_ <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl;
 
   if (!tfunction->is_oneway()) {
     f_service_ <<
       endl <<
-      indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
-      indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
-      indent() << "x.write(oprot);" << endl <<
-      indent() << "oprot->writeMessageEnd();" << endl <<
-      indent() << "oprot->getTransport()->flush();" << endl <<
-      indent() << "oprot->getTransport()->writeEnd();" << endl;
+        indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
+        indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
+        indent() << "x.write(oprot);" << endl <<
+        indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl <<
+        indent() << "oprot->getTransport()->writeEnd();" << endl;
   }
   f_service_ << indent() << "return;" << endl;
   indent_down();
   f_service_ << indent() << "}" << endl << endl;
 
-  // Shortcut out here for oneway functions
-  if (tfunction->is_oneway()) {
+    // Shortcut out here for oneway functions
+    if (tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl << endl <<
+        indent() << "return;" << endl;
+      indent_down();
+      f_service_ << "}" << endl <<
+        endl;
+      return;
+    }
+
+    // Serialize the result into a struct
     f_service_ <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-      indent() << "  eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl << endl <<
-      indent() << "return;" << endl;
+      indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+      indent() << "result.write(oprot);" << endl <<
+      indent() << "oprot->writeMessageEnd();" << endl <<
+      indent() << "oprot->getTransport()->flush();" << endl <<
+      indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl;
+
     // Close function
     scope_down(f_service_);
     f_service_ << endl;
-    return;
   }
 
-  // Serialize the result into a struct
-  f_service_ <<
-      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-      indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-      indent() << "}" << endl << endl <<
-    indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
-    indent() << "result.write(oprot);" << endl <<
-    indent() << "oprot->writeMessageEnd();" << endl <<
-    indent() << "oprot->getTransport()->flush();" << endl <<
-    indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl;
+  // Cob style.
+  else {
+    // Processor entry point.
+    f_service_ <<
+      "void " << tservice->get_name() << "AsyncProcessor::" <<
+      "process_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
+    scope_up(f_service_);
 
-  // Close function
-  scope_down(f_service_);
-  f_service_ << endl;
+    f_service_ <<
+      indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
+      indent() << "try {" << endl;
+    indent_up();
+    f_service_ <<
+      indent() << "args.read(iprot);" << endl <<
+      indent() << "iprot->readMessageEnd();" << endl <<
+      indent() << "iprot->getTransport()->readEnd();" << endl;
+    scope_down(f_service_);
+
+    // TODO(dreiss): Handle TExceptions?  Expose to server?
+    f_service_ <<
+      indent() << "catch (const std::exception& exn) {" << endl <<
+      indent() << "  return cob(false);" << endl <<
+      indent() << "}" << endl;
+
+    // TODO(dreiss): Figure out a strategy for exceptions in async handlers.
+    f_service_ <<
+      indent() << "iface_->" << tfunction->get_name() << "(";
+    indent_up(); indent_up();
+    if (tfunction->is_oneway()) {
+      // No return.  Just hand off our cob.
+      // TODO(dreiss): Call the cob immediately?
+      f_service_ <<
+        "std::tr1::bind(cob, true)" << endl;
+    } else {
+      f_service_ << endl;
+      string ret_placeholder = ", std::tr1::placeholders::_1";
+      string comma = "";
+      if (tfunction->get_returntype()->is_void()) {
+        ret_placeholder = "";
+      }
+      f_service_ <<
+        indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
+                 << "return_" << tfunction->get_name()
+                 << ", this, cob, seqid, oprot" << ret_placeholder << ")";
+      if (!xceptions.empty()) {
+        f_service_
+                   << ',' << endl <<
+          indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
+                   << "throw_" << tfunction->get_name()
+                   << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)";
+      }
+    }
+
+    // XXX Whitespace cleanup.
+    for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+      f_service_
+                 << ',' << endl <<
+        indent() << "args." << (*f_iter)->get_name();
+    }
+    f_service_ << ");" << endl;
+    indent_down(); indent_down();
+    scope_down(f_service_);
+    f_service_ << endl;
+
+    // Normal return.
+    if (!tfunction->is_oneway()) {
+      string ret_arg = (tfunction->get_returntype()->is_void()
+                        ? ""
+                        : ", const " + type_name(tfunction->get_returntype()) + "& _return");
+      f_service_ <<
+        "void " << tservice->get_name() << "AsyncProcessor::" <<
+        "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl;
+      scope_up(f_service_);
+      f_service_ <<
+        indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl;
+      if (!tfunction->get_returntype()->is_void()) {
+        // The const_cast here is unfortunate, but it would be a pain to avoid,
+        // and we only do a write with this struct, which is const-safe.
+        f_service_ <<
+          indent() << "result.success = const_cast<" << type_name(tfunction->get_returntype()) << "*>(&_return);" << endl <<
+          indent() << "result.__isset.success = true;" << endl;
+      }
+      // Serialize the result into a struct
+      f_service_ <<
+        endl <<
+        indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+        indent() << "result.write(oprot);" << endl <<
+        indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl <<
+        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "return cob(true);" << endl;
+      scope_down(f_service_);
+      f_service_ << endl;
+    }
+
+    // Exception return.
+    if (!tfunction->is_oneway() && !xceptions.empty()) {
+      f_service_ <<
+        "void " << tservice->get_name() << "AsyncProcessor::" <<
+        "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl;
+      scope_up(f_service_);
+      f_service_ <<
+        indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl <<
+        indent() << "try {" << endl;
+      indent_up();
+      f_service_ <<
+        indent() << "_throw->throw_it();" << endl <<
+        indent() << "return cob(false);" << endl;  // Is this possible?  TBD.
+      indent_down();
+      f_service_ <<
+        indent() << '}';
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        f_service_ << "  catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
+        indent_up();
+        f_service_ <<
+          indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
+          indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
+        scope_down(f_service_);
+      }
+
+      // Serialize the result into a struct
+      f_service_ <<
+        endl <<
+        indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+        indent() << "result.write(oprot);" << endl <<
+        indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl <<
+        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "return cob(true);" << endl;
+
+      scope_down(f_service_);
+      f_service_ << endl;
+    } // for each function
+  } // cob style
 }
 
 /**
@@ -2261,7 +2670,7 @@ void t_cpp_generator::generate_service_s
   vector<t_function*>::iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     f_skeleton <<
-      indent() << function_signature(*f_iter) << " {" << endl <<
+      indent() << function_signature(*f_iter, "") << " {" << endl <<
       indent() << "  // Your implementation goes here" << endl <<
       indent() << "  printf(\"" << (*f_iter)->get_name() << "\\n\");" << endl <<
       indent() << "}" << endl <<
@@ -2924,21 +3333,46 @@ string t_cpp_generator::declare_field(t_
  * @return String of rendered function definition
  */
 string t_cpp_generator::function_signature(t_function* tfunction,
+                                           string style,
                                            string prefix,
                                            bool name_params) {
   t_type* ttype = tfunction->get_returntype();
   t_struct* arglist = tfunction->get_arglist();
+  bool has_xceptions = !tfunction->get_xceptions()->get_members().empty();
+
+  // "style" selects which flavor of signature is rendered:
+  //   ""      - plain signature: complex return types become a leading
+  //             "_return" reference parameter on a void function, all other
+  //             types are returned by value.
+  //   "CobCl" - void function whose first parameter is a callback taking a
+  //             <service_name_>CobClient pointer.
+  //   "CobSv" - void function whose first parameter is a callback taking the
+  //             return value (or nothing for void), followed by an "exn_cob"
+  //             parameter when the function declares exceptions.
+  // Any other value makes this function throw the string "UNKNOWN STYLE".
+  if (style == "") {
+    if (is_complex_type(ttype)) {
+      return
+        "void " + prefix + tfunction->get_name() +
+        "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") +
+        argument_list(arglist, name_params, true) + ")";
+    } else {
+      return
+        type_name(ttype) + " " + prefix + tfunction->get_name() +
+        "(" + argument_list(arglist, name_params) + ")";
+    }
+  } else if (style.substr(0,3) == "Cob") {
+    // cob_type is the parameter list of the completion callback; exn_cob is
+    // either empty or the text of the extra exception-callback parameter.
+    string cob_type;
+    string exn_cob;
+    if (style == "CobCl") {
+      cob_type = "(" + service_name_ + "CobClient* client)";
+    } else if (style =="CobSv") {
+      cob_type = (ttype->is_void()
+                  ? "()"
+                  : ("(" + type_name(ttype) + " const& _return)"));
+      if (has_xceptions) {
+        exn_cob = ", std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob";
+      }
+    } else {
+      throw "UNKNOWN STYLE";
+    }
 
-  if (is_complex_type(ttype)) {
-    bool empty = arglist->get_members().size() == 0;
     return
       "void " + prefix + tfunction->get_name() +
-      "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") +
-      (empty ? "" : (", " + argument_list(arglist, name_params))) + ")";
+      "(std::tr1::function<void" + cob_type + "> cob" + exn_cob +
+      argument_list(arglist, name_params, true) + ")";
   } else {
-    return
-      type_name(ttype) + " " + prefix + tfunction->get_name() +
-      "(" + argument_list(arglist, name_params) + ")";
+    throw "UNKNOWN STYLE";
   }
 }
 
@@ -2948,12 +3382,12 @@ string t_cpp_generator::function_signatu
  * @param tstruct The struct definition
  * @return Comma sepearated list of all field names in that struct
  */
-string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params) {
+string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params, bool start_comma) {
   string result = "";
 
   const vector<t_field*>& fields = tstruct->get_members();
   vector<t_field*>::const_iterator f_iter;
-  bool first = true;
+  bool first = !start_comma;
   for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
     if (first) {
       first = false;

Modified: incubator/thrift/trunk/configure.ac
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/configure.ac?rev=1005127&r1=1005126&r2=1005127&view=diff
==============================================================================
--- incubator/thrift/trunk/configure.ac (original)
+++ incubator/thrift/trunk/configure.ac Wed Oct  6 17:09:37 2010
@@ -311,6 +311,14 @@ if test "$cross_compiling" = "no" ; then
   AX_SIGNED_RIGHT_SHIFT
 fi
 
+dnl autoscan thinks we need this macro because we have a member function
+dnl called "error".  Invoke the macro but don't run the check so autoscan
+dnl thinks we are in the clear.  It's highly unlikely that we will ever
+dnl actually use the function that this checks for.
+if false ; then
+  AC_FUNC_ERROR_AT_LINE
+fi
+
 AX_THRIFT_GEN(cpp, [C++], yes)
 AM_CONDITIONAL([THRIFT_GEN_cpp], [test "$ax_thrift_gen_cpp" = "yes"])
 AX_THRIFT_GEN(java, [Java], yes)

Added: incubator/thrift/trunk/contrib/async-test/Makefile
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/contrib/async-test/Makefile?rev=1005127&view=auto
==============================================================================
--- incubator/thrift/trunk/contrib/async-test/Makefile (added)
+++ incubator/thrift/trunk/contrib/async-test/Makefile Wed Oct  6 17:09:37 2010
@@ -0,0 +1,33 @@
+# Builds the async test programs: generates C++ and Python code from
+# aggr.thrift and links test-server against thrift, thrift-nb and libevent.
+THRIFT = thrift
+# Compiler flags only; libraries go in LDLIBS so they land on the link line.
+CXXFLAGS = `pkg-config --cflags thrift thrift-nb`
+LDLIBS = `pkg-config --libs thrift thrift-nb` -levent
+CXXFLAGS += -g -O0
+
+# make's built-in "link .o files" rule drives the linker through $(CC).
+# All objects here are C++, so use the C++ driver instead.
+LINK.o = $(LINK.cc)
+
+GENNAMES = Aggr aggr_types
+GENHEADERS = $(addsuffix .h, $(GENNAMES))
+GENSRCS = $(addsuffix .cpp, $(GENNAMES))
+GENOBJS = $(addsuffix .o, $(GENNAMES))
+
+PYLIBS = aggr/__init__.py
+
+PROGS =  test-server
+
+all: $(PYLIBS) $(PROGS)
+
+test-server: test-server.o $(GENOBJS)
+
+# test-server.cpp includes the generated headers, so generate before compiling.
+test-server.o: $(GENSRCS)
+
+aggr/__init__.py: aggr.thrift
+	# $(dir $@) is a directory, so it has to be removed recursively.
+	$(RM) -r $(dir $@)
+	$(THRIFT) --gen py:newstyle $<
+	mv gen-py/$(dir $@) .
+
+$(GENSRCS): aggr.thrift
+	$(THRIFT) --gen cpp:cob_style $<
+	mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) .
+
+clean:
+	$(RM) -r *.o $(PROGS) $(GENSRCS) $(GENHEADERS) gen-* aggr
+
+.PHONY: all clean

Added: incubator/thrift/trunk/contrib/async-test/aggr.thrift
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/contrib/async-test/aggr.thrift?rev=1005127&view=auto
==============================================================================
--- incubator/thrift/trunk/contrib/async-test/aggr.thrift (added)
+++ incubator/thrift/trunk/contrib/async-test/aggr.thrift Wed Oct  6 17:09:37 2010
@@ -0,0 +1,8 @@
+// Exception that getValues is declared to throw; carries one string field.
+exception Error {
+  1: string desc;
+}
+
+// Service definition from which the Makefile in this directory generates the
+// C++ (cob_style) and Python code.
+service Aggr {
+  // Takes a single i32 and returns nothing.
+  void addValue(1: i32 value);
+  // Takes no arguments and returns a list of i32; may throw Error.
+  list<i32> getValues() throws (1: Error err);
+}

Added: incubator/thrift/trunk/contrib/async-test/test-leaf.py
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/contrib/async-test/test-leaf.py?rev=1005127&view=auto
==============================================================================
--- incubator/thrift/trunk/contrib/async-test/test-leaf.py (added)
+++ incubator/thrift/trunk/contrib/async-test/test-leaf.py Wed Oct  6 17:09:37 2010
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+import sys
+import time
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.server import THttpServer
+from aggr import Aggr
+
+class AggrHandler(Aggr.Iface):
+  """Aggr handler that keeps every value it is given in an in-memory list."""
+  def __init__(self):
+    # Values received through addValue, in arrival order.
+    self.values = []
+
+  def addValue(self, value):
+    """Append value to the stored list."""
+    self.values.append(value)
+
+  def getValues(self, ):
+    """Sleep for one second, then return the stored list (the list itself, not a copy)."""
+    time.sleep(1)
+    return self.values
+
+# Wire an AggrHandler into a THttpServer using the binary protocol and serve.
+# The port is taken from the first command-line argument; the host part of
+# the address tuple is the empty string.
+processor = Aggr.Processor(AggrHandler())
+pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+THttpServer.THttpServer(processor, ('', int(sys.argv[1])), pfactory).serve()

Propchange: incubator/thrift/trunk/contrib/async-test/test-leaf.py
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/thrift/trunk/contrib/async-test/test-server.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/contrib/async-test/test-server.cpp?rev=1005127&view=auto
==============================================================================
--- incubator/thrift/trunk/contrib/async-test/test-server.cpp (added)
+++ incubator/thrift/trunk/contrib/async-test/test-server.cpp Wed Oct  6 17:09:37 2010
@@ -0,0 +1,97 @@
+#include <tr1/functional>
+#include "protocol/TBinaryProtocol.h"
+#include "async/TAsyncProtocolProcessor.h"
+#include "async/TEvhttpServer.h"
+#include "async/TEvhttpClientChannel.h"
+#include "Aggr.h"
+
+using std::tr1::bind;
+using std::tr1::placeholders::_1;
+
+using apache::thrift::TException;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::async::TEvhttpServer;
+using apache::thrift::async::TAsyncProcessor;
+using apache::thrift::async::TAsyncBufferProcessor;
+using apache::thrift::async::TAsyncProtocolProcessor;
+using apache::thrift::async::TAsyncChannel;
+using apache::thrift::async::TEvhttpClientChannel;
+
+/**
+ * Callback-style Aggr handler that answers getValues by querying one
+ * AggrCobClient per configured leaf port and concatenating the results.
+ */
+class AggrAsyncHandler : public AggrCobSvIf {
+ protected:
+  /** State shared by all leaf calls that belong to one getValues request. */
+  struct RequestContext {
+    // Callback to invoke with the combined result once every leaf has answered.
+    std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
+    // Values accumulated from the leaves so far.
+    std::vector<int32_t> ret;
+    // Number of leaf calls that have not yet reached clientReturn.
+    int pending_calls;
+  };
+
+ public:
+  /**
+   * The event base starts out NULL and must be supplied via setEventBase().
+   * The leaf ports are fixed at 8081 and 8082.
+   */
+  AggrAsyncHandler()
+    : eb_(NULL)
+    , pfact_(new TBinaryProtocolFactory())
+  {
+    leaf_ports_.push_back(8081);
+    leaf_ports_.push_back(8082);
+  }
+
+  /** Ignores value and invokes cob immediately. */
+  void addValue(std::tr1::function<void()> cob, const int32_t value) {
+    // Silently drop writes to the aggregator.
+    return cob();
+  }
+
+  /**
+   * Start one getValues call per leaf port; clientReturn() is bound as the
+   * completion callback of each call and receives the shared RequestContext.
+   *
+   * NOTE(review): exn_cob is never used here. Also, if leaf_ports_ were
+   * empty, cob would never be invoked and ctx would never be freed; the
+   * constructor always adds two ports, so this does not happen as written.
+   */
+  void getValues(std::tr1::function<void(
+        std::vector<int32_t> const& _return)> cob,
+      std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
+    // Freed in clientReturn() when the last pending call completes.
+    RequestContext* ctx = new RequestContext();
+    ctx->cob = cob;
+    ctx->pending_calls = leaf_ports_.size();
+    for (std::vector<int>::iterator it = leaf_ports_.begin();
+        it != leaf_ports_.end(); ++it) {
+      // Presumably (host, path, address, port, event base) -- confirm against
+      // the TEvhttpClientChannel constructor.
+      boost::shared_ptr<TAsyncChannel> channel(
+          new TEvhttpClientChannel(
+            "localhost", "/", "127.0.0.1", *it, eb_));
+      // The client is deleted in clientReturn().
+      AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
+      client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
+    }
+  }
+
+  /** Store the event base that is handed to every client channel. */
+  void setEventBase(struct event_base* eb) {
+    eb_ = eb;
+  }
+
+  /**
+   * Completion callback for one leaf call: appends that leaf's values to the
+   * context, deletes the client, and, when no calls remain pending, invokes
+   * the request's cob with the combined list and deletes the context.
+   */
+  void clientReturn(RequestContext* ctx, AggrCobClient* client) {
+    ctx->pending_calls -= 1;
+
+    try {
+      std::vector<int32_t> subret;
+      client->recv_getValues(subret);
+      ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
+    } catch (TException& exn) {
+      // A failing leaf contributes no values; the error is otherwise ignored.
+      // TODO: Log error
+    }
+
+    delete client;
+
+    if (ctx->pending_calls == 0) {
+      ctx->cob(ctx->ret);
+      delete ctx;
+    }
+  }
+
+ protected:
+  struct event_base* eb_;                      // NULL until setEventBase() is called
+  std::vector<int> leaf_ports_;                // ports queried by getValues()
+  boost::shared_ptr<TProtocolFactory> pfact_;  // passed to every AggrCobClient
+};
+
+
+/**
+ * Build the processing chain handler -> AggrAsyncProcessor ->
+ * TAsyncProtocolProcessor (binary protocol) -> TEvhttpServer on port 8080,
+ * give the handler the server's event base, and serve.
+ */
+int main() {
+  boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
+  boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
+  boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
+  boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
+  boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
+  // The handler's outgoing client channels use the server's event base.
+  handler->setEventBase(server->getEventBase());
+  server->serve();
+}

Modified: incubator/thrift/trunk/lib/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/Makefile.am?rev=1005127&r1=1005126&r2=1005127&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/Makefile.am (original)
+++ incubator/thrift/trunk/lib/cpp/Makefile.am Wed Oct  6 17:09:37 2010
@@ -70,9 +70,11 @@ libthrift_la_SOURCES = src/Thrift.cpp \
                        src/server/TSimpleServer.cpp \
                        src/server/TThreadPoolServer.cpp \
                        src/server/TThreadedServer.cpp \
+											 src/async/TAsyncChannel.cpp \
                        src/processor/PeekProcessor.cpp
 
-libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp
+libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \
+                         src/async/TAsyncProtocolProcessor.cpp
 
 libthriftz_la_SOURCES = src/transport/TZlibTransport.cpp
 
@@ -148,6 +150,17 @@ include_processor_HEADERS = \
                          src/processor/PeekProcessor.h \
                          src/processor/StatsProcessor.h
 
+include_asyncdir = $(include_thriftdir)/async
+include_async_HEADERS = \
+                     src/async/TAsyncChannel.h \
+                     src/async/TAsyncProcessor.h \
+                     src/async/TAsyncBufferProcessor.h \
+                     src/async/TAsyncProtocolProcessor.h \
+                     src/async/TEvhttpClientChannel.h \
+                     src/async/TEvhttpServer.h \
+                     src/async/SimpleCallback.h
+
+
 noinst_PROGRAMS = concurrency_test
 
 concurrency_test_SOURCES = src/concurrency/test/Tests.cpp \

Modified: incubator/thrift/trunk/lib/cpp/src/TProcessor.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/TProcessor.h?rev=1005127&r1=1005126&r2=1005127&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/TProcessor.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/TProcessor.h Wed Oct  6 17:09:37 2010
@@ -119,4 +119,4 @@ class TProcessor {
 
 }} // apache::thrift
 
-#endif // #ifndef _THRIFT_PROCESSOR_H_
+#endif // #ifndef _THRIFT_TPROCESSOR_H_

Modified: incubator/thrift/trunk/lib/cpp/src/Thrift.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/Thrift.h?rev=1005127&r1=1005126&r2=1005127&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/Thrift.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/Thrift.h Wed Oct  6 17:09:37 2010
@@ -110,6 +110,29 @@ namespace reflection { namespace local {
 struct TypeSpec;
 }}
 
+/**
+ * Type-erased holder for an exception that is to be thrown later.
+ * Instances are created with delayException() and consumed with throw_it().
+ */
+class TDelayedException {
+ public:
+  /** Wrap a copy of e in a heap-allocated TExceptionWrapper<E>. */
+  template <class E> static TDelayedException* delayException(const E& e);
+  /**
+   * Throw the held exception. The TExceptionWrapper implementation deletes
+   * the holder before throwing.
+   */
+  virtual void throw_it() = 0;
+  virtual ~TDelayedException() {};
+};
+
+/**
+ * TDelayedException implementation that stores a copy of an exception of
+ * type E.
+ */
+template <class E> class TExceptionWrapper : public TDelayedException {
+ public:
+  /** Keep a copy of e. */
+  TExceptionWrapper(const E& e) : e_(e) {}
+  /**
+   * Throw a copy of the stored exception. The wrapper deletes itself first,
+   * so it must have been allocated with new and must not be used again after
+   * this call.
+   */
+  virtual void throw_it() {
+    // Copy the exception out before "delete this" destroys e_.
+    E temp(e_);
+    delete this;
+    throw temp;
+  }
+ private:
+  E e_;
+};
+
+/**
+ * Allocate a TExceptionWrapper<E> holding a copy of e and return it as a raw
+ * pointer; TExceptionWrapper::throw_it() deletes the wrapper.
+ */
+template <class E>
+TDelayedException* TDelayedException::delayException(const E& e) {
+  return new TExceptionWrapper<E>(e);
+}
 
 }} // apache::thrift
 

Added: incubator/thrift/trunk/lib/cpp/src/async/SimpleCallback.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/async/SimpleCallback.h?rev=1005127&view=auto
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/async/SimpleCallback.h (added)
+++ incubator/thrift/trunk/lib/cpp/src/async/SimpleCallback.h Wed Oct  6 17:09:37 2010
@@ -0,0 +1,98 @@
+#ifndef _THRIFT_ASYNC_SIMPLECALLBACK_H_
+#define _THRIFT_ASYNC_SIMPLECALLBACK_H_ 1
+
+#include <Thrift.h>
+namespace apache { namespace thrift {
+
+/**
+ * A template class for forming simple method callbacks with either an empty
+ * argument list or one argument of known type.
+ *
+ * For more efficiency where tr1::function is overkill.
+ */
+
+template<typename C,              ///< class whose method we wish to wrap
+         typename A = void,       ///< type of argument
+         typename R = void>       ///< type of return value
+class SimpleCallback {
+  typedef R (C::*cfptr_t)(A);     ///< pointer-to-member-function type
+  cfptr_t fptr_;                  ///< the embedded function pointer
+  C* obj_;                        ///< object whose function we're wrapping
+ public:
+  /**
+   * Constructor for empty callback object.
+   */
+  SimpleCallback() :
+    fptr_(NULL), obj_(NULL) {}
+  /**
+   * Construct callback wrapper for member function.
+   *
+   * @param fptr pointer-to-member-function
+   * @param obj "this" for object associated with callback; constness is
+   *            cast away because the wrapped member function is non-const
+   */
+  SimpleCallback(cfptr_t fptr, const C* obj) :
+    fptr_(fptr), obj_(const_cast<C*>(obj))
+  {}
+
+  /**
+   * Make a call to the member function we've wrapped.  No null check is
+   * made, so this must not be invoked on an empty callback.
+   *
+   * @param i argument for the wrapped member function
+   * @return value from that function
+   */
+  R operator()(A i) const {
+    // Forward the result; returning a void expression is valid when R is void.
+    return (obj_->*fptr_)(i);
+  }
+
+  /**
+   * @return true when both the object and the member function are set
+   */
+  operator bool() const {
+    return obj_ != NULL && fptr_ != NULL;
+  }
+
+  ~SimpleCallback() {}
+};
+
+/**
+ * Specialization of SimpleCallback for empty argument list.
+ */
+template<typename C,              ///< class whose method we wish to wrap
+         typename R>              ///< type of return value
+class SimpleCallback<C, void, R> {
+  typedef R (C::*cfptr_t)();      ///< pointer-to-member-function type
+  cfptr_t fptr_;                  ///< the embedded function pointer
+  C* obj_;                        ///< object whose function we're wrapping
+ public:
+  /**
+   * Constructor for empty callback object.
+   */
+  SimpleCallback() :
+    fptr_(NULL), obj_(NULL) {}
+
+  /**
+   * Construct callback wrapper for member function.
+   *
+   * @param fptr pointer-to-member-function
+   * @param obj "this" for object associated with callback; constness is
+   *            cast away because the wrapped member function is non-const
+   */
+  SimpleCallback(cfptr_t fptr, const C* obj) :
+    fptr_(fptr), obj_(const_cast<C*>(obj))
+  {}
+
+  /**
+   * Make a call to the member function we've wrapped.  No null check is
+   * made, so this must not be invoked on an empty callback.
+   *
+   * @return value from that function
+   */
+  R operator()() const {
+    // Forward the result; returning a void expression is valid when R is void.
+    return (obj_->*fptr_)();
+  }
+
+  /**
+   * @return true when both the object and the member function are set
+   */
+  operator bool() const {
+    return obj_ != NULL && fptr_ != NULL;
+  }
+
+  ~SimpleCallback() {}
+};
+
+}} // apache::thrift
+
+#endif /* !_THRIFT_ASYNC_SIMPLECALLBACK_H_ */

Added: incubator/thrift/trunk/lib/cpp/src/async/TAsyncBufferProcessor.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/async/TAsyncBufferProcessor.h?rev=1005127&view=auto
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/async/TAsyncBufferProcessor.h (added)
+++ incubator/thrift/trunk/lib/cpp/src/async/TAsyncBufferProcessor.h Wed Oct  6 17:09:37 2010
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
+#define _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+
+#include "transport/TBufferTransports.h"
+
+namespace apache { namespace thrift { namespace async {
+
+/**
+ * Interface for processors that read a request from one buffer, write the
+ * response to another, and report completion through a callback.
+ */
+class TAsyncBufferProcessor {
+ public:
+  // Process data in "ibuf", putting the result in "obuf".
+  // Call _return(true) when done, or _return(false) to
+  // forcefully close the connection (if applicable).
+  // "ibuf" and "obuf" should be TMemoryBuffer or similar,
+  // not a wrapper around a socket.
+  virtual void process(
+      std::tr1::function<void(bool healthy)> _return,
+      boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+      boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf) = 0;
+
+  // Virtual so that an implementation deleted through a pointer to this
+  // interface is destroyed correctly.
+  virtual ~TAsyncBufferProcessor() {}
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_