You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@thrift.apache.org by Erik Bernhardsson <er...@spotify.com> on 2009/07/09 14:41:21 UTC

Asynchronous C++ client/server (THRIFT-1)

Hi everyone,

There's been some discussions about asynchronous C++ clients and
servers (THRIFT-1). Inspired by the Twisted server (THRIFT-148) by
Esteve Fernandez, as well as his C++ ideas (THRIFT-311), we (Erik
Bernhardsson, Mattias de Zalenski) decided to implement our own
version here at Spotify.

Based on TFramedTransport and Boost.ASIO, this version is event-driven
and runs fully asynchronously in a single thread. There is no overhead
in terms of additional threads. This code is not thread-safe yet, in
the sense that multiple threads cannot simultaneously invoke calls on
the same client, though we're definitely interested in discussing how
to implement this.

We have implemented an additional C++ stub generator (invoked by --gen
cpp:async) which adds two extra arguments to all client and server
methods. These are a callback and an errback, which are both Boost
closures. On the client side, these are passed when making a call, and
will be invoked when the call returns. On the server side, the server
is free to invoke these at any point in the future in order to respond
to the client. Calls and responses can be sent in any order. See code
example below.

The stub generator is independent of the ASIO code and can easily be
plugged into other reactor loops/frameworks. It shares most of the
code with the standard cpp generator (and does generate the
synchronous stubs as well). We have attached a diff between them.

This is still very much an early version, so feel welcome to submit
your comments. There are a few design decisions we still aren't 100%
sure of. Additionally, we haven't worked out how to handle low-level
failures such as disconnects. We are also working on a stress test and
some unit tests.

We have compiled everything by adding the stub generator to
compiler/cpp/src/ and the client/server code to lib/cpp/src/async/. In
order for this to work, the corresponding Makefiles have to be
modified and everything has to be compiled with -lboost_system.

Code example for the server follows below. We have modified the add
method so that it sleeps for num1+num2 seconds before returning, so
that there is an easy way of generating responses in a different order
than requests. We only include the most relevant parts below, but have
attached the full source code. Client code follows after the server
code.

class CalculatorAsyncHandler : public CalculatorAsyncIf {
 public:
 CalculatorAsyncHandler() {}

 virtual void ping(boost::function<void (void)> callback,
boost::function<void (Calculator_ping_result)> errback) {
   printf("ping()\n");
   callback();
 }

 virtual void add(const int32_t num1, const int32_t num2,
boost::function<void (int32_t)> callback, boost::function<void
(Calculator_add_result)> errback) {
   printf("add(%d,%d)\n", num1, num2);
   boost::shared_ptr<boost::asio::deadline_timer> timer(new
boost::asio::deadline_timer(io_service,
boost::posix_time::seconds(num1 + num2)));
   timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done,
this, num1 + num2, callback, timer));
 }
 virtual void wait_done(const int32_t sum, boost::function<void
(int32_t)> callback, boost::shared_ptr<boost::asio::deadline_timer>) {
   callback(sum);
   // timer will fall out of scope now and will be deleted
 }

 virtual void calculate(const int32_t logid, const Work& w,
boost::function<void (int32_t)> callback, boost::function<void
(Calculator_calculate_result)> errback) {
(...)
   case DIVIDE:
     if (w.num2 == 0) {
       InvalidOperation io;
       io.what = w.op;
       io.why = "Cannot divide by 0";
       errback(calculate_ouch(io));
       return;
     }
     val = w.num1 / w.num2;
     break;
   default:
     errback(calculate_failure(std::string("Invalid Operation")));
     return;
   }
 (...)
}
(... other methods, omitted for brevity)

};

int main(int argc, char **argv) {
 boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
protocol::TBinaryProtocolFactory());
 boost::shared_ptr<CalculatorAsyncHandler> handler(new
CalculatorAsyncHandler());
 boost::shared_ptr<TProcessor> processor(new
CalculatorAsyncProcessor(handler));

 boost::shared_ptr<apache::thrift::async::TAsioServer> server(
                                                              new
apache::thrift::async::TAsioServer(

                              io_service,

                              9090,

                              protocolFactory,

                              protocolFactory,

                              processor));

 server->start(); // Nonblocking
 io_service.run(); // Blocking

 return 0;
}

Code for the client:

void pingback() {
 printf("ping()\n");
}

void pingerr(tutorial::Calculator_ping_result result) {
 printf("Exception caught\n");
}

void addback(int32_t a, int32_t b, int32_t sum) {
 printf("%d+%d=%d\n", a, b, sum);
}

void adderr(tutorial::Calculator_add_result result) {
 printf("Exception caught\n");
}

void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
 client->ping(pingback, pingerr);

 client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr);  //
will return after 5s
 client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr);  //
will return after 3s
 client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr);  //
will return after 2s
}

int main(int argc, char* argv[])
{
 try
 {
   boost::asio::io_service io_service;

   boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
protocol::TBinaryProtocolFactory());

   boost::shared_ptr<async::TAsioClient> client (
                                                 new async::TAsioClient(

 io_service,

 protocolFactory,

 protocolFactory));

   client->connect("localhost", 9090, connected); // the type of the
client (tutorial::CalculatorAsyncClient) is inferred from the
signature of connected

   io_service.run();
 }
 catch (std::exception& e)
 {
   std::cout << "Exception: " << e.what() << "\n";
 }

 return 0;
}

Regards,
Erik Bernhardsson,
Mattias de Zalenski
Spotify - http://www.spotify.com/

Re: Asynchronous C++ client/server (THRIFT-1)

Posted by Erik Bernhardsson <er...@spotify.com>.
I'm trying to attach the files yet again, renaming the files to .txt.
Sorry about the confusion.

On Thu, Jul 9, 2009 at 3:43 PM, Erik Bernhardsson<er...@spotify.com> wrote:
> It seems like most of the attachments were somehow lost, so I'll try
> sending them again. There should  be a CppClient.cpp, CppServer.cpp,
> TAsync.h, TAsioAsync.h and TAsioAsync.cpp
>
> On Thu, Jul 9, 2009 at 2:41 PM, Erik Bernhardsson<er...@spotify.com> wrote:
>> Hi everyone,
>>
>> There's been some discussions about asynchronous C++ clients and
>> servers (THRIFT-1). Inspired by the Twisted server (THRIFT-148) by
>> Esteve Fernandez, as well as his C++ ideas (THRIFT-311), we (Erik
>> Bernhardsson, Mattias de Zalenski) decided to implement our own
>> version here at Spotify.
>>
>> Based on TFramedTransport and Boost.ASIO, this version is event-driven
>> and runs fully asynchronously in a single thread. There is no overhead
>> in terms of additional threads. This code is not thread-safe yet, in
>> the sense that multiple threads cannot simultaneously invoke calls on
>> the same client, though we're definitely interested in discussing how
>> to implement this.
>>
>> We have implemented an additional C++ stub generator (invoked by --gen
>> cpp:async) which adds two extra arguments to all client and server
>> methods. These are a callback and an errback, which are both Boost
>> closures. On the client side, these are passed when making a call, and
>> will be invoked when the call returns. On the server side, the server
>> is free to invoke these at any point in the future in order to respond
>> to the client. Calls and responses can be sent in any order. See code
>> example below.
>>
>> The stub generator is independent of the ASIO code and can easily be
>> plugged into other reactor loops/frameworks. It shares most of the
>> code with the standard cpp generator (and does generate the
>> synchronous stubs as well). We have attached a diff between them.
>>
>> This is still very much an early version, so feel welcome to submit
>> your comments. There are a few design decisions we still aren't 100%
>> sure of. Additionally, we haven't worked out how to handle low-level
>> failures such as disconnects. We are also working on a stress test and
>> some unit tests.
>>
>> We have compiled everything by adding the stub generator to
>> compiler/cpp/src/ and the client/server code to lib/cpp/src/async/. In
>> order for this to work, the corresponding Makefiles have to be
>> modified and everything has to be compiled with -lboost_system.
>>
>> Code example for the server follows below. We have modified the add
>> method so that it sleeps for num1+num2 seconds before returning, so
>> that there is an easy way of generating responses in a different order
>> than requests. We only include the most relevant parts below, but have
>> attached the full source code. Client code follows after the server
>> code.
>>
>> class CalculatorAsyncHandler : public CalculatorAsyncIf {
>>  public:
>>  CalculatorAsyncHandler() {}
>>
>>  virtual void ping(boost::function<void (void)> callback,
>> boost::function<void (Calculator_ping_result)> errback) {
>>    printf("ping()\n");
>>    callback();
>>  }
>>
>>  virtual void add(const int32_t num1, const int32_t num2,
>> boost::function<void (int32_t)> callback, boost::function<void
>> (Calculator_add_result)> errback) {
>>    printf("add(%d,%d)\n", num1, num2);
>>    boost::shared_ptr<boost::asio::deadline_timer> timer(new
>> boost::asio::deadline_timer(io_service,
>> boost::posix_time::seconds(num1 + num2)));
>>    timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done,
>> this, num1 + num2, callback, timer));
>>  }
>>  virtual void wait_done(const int32_t sum, boost::function<void
>> (int32_t)> callback, boost::shared_ptr<boost::asio::deadline_timer>) {
>>    callback(sum);
>>    // timer will fall out of scope now and will be deleted
>>  }
>>
>>  virtual void calculate(const int32_t logid, const Work& w,
>> boost::function<void (int32_t)> callback, boost::function<void
>> (Calculator_calculate_result)> errback) {
>> (...)
>>    case DIVIDE:
>>      if (w.num2 == 0) {
>>        InvalidOperation io;
>>        io.what = w.op;
>>        io.why = "Cannot divide by 0";
>>        errback(calculate_ouch(io));
>>        return;
>>      }
>>      val = w.num1 / w.num2;
>>      break;
>>    default:
>>      errback(calculate_failure(std::string("Invalid Operation")));
>>      return;
>>    }
>>  (...)
>> }
>> (... other methods, omitted for brevity)
>>
>> };
>>
>> int main(int argc, char **argv) {
>>  boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
>> protocol::TBinaryProtocolFactory());
>>  boost::shared_ptr<CalculatorAsyncHandler> handler(new
>> CalculatorAsyncHandler());
>>  boost::shared_ptr<TProcessor> processor(new
>> CalculatorAsyncProcessor(handler));
>>
>>  boost::shared_ptr<apache::thrift::async::TAsioServer> server(
>>                                                               new
>> apache::thrift::async::TAsioServer(
>>
>>                               io_service,
>>
>>                               9090,
>>
>>                               protocolFactory,
>>
>>                               protocolFactory,
>>
>>                               processor));
>>
>>  server->start(); // Nonblocking
>>  io_service.run(); // Blocking
>>
>>  return 0;
>> }
>>
>> Code for the client:
>>
>> void pingback() {
>>  printf("ping()\n");
>> }
>>
>> void pingerr(tutorial::Calculator_ping_result result) {
>>  printf("Exception caught\n");
>> }
>>
>> void addback(int32_t a, int32_t b, int32_t sum) {
>>  printf("%d+%d=%d\n", a, b, sum);
>> }
>>
>> void adderr(tutorial::Calculator_add_result result) {
>>  printf("Exception caught\n");
>> }
>>
>> void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
>>  client->ping(pingback, pingerr);
>>
>>  client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr);  //
>> will return after 5s
>>  client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr);  //
>> will return after 3s
>>  client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr);  //
>> will return after 2s
>> }
>>
>> int main(int argc, char* argv[])
>> {
>>  try
>>  {
>>    boost::asio::io_service io_service;
>>
>>    boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
>> protocol::TBinaryProtocolFactory());
>>
>>    boost::shared_ptr<async::TAsioClient> client (
>>                                                  new async::TAsioClient(
>>
>>  io_service,
>>
>>  protocolFactory,
>>
>>  protocolFactory));
>>
>>    client->connect("localhost", 9090, connected); // the type of the
>> client (tutorial::CalculatorAsyncClient) is inferred from the
>> signature of connected
>>
>>    io_service.run();
>>  }
>>  catch (std::exception& e)
>>  {
>>    std::cout << "Exception: " << e.what() << "\n";
>>  }
>>
>>  return 0;
>> }
>>
>> Regards,
>> Erik Bernhardsson,
>> Mattias de Zalenski
>> Spotify - http://www.spotify.com/
>>
>

Re: Asynchronous C++ client/server (THRIFT-1)

Posted by Erik Bernhardsson <er...@spotify.com>.
It seems like most of the attachments were somehow lost, so I'll try
sending them again. There should  be a CppClient.cpp, CppServer.cpp,
TAsync.h, TAsioAsync.h and TAsioAsync.cpp

On Thu, Jul 9, 2009 at 2:41 PM, Erik Bernhardsson<er...@spotify.com> wrote:
> Hi everyone,
>
> There's been some discussions about asynchronous C++ clients and
> servers (THRIFT-1). Inspired by the Twisted server (THRIFT-148) by
> Esteve Fernandez, as well as his C++ ideas (THRIFT-311), we (Erik
> Bernhardsson, Mattias de Zalenski) decided to implement our own
> version here at Spotify.
>
> Based on TFramedTransport and Boost.ASIO, this version is event-driven
> and runs fully asynchronously in a single thread. There is no overhead
> in terms of additional threads. This code is not thread-safe yet, in
> the sense that multiple threads cannot simultaneously invoke calls on
> the same client, though we're definitely interested in discussing how
> to implement this.
>
> We have implemented an additional C++ stub generator (invoked by --gen
> cpp:async) which adds two extra arguments to all client and server
> methods. These are a callback and an errback, which are both Boost
> closures. On the client side, these are passed when making a call, and
> will be invoked when the call returns. On the server side, the server
> is free to invoke these at any point in the future in order to respond
> to the client. Calls and responses can be sent in any order. See code
> example below.
>
> The stub generator is independent of the ASIO code and can easily be
> plugged into other reactor loops/frameworks. It shares most of the
> code with the standard cpp generator (and does generate the
> synchronous stubs as well). We have attached a diff between them.
>
> This is still very much an early version, so feel welcome to submit
> your comments. There are a few design decisions we still aren't 100%
> sure of. Additionally, we haven't worked out how to handle low-level
> failures such as disconnects. We are also working on a stress test and
> some unit tests.
>
> We have compiled everything by adding the stub generator to
> compiler/cpp/src/ and the client/server code to lib/cpp/src/async/. In
> order for this to work, the corresponding Makefiles have to be
> modified and everything has to be compiled with -lboost_system.
>
> Code example for the server follows below. We have modified the add
> method so that it sleeps for num1+num2 seconds before returning, so
> that there is an easy way of generating responses in a different order
> than requests. We only include the most relevant parts below, but have
> attached the full source code. Client code follows after the server
> code.
>
> class CalculatorAsyncHandler : public CalculatorAsyncIf {
>  public:
>  CalculatorAsyncHandler() {}
>
>  virtual void ping(boost::function<void (void)> callback,
> boost::function<void (Calculator_ping_result)> errback) {
>    printf("ping()\n");
>    callback();
>  }
>
>  virtual void add(const int32_t num1, const int32_t num2,
> boost::function<void (int32_t)> callback, boost::function<void
> (Calculator_add_result)> errback) {
>    printf("add(%d,%d)\n", num1, num2);
>    boost::shared_ptr<boost::asio::deadline_timer> timer(new
> boost::asio::deadline_timer(io_service,
> boost::posix_time::seconds(num1 + num2)));
>    timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done,
> this, num1 + num2, callback, timer));
>  }
>  virtual void wait_done(const int32_t sum, boost::function<void
> (int32_t)> callback, boost::shared_ptr<boost::asio::deadline_timer>) {
>    callback(sum);
>    // timer will fall out of scope now and will be deleted
>  }
>
>  virtual void calculate(const int32_t logid, const Work& w,
> boost::function<void (int32_t)> callback, boost::function<void
> (Calculator_calculate_result)> errback) {
> (...)
>    case DIVIDE:
>      if (w.num2 == 0) {
>        InvalidOperation io;
>        io.what = w.op;
>        io.why = "Cannot divide by 0";
>        errback(calculate_ouch(io));
>        return;
>      }
>      val = w.num1 / w.num2;
>      break;
>    default:
>      errback(calculate_failure(std::string("Invalid Operation")));
>      return;
>    }
>  (...)
> }
> (... other methods, omitted for brevity)
>
> };
>
> int main(int argc, char **argv) {
>  boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
> protocol::TBinaryProtocolFactory());
>  boost::shared_ptr<CalculatorAsyncHandler> handler(new
> CalculatorAsyncHandler());
>  boost::shared_ptr<TProcessor> processor(new
> CalculatorAsyncProcessor(handler));
>
>  boost::shared_ptr<apache::thrift::async::TAsioServer> server(
>                                                               new
> apache::thrift::async::TAsioServer(
>
>                               io_service,
>
>                               9090,
>
>                               protocolFactory,
>
>                               protocolFactory,
>
>                               processor));
>
>  server->start(); // Nonblocking
>  io_service.run(); // Blocking
>
>  return 0;
> }
>
> Code for the client:
>
> void pingback() {
>  printf("ping()\n");
> }
>
> void pingerr(tutorial::Calculator_ping_result result) {
>  printf("Exception caught\n");
> }
>
> void addback(int32_t a, int32_t b, int32_t sum) {
>  printf("%d+%d=%d\n", a, b, sum);
> }
>
> void adderr(tutorial::Calculator_add_result result) {
>  printf("Exception caught\n");
> }
>
> void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
>  client->ping(pingback, pingerr);
>
>  client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr);  //
> will return after 5s
>  client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr);  //
> will return after 3s
>  client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr);  //
> will return after 2s
> }
>
> int main(int argc, char* argv[])
> {
>  try
>  {
>    boost::asio::io_service io_service;
>
>    boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
> protocol::TBinaryProtocolFactory());
>
>    boost::shared_ptr<async::TAsioClient> client (
>                                                  new async::TAsioClient(
>
>  io_service,
>
>  protocolFactory,
>
>  protocolFactory));
>
>    client->connect("localhost", 9090, connected); // the type of the
> client (tutorial::CalculatorAsyncClient) is inferred from the
> signature of connected
>
>    io_service.run();
>  }
>  catch (std::exception& e)
>  {
>    std::cout << "Exception: " << e.what() << "\n";
>  }
>
>  return 0;
> }
>
> Regards,
> Erik Bernhardsson,
> Mattias de Zalenski
> Spotify - http://www.spotify.com/
>

Re: Asynchronous C++ client/server (THRIFT-1)

Posted by Esteve Fernandez <es...@sindominio.net>.
Hi Erik

> There's been some discussions about asynchronous C++ clients and
> servers (THRIFT-1). Inspired by the Twisted server (THRIFT-148) by
> Esteve Fernandez, as well as his C++ ideas (THRIFT-311), we (Erik
> Bernhardsson, Mattias de Zalenski) decided to implement our own
> version here at Spotify.

Glad to know that you found the support for Twisted useful, at least for
inspiring you to build an asynchronous client and server :-)

Some comments on your implementation:

(1) it changes services' interface (adding a new argument to the generated code)
(2) it forces the developer to call the callback function (the one
passed as an argument), instead of being automatically called when the
operation finishes
(3) it follows the Thrift interface too closely (TAsyncOutputTransport,
TAsioClient, TAsioServer, etc.). IMHO, it feels more natural to treat
Thrift as any other protocol and plug it into ASIO, than the other way
around. For example, there's no implementation of the TServer
interface in Twisted, because that would feel too unnatural to
developers familiar with the latter.

1 & 2 can be solved using futures. For example, instead of returning the
actual value from a client-side call or forcing to pass a callback, you return
a future (templatized to the return type) to which you would attach a callback
function:

void gotResults(int value) {
    std::cout << "Got results: " << value << std::endl;
}

future<int> f = calculatorClient.add(1, 2);
f.add_callback(boost::bind(&gotResults, _1));

when the client gets the value for Calculator#add, the gotResults function
will be called. This way you don't need to change the service interface, but
you can add callbacks to an asynchronous operation.

As for 3, I had to implement my own server in THRIFT-311 because I wanted to
run io_service.run() in a separate thread and use a threadpool, but that
should be up to the developer. For example, in the asynchronous TCP daytime
server from the ASIO distribution
(http://think-async.com/Asio/asio-1.3.1/doc/asio/tutorial/tutdaytime3/src.html),
you only have to implement an acceptor that takes an io_service.

Cheers.