You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Gordon Sim <gs...@redhat.com> on 2009/07/31 19:10:47 UTC

c++ messaging api: a map message example

Attached is a patch for a higher level, protocol independent messaging 
api from c++ as last discussed some months ago. I have not had much time 
to spend on this since then and so there is not a huge amount of change 
(mainly moving to a better namespace and adjusting for the revised 
include directory layout).

There is however now an example of a sending a map message using the 
amqp0-10 encoding (list messages are also supported) and the message 
headers are now correctly sent. Jonathan previously requested support 
for the stream operator and I've had a tentative stab at exploring what 
that might look like as well.

As always keen to get any feedback. I'm hoping to have more time to 
spend on this for the next few weeks. I'm aiming to align the c++ api 
with the similar work on python (at least conceptually) and to flesh out 
the functionality (determining how best to expose asynchronous 
completion of sends and acknowledgements will be a likely next step).

--Gordon.

Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Cliff Jansen (Interop Systems Inc) wrote:
> Hi Gordon,

Hi Cliff,

Thanks for the feedback, much appreciated!

>> Jonathan previously requested support for the stream operator and I've
>> had a tentative stab at exploring what that might look like as well.
> 
> In addition, I would like to see the some stream-ish functions that
> are friendly to char* blobs that are not null terminated.  Something
> along the lines of:
> 
>   MessageContent& MessageContent::write (const char* s, int n);
>   MessageContent& MessageContent::read (const char* s, int n);  // + rewind() or seek
> 
> As well as something like
> 
>   int MessageContent::size(); // (or getSize(), getLength()...)
> 
>   Message::Message(const char* s, int n);
> 
> Note that std::string is not copy-on-write in all C++ implementations.
> On Windows, forcing an application to use strings for its raw data (from
> a char* perspective) results in needless copies of the content:

Good point, I'll try and incorporate this into the next patch.

>   string tmpdata(myBlobPtr, myBlobSize);   // memory copy #1
>   Message m(tmpdata); // m.impl->bytes gets copy #2 when constructed
>   // tmpdata never used again, freed when out of scope
> 
> The programmer can take evasive action:
> 
>   Message m;  // m.impl->bytes starts out as a sacrificial empty string
>   m.getBytes().assign(myBlobPtr, myBlobSize);  // just one copy
> 
> but
> 
>   Message m(myBlobPtr, myBlobSize);
> 
> might create the FrameSet directly, and cut out another middleman.
> 
> Since MessageContent already has abstract content (string, MAP, and
> LIST), extentending the set to include char* could allow performance
> optimizations:

Agreed.

>   For incoming messages, the actual content could be copied from the
>   FrameSet directly to an application buffer.  (This can be a win even
>   on Linux when messages are larger than the connection's
>   maxFrameSize.)  The string form can be lazilly created when needed.
> 
>   For outgoing messages, the content may be placed directly in special
>   OS memory in preparation for network transfer.
> 
> 
> Cliff
> 
> 
> -----Original Message-----
> From: Gordon Sim [mailto:gsim@redhat.com]
> Sent: Friday, July 31, 2009 10:11 AM
> To: Qpid Dev; users@qpid.apache.org
> Subject: c++ messaging api: a map message example
> 
> Attached is a patch for a higher level, protocol independent messaging api from c++ as last discussed some months ago. I have not had much time to spend on this since then and so there is not a huge amount of change (mainly moving to a better namespace and adjusting for the revised include directory layout).
> 
> There is however now an example of a sending a map message using the amqp0-10 encoding (list messages are also supported) and the message headers are now correctly sent. Jonathan previously requested support for the stream operator and I've had a tentative stab at exploring what that might look like as well.
> 
> As always keen to get any feedback. I'm hoping to have more time to spend on this for the next few weeks. I'm aiming to align the c++ api with the similar work on python (at least conceptually) and to flesh out the functionality (determining how best to expose asynchronous completion of sends and acknowledgements will be a likely next step).
> 
> --Gordon.
> 
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
> 


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/04/2009 04:39 PM, Cliff Jansen (Interop Systems Inc) wrote:
> Hi Gordon,
>
>> Jonathan previously requested support for the stream operator and I've
>> had a tentative stab at exploring what that might look like as well.
>
> In addition, I would like to see the some stream-ish functions that
> are friendly to char* blobs that are not null terminated.  Something
> along the lines of:
>
>    MessageContent&  MessageContent::write (const char* s, int n);

I think append() would be more self-explanatory (if that's what you meant)

>    MessageContent&  MessageContent::read (const char* s, int n);  // + rewind() or seek

That raises an interesting question: do we force message to keep its content in 
a single contiguous buffer? If not we need a way for the user to iterate over 
the data in non-contiguous buffers. An iterator of sorts that returns successive 
pair<char*, size_t> would do it.

>
> As well as something like
>
>    int MessageContent::size(); // (or getSize(), getLength()...)
>
>    Message::Message(const char* s, int n);
>
> Note that std::string is not copy-on-write in all C++ implementations.
> On Windows, forcing an application to use strings for its raw data (from
> a char* perspective) results in needless copies of the content:
>
>    string tmpdata(myBlobPtr, myBlobSize);   // memory copy #1
>    Message m(tmpdata); // m.impl->bytes gets copy #2 when constructed
>    // tmpdata never used again, freed when out of scope
>
> The programmer can take evasive action:
>
>    Message m;  // m.impl->bytes starts out as a sacrificial empty string
>    m.getBytes().assign(myBlobPtr, myBlobSize);  // just one copy
>
> but
>
>    Message m(myBlobPtr, myBlobSize);
>
> might create the FrameSet directly, and cut out another middleman.
>
> Since MessageContent already has abstract content (string, MAP, and
> LIST), extentending the set to include char* could allow performance
> optimizations:
>
>    For incoming messages, the actual content could be copied from the
>    FrameSet directly to an application buffer.  (This can be a win even
>    on Linux when messages are larger than the connection's
>    maxFrameSize.)  The string form can be lazilly created when needed.
>
>    For outgoing messages, the content may be placed directly in special
>    OS memory in preparation for network transfer.
>
>
> Cliff
>
>
> -----Original Message-----
> From: Gordon Sim [mailto:gsim@redhat.com]
> Sent: Friday, July 31, 2009 10:11 AM
> To: Qpid Dev; users@qpid.apache.org
> Subject: c++ messaging api: a map message example
>
> Attached is a patch for a higher level, protocol independent messaging api from c++ as last discussed some months ago. I have not had much time to spend on this since then and so there is not a huge amount of change (mainly moving to a better namespace and adjusting for the revised include directory layout).
>
> There is however now an example of a sending a map message using the amqp0-10 encoding (list messages are also supported) and the message headers are now correctly sent. Jonathan previously requested support for the stream operator and I've had a tentative stab at exploring what that might look like as well.
>
> As always keen to get any feedback. I'm hoping to have more time to spend on this for the next few weeks. I'm aiming to align the c++ api with the similar work on python (at least conceptually) and to flesh out the functionality (determining how best to expose asynchronous completion of sends and acknowledgements will be a likely next step).
>
> --Gordon.
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


RE: c++ messaging api: a map message example

Posted by "Cliff Jansen (Interop Systems Inc)" <v-...@microsoft.com>.
Hi Gordon,

> Jonathan previously requested support for the stream operator and I've
> had a tentative stab at exploring what that might look like as well.

In addition, I would like to see the some stream-ish functions that
are friendly to char* blobs that are not null terminated.  Something
along the lines of:

  MessageContent& MessageContent::write (const char* s, int n);
  MessageContent& MessageContent::read (const char* s, int n);  // + rewind() or seek

As well as something like

  int MessageContent::size(); // (or getSize(), getLength()...)

  Message::Message(const char* s, int n);

Note that std::string is not copy-on-write in all C++ implementations.
On Windows, forcing an application to use strings for its raw data (from
a char* perspective) results in needless copies of the content:

  string tmpdata(myBlobPtr, myBlobSize);   // memory copy #1
  Message m(tmpdata); // m.impl->bytes gets copy #2 when constructed
  // tmpdata never used again, freed when out of scope

The programmer can take evasive action:

  Message m;  // m.impl->bytes starts out as a sacrificial empty string
  m.getBytes().assign(myBlobPtr, myBlobSize);  // just one copy

but

  Message m(myBlobPtr, myBlobSize);

might create the FrameSet directly, and cut out another middleman.

Since MessageContent already has abstract content (string, MAP, and
LIST), extentending the set to include char* could allow performance
optimizations:

  For incoming messages, the actual content could be copied from the
  FrameSet directly to an application buffer.  (This can be a win even
  on Linux when messages are larger than the connection's
  maxFrameSize.)  The string form can be lazilly created when needed.

  For outgoing messages, the content may be placed directly in special
  OS memory in preparation for network transfer.


Cliff


-----Original Message-----
From: Gordon Sim [mailto:gsim@redhat.com]
Sent: Friday, July 31, 2009 10:11 AM
To: Qpid Dev; users@qpid.apache.org
Subject: c++ messaging api: a map message example

Attached is a patch for a higher level, protocol independent messaging api from c++ as last discussed some months ago. I have not had much time to spend on this since then and so there is not a huge amount of change (mainly moving to a better namespace and adjusting for the revised include directory layout).

There is however now an example of a sending a map message using the amqp0-10 encoding (list messages are also supported) and the message headers are now correctly sent. Jonathan previously requested support for the stream operator and I've had a tentative stab at exploring what that might look like as well.

As always keen to get any feedback. I'm hoping to have more time to spend on this for the next few weeks. I'm aiming to align the c++ api with the similar work on python (at least conceptually) and to flesh out the functionality (determining how best to expose asynchronous completion of sends and acknowledgements will be a likely next step).

--Gordon.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/06/2009 04:31 AM, Gordon Sim wrote:
> Alan Conway wrote:
>> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>>> This looks messy:
>>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>>
>>>> Informing the listener can be done as part of the implementation of
>>>> session.subscribe(), it doesn't
>>>> need to be left to the user.
>>>
>>> I'll have a think about this.
>>
>> My main objection to the line above is the fact that listener is
>> mentioned twice. What does it mean if you use two different listeners?
>> I think its an error, in which case the API should be modified to make
>> that error impossibl, e.g. by doing the whole thing atomically in
>> sesision.susbcribe. This also avoids the error where the user forgets
>> to call listener.subscribed(). I'm assuming here that part of the
>> semantics of MessageListener is that it will get a call to
>> subscribed() each time it is subscribed.
>
> The Listener::subscribed() call in this example is really part of the
> example code, not the API, as with the existing direct_listener example
> which it emulates.

Good point, maybe just clarify that by breaking the line:

Subscription subscription  = sesssion.subscribe(...);
listener.subscribed(subscription);

to make it (sort of) clearer that listener.subscribed is not really part of 
subscribing, it's something extra the demo does.

The Session::subscribe() call that takes a subclass
> of MessageListener is part of the API itself; the listener there is
> simply a callback through which to notify the application of received
> messages.
>
> Whether or not the MessageListener should contain a callback used to
> notify it of its use for a particular subscription is something I'd need
> to think about a bit more. I want to revisit the details of the use of
> listeners anyway (e.g. should the listener actually be set on a receiver
> instance, as in the python equivalent?).
>

Now that I'm thinking straight(er) I don't think we should extend the 
MessageListener API unless there's a very compelling reason. Its nice and simple 
as it is, long may it remain so.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/06/2009 04:31 AM, Gordon Sim wrote:
> Alan Conway wrote:
>> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>>> This looks messy:
>>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>>
>>>> Informing the listener can be done as part of the implementation of
>>>> session.subscribe(), it doesn't
>>>> need to be left to the user.
>>>
>>> I'll have a think about this.
>>
>> My main objection to the line above is the fact that listener is
>> mentioned twice. What does it mean if you use two different listeners?
>> I think its an error, in which case the API should be modified to make
>> that error impossibl, e.g. by doing the whole thing atomically in
>> sesision.susbcribe. This also avoids the error where the user forgets
>> to call listener.subscribed(). I'm assuming here that part of the
>> semantics of MessageListener is that it will get a call to
>> subscribed() each time it is subscribed.
>
> The Listener::subscribed() call in this example is really part of the
> example code, not the API, as with the existing direct_listener example
> which it emulates.

Good point, maybe just clarify that by breaking the line:

Subscription subscription  = sesssion.subscribe(...);
listener.subscribed(subscription);

to make it (sort of) clearer that listener.subscribed is not really part of 
subscribing, it's something extra the demo does.

The Session::subscribe() call that takes a subclass
> of MessageListener is part of the API itself; the listener there is
> simply a callback through which to notify the application of received
> messages.
>
> Whether or not the MessageListener should contain a callback used to
> notify it of its use for a particular subscription is something I'd need
> to think about a bit more. I want to revisit the details of the use of
> listeners anyway (e.g. should the listener actually be set on a receiver
> instance, as in the python equivalent?).
>

Now that I'm thinking straight(er) I don't think we should extend the 
MessageListener API unless there's a very compelling reason. Its nice and simple 
as it is, long may it remain so.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Alan Conway wrote:
> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>> This looks messy:
>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>
>>> Informing the listener can be done as part of the implementation of
>>> session.subscribe(), it doesn't
>>> need to be left to the user.
>>
>> I'll have a think about this.
> 
> My main objection to the line above is the fact that listener is 
> mentioned twice. What does it mean if you use two different listeners? I 
> think its an error, in which case the API should be modified to make 
> that error impossibl, e.g. by doing the whole thing atomically in 
> sesision.susbcribe. This also avoids the error where the user forgets to 
> call listener.subscribed(). I'm assuming here that part of the semantics 
> of MessageListener is that it will get a call to subscribed() each time 
> it is subscribed.

The Listener::subscribed() call in this example is really part of the 
example code, not the API, as with the existing direct_listener example 
which it emulates. The Session::subscribe() call that takes a subclass 
of MessageListener is part of the API itself; the listener there is 
simply a callback through which to notify the application of received 
messages.

Whether or not the MessageListener should contain a callback used to 
notify it of its use for a particular subscription is something I'd need 
to think about a bit more. I want to revisit the details of the use of 
listeners anyway (e.g. should the listener actually be set on a receiver 
instance, as in the python equivalent?).

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Alan Conway wrote:
> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>> This looks messy:
>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>
>>> Informing the listener can be done as part of the implementation of
>>> session.subscribe(), it doesn't
>>> need to be left to the user.
>>
>> I'll have a think about this.
> 
> My main objection to the line above is the fact that listener is 
> mentioned twice. What does it mean if you use two different listeners? I 
> think its an error, in which case the API should be modified to make 
> that error impossibl, e.g. by doing the whole thing atomically in 
> sesision.susbcribe. This also avoids the error where the user forgets to 
> call listener.subscribed(). I'm assuming here that part of the semantics 
> of MessageListener is that it will get a call to subscribed() each time 
> it is subscribed.

The Listener::subscribed() call in this example is really part of the 
example code, not the API, as with the existing direct_listener example 
which it emulates. The Session::subscribe() call that takes a subclass 
of MessageListener is part of the API itself; the listener there is 
simply a callback through which to notify the application of received 
messages.

Whether or not the MessageListener should contain a callback used to 
notify it of its use for a particular subscription is something I'd need 
to think about a bit more. I want to revisit the details of the use of 
listeners anyway (e.g. should the listener actually be set on a receiver 
instance, as in the python equivalent?).

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/05/2009 02:45 AM, Gordon Sim wrote:
>> This looks messy:
>> listener.subscribed(session.subscribe("message_queue", listener));
>>
>> Informing the listener can be done as part of the implementation of
>> session.subscribe(), it doesn't
>> need to be left to the user.
>
> I'll have a think about this.

My main objection to the line above is the fact that listener is mentioned 
twice. What does it mean if you use two different listeners? I think its an 
error, in which case the API should be modified to make that error impossibl, 
e.g. by doing the whole thing atomically in sesision.susbcribe. This also avoids 
the error where the user forgets to call listener.subscribed(). I'm assuming 
here that part of the semantics of MessageListener is that it will get a call to 
subscribed() each time it is subscribed.

>> We need to sort out our threading model in this new API. The critical
>> thing is to allow many sessions to be served by the same thread or
>> thread pool pool.
>
> I agree that this is an important use case. I'll go in to this in more
> detail shortly.
>
>> We also want to integrate that with our own client side poller, and of
>> course keep the current model for backwards compatibility.
>>
>> How about providing two alternatives:
>> Session {
>> void dispatch(); // Calling thread dispatches session.
>> // OR
>> void activate(); // Session is dispatched by qpid's own thread pool.
>> Does not block.
>> void deactivate(); // Stop dispatching in qpid thread pool, blocks
>> till current operation completes.
>> void wait(); // Wait for activated session to be closed, or the last
>> subscription to be cancelled.
>> }
>>
>> I think that covers the majority of cases. The other case that has
>> been mentioned is providing a selectable fd to dispatch to qpid from
>> someone elses select/poll/epoll loop. I'd say that's an addition for
>> the 1.1 release of the new API, not necessarily for 1.0.
>
> Agree (on both points!). Thanks again for all the comments and suggestions!

The commnet on void wait(); above should say "wait for session to be 
deactivated, which occurs when session is closed, last subscription is cancelled 
or deactivate() is called on the session."

Thinking: if we auto-deactivate on subscription cancel, should we auto-activate 
on subscribe (if the sesssion was previously activated() rather than dispatched())?

Thinking some more: maintaining a thread pool in global variables is going to 
create a lot of start-up and shut down ordering issues. Since this is a new API 
I think it might be a good idea to introduce a qpid::Qpid object which can 
encapsulate the thread pool and any other context that is shared between all 
connections. So the examples would go

main() {
   Qpid qpid;
   Connection c = qpid.open(...);
   // Qpid dtor cleans up here.
}

That lets us do all our init & shut-down inside main and avoids problems of 
static ctor/dtor ordering. That also gives a place to configure things like the 
client-side thread pool size etc.

   qpid.setWorkerThreads(10); // 10 worker threads in the pool.

We could also provide built-in argc/argv processing:
main (int argc, char**argv) {
   Qpid qpid(argc, argv);
   // Here argc/argv have been modified to remove all --qpid arguments.
}

That gives an out-of-the-box way for customers to write configurable clients. 
For the client side I think we'd prefix all the options with --qpid- to avoid 
clashes with user options. e.g. --qpid-worker-threads etc.

Above ideas shamelessly stolen from CORBA. There is a significant maintenance 
advantage to keeping it all in main() in terms of lost time chasing sporadic 
shutdown bugs.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/05/2009 02:45 AM, Gordon Sim wrote:
>> This looks messy:
>> listener.subscribed(session.subscribe("message_queue", listener));
>>
>> Informing the listener can be done as part of the implementation of
>> session.subscribe(), it doesn't
>> need to be left to the user.
>
> I'll have a think about this.

My main objection to the line above is the fact that listener is mentioned 
twice. What does it mean if you use two different listeners? I think its an 
error, in which case the API should be modified to make that error impossibl, 
e.g. by doing the whole thing atomically in sesision.susbcribe. This also avoids 
the error where the user forgets to call listener.subscribed(). I'm assuming 
here that part of the semantics of MessageListener is that it will get a call to 
subscribed() each time it is subscribed.

>> We need to sort out our threading model in this new API. The critical
>> thing is to allow many sessions to be served by the same thread or
>> thread pool pool.
>
> I agree that this is an important use case. I'll go in to this in more
> detail shortly.
>
>> We also want to integrate that with our own client side poller, and of
>> course keep the current model for backwards compatibility.
>>
>> How about providing two alternatives:
>> Session {
>> void dispatch(); // Calling thread dispatches session.
>> // OR
>> void activate(); // Session is dispatched by qpid's own thread pool.
>> Does not block.
>> void deactivate(); // Stop dispatching in qpid thread pool, blocks
>> till current operation completes.
>> void wait(); // Wait for activated session to be closed, or the last
>> subscription to be cancelled.
>> }
>>
>> I think that covers the majority of cases. The other case that has
>> been mentioned is providing a selectable fd to dispatch to qpid from
>> someone elses select/poll/epoll loop. I'd say that's an addition for
>> the 1.1 release of the new API, not necessarily for 1.0.
>
> Agree (on both points!). Thanks again for all the comments and suggestions!

The commnet on void wait(); above should say "wait for session to be 
deactivated, which occurs when session is closed, last subscription is cancelled 
or deactivate() is called on the session."

Thinking: if we auto-deactivate on subscription cancel, should we auto-activate 
on subscribe (if the sesssion was previously activated() rather than dispatched())?

Thinking some more: maintaining a thread pool in global variables is going to 
create a lot of start-up and shut down ordering issues. Since this is a new API 
I think it might be a good idea to introduce a qpid::Qpid object which can 
encapsulate the thread pool and any other context that is shared between all 
connections. So the examples would go

main() {
   Qpid qpid;
   Connection c = qpid.open(...);
   // Qpid dtor cleans up here.
}

That lets us do all our init & shut-down inside main and avoids problems of 
static ctor/dtor ordering. That also gives a place to configure things like the 
client-side thread pool size etc.

   qpid.setWorkerThreads(10); // 10 worker threads in the pool.

We could also provide built-in argc/argv processing:
main (int argc, char**argv) {
   Qpid qpid(argc, argv);
   // Here argc/argv have been modified to remove all --qpid arguments.
}

That gives an out-of-the-box way for customers to write configurable clients. 
For the client side I think we'd prefix all the options with --qpid- to avoid 
clashes with user options. e.g. --qpid-worker-threads etc.

Above ideas shamelessly stolen from CORBA. There is a significant maintenance 
advantage to keeping it all in main() in terms of lost time chasing sporadic 
shutdown bugs.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Thanks for all the great suggestions, Alan! Comments inline...

Alan Conway wrote:
> On 07/31/2009 01:10 PM, Gordon Sim wrote:
>> Attached is a patch for a higher level, protocol independent messaging
>> api from c++ as last discussed some months ago. I have not had much time
>> to spend on this since then and so there is not a huge amount of change
>> (mainly moving to a better namespace and adjusting for the revised
>> include directory layout).
>> Looks good, overall I like the flow of the examples.
> 
> We shoudl ensure that Variant::Map and Variant::List provide the full 
> std::map/sequence API.

At present Variant::Map and Variant::List are just typedefs for 
std::map<std::string, Variant> and std::list<Variant> respectively.

> map_sender.cpp:
>     Message message;
>         message.getContent()["id"] = 987654321;
> 
> Why does a message default to having a map content?

It doesn't, but if there is no defined content and you ask for a map, 
then you get a map.

    message.getContent().asList();
    message.getContent()["id"] = 987654321;  //ERROR!

Here the second line would fail with an exception as the content cannot 
be converted to a map (which is required by the [] operator).

>  How about:
> 
>   Map map;
>   map["id"] = ...
>   message.setContent(map);

You can do that already (where Map is a Variant::Map).

> A map is a valuable data structure that deserves to exist independently 
> of Message, and a Message
> shouldn't be predjudiced towards one particular type of content. 

Agree on both points.

> In 
> particular there's no way to
> implement getContent()[] if the message contains binary data.

Agreed. At present the [] operator will throw an exception in that case. 
It is really just a shortcut for message.asMap()[key], and is perhaps 
unnecessary.

> 
>         Variant::List colours;
> 
> We should make it simple & efficient to interop with std:: collections 
> here, e.g.:

Yes I agree; good suggestion!

> 
>   std::vector<string> colours;
>   map["colours"] = Variant::rangeList(colours);
> 
> Where rangeList returns a templated wrapper for a pair of iterators that 
> can be inserted
> into a map.
> 
> map_receiver.cpp:
> 
> print functions: I think all our types including Variant and Map should 
> have std::ostream op<<, I'd push
> the print code into that and use << in the example.

Yes, that would be good.

> Also the type 
> returned by getContent() should have
> an ostream op so you can say:
> 
>         cout << message.getContent();

Agree again.

> queue_listener.cpp:
> 
> This looks messy:
>         listener.subscribed(session.subscribe("message_queue", listener));
> 
> Informing the listener can be done as part of the implementation of 
> session.subscribe(), it doesn't
> need to be left to the user.

I'll have a think about this.

> We need to sort out our threading model in this new API. The critical
> thing is to allow many sessions to be served by the same thread or
> thread pool pool. 

I agree that this is an important use case. I'll go in to this in more 
detail shortly.

> We also want to integrate that with our own client side poller, and of
> course keep the current model for backwards compatibility.
> 
> How about providing two alternatives:
> Session {
>   void dispatch(); // Calling thread dispatches session.
>   // OR
>   void activate(); // Session is dispatched by qpid's own thread pool. 
> Does not block.
>   void deactivate(); // Stop dispatching in qpid thread pool, blocks 
> till current operation completes.
>   void wait(); // Wait for activated session to be closed, or the last 
> subscription to be cancelled.
> }
> 
> I think that covers the majority of cases. The other case that has
> been mentioned is providing a selectable fd to dispatch to qpid from
> someone elses select/poll/epoll loop. I'd say that's an addition for
> the 1.1 release of the new API, not necessarily for 1.0.

Agree (on both points!). Thanks again for all the comments and suggestions!

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Thanks for all the great suggestions, Alan! Comments inline...

Alan Conway wrote:
> On 07/31/2009 01:10 PM, Gordon Sim wrote:
>> Attached is a patch for a higher level, protocol independent messaging
>> api from c++ as last discussed some months ago. I have not had much time
>> to spend on this since then and so there is not a huge amount of change
>> (mainly moving to a better namespace and adjusting for the revised
>> include directory layout).
>> Looks good, overall I like the flow of the examples.
> 
> We shoudl ensure that Variant::Map and Variant::List provide the full 
> std::map/sequence API.

At present Variant::Map and Variant::List are just typedefs for 
std::map<std::string, Variant> and std::list<Variant> respectively.

> map_sender.cpp:
>     Message message;
>         message.getContent()["id"] = 987654321;
> 
> Why does a message default to having a map content?

It doesn't, but if there is no defined content and you ask for a map, 
then you get a map.

    message.getContent().asList();
    message.getContent()["id"] = 987654321;  //ERROR!

Here the second line would fail with an exception as the content cannot 
be converted to a map (which is required by the [] operator).

>  How about:
> 
>   Map map;
>   map["id"] = ...
>   message.setContent(map);

You can do that already (where Map is a Variant::Map).

> A map is a valuable data structure that deserves to exist independently 
> of Message, and a Message
> shouldn't be predjudiced towards one particular type of content. 

Agree on both points.

> In 
> particular there's no way to
> implement getContent()[] if the message contains binary data.

Agreed. At present the [] operator will throw an exception in that case. 
It is really just a shortcut for message.asMap()[key], and is perhaps 
unnecessary.

> 
>         Variant::List colours;
> 
> We should make it simple & efficient to interop with std:: collections 
> here, e.g.:

Yes I agree; good suggestion!

> 
>   std::vector<string> colours;
>   map["colours"] = Variant::rangeList(colours);
> 
> Where rangeList returns a templated wrapper for a pair of iterators that 
> can be inserted
> into a map.
> 
> map_receiver.cpp:
> 
> print functions: I think all our types including Variant and Map should 
> have std::ostream op<<, I'd push
> the print code into that and use << in the example.

Yes, that would be good.

> Also the type 
> returned by getContent() should have
> an ostream op so you can say:
> 
>         cout << message.getContent();

Agree again.

> queue_listener.cpp:
> 
> This looks messy:
>         listener.subscribed(session.subscribe("message_queue", listener));
> 
> Informing the listener can be done as part of the implementation of 
> session.subscribe(), it doesn't
> need to be left to the user.

I'll have a think about this.

> We need to sort out our threading model in this new API. The critical
> thing is to allow many sessions to be served by the same thread or
> thread pool pool. 

I agree that this is an important use case. I'll go in to this in more 
detail shortly.

> We also want to integrate that with our own client side poller, and of
> course keep the current model for backwards compatibility.
> 
> How about providing two alternatives:
> Session {
>   void dispatch(); // Calling thread dispatches session.
>   // OR
>   void activate(); // Session is dispatched by qpid's own thread pool. 
> Does not block.
>   void deactivate(); // Stop dispatching in qpid thread pool, blocks 
> till current operation completes.
>   void wait(); // Wait for activated session to be closed, or the last 
> subscription to be cancelled.
> }
> 
> I think that covers the majority of cases. The other case that has
> been mentioned is providing a selectable fd to dispatch to qpid from
> someone elses select/poll/epoll loop. I'd say that's an addition for
> the 1.1 release of the new API, not necessarily for 1.0.

Agree (on both points!). Thanks again for all the comments and suggestions!

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 07/31/2009 01:10 PM, Gordon Sim wrote:
> Attached is a patch for a higher level, protocol independent messaging
> api from c++ as last discussed some months ago. I have not had much time
> to spend on this since then and so there is not a huge amount of change
> (mainly moving to a better namespace and adjusting for the revised
> include directory layout).
>Looks good, overall I like the flow of the examples.

We shoudl ensure that Variant::Map and Variant::List provide the full 
std::map/sequence API.

map_sender.cpp:
	Message message;
         message.getContent()["id"] = 987654321;

Why does a message default to having a map content?  How about:

   Map map;
   map["id"] = ...
   message.setContent(map);

A map is a valuable data structure that deserves to exist independently of 
Message, and a Message
shouldn't be predjudiced towards one particular type of content. In particular 
there's no way to
implement getContent()[] if the message contains binary data.

         Variant::List colours;

We should make it simple & efficient to interop with std:: collections here, e.g.:

   std::vector<string> colours;
   map["colours"] = Variant::rangeList(colours);

Where rangeList returns a templated wrapper for a pair of iterators that can be 
inserted
into a map.

map_receiver.cpp:

print functions: I think all our types including Variant and Map should have 
std::ostream op<<, I'd push
the print code into that and use << in the example. Also the type returned by 
getContent() should have
an ostream op so you can say:

         cout << message.getContent();

queue_listener.cpp:

This looks messy:
         listener.subscribed(session.subscribe("message_queue", listener));

Informing the listener can be done as part of the implementation of 
session.subscribe(), it doesn't
need to be left to the user.

     	
We need to sort out our threading model in this new API. The critical
thing is to allow many sessions to be served by the same thread or
thread pool pool. E.g. in TSX case it looks like it helps to have a
session per queue for large numbers of messages, but for small numbers
the excessive threading (with 4000 queues) makes performance much
worse. What you really want is 4000 sessions and a thread pool of
<number of cpus> threads that dispatch them all.

We also want to integrate that with our own client side poller, and of
course keep the current model for backwards compatibility.

How about providing two alternatives:
Session {
   void dispatch(); // Calling thread dispatches session.
   // OR
   void activate(); // Session is dispatched by qpid's own thread pool. Does not 
block.
   void deactivate(); // Stop dispatching in qpid thread pool, blocks till 
current operation completes.
   void wait(); // Wait for activated session to be closed, or the last 
subscription to be cancelled.
}

I think that covers the majority of cases. The other case that has
been mentioned is providing a selectable fd to dispatch to qpid from
someone elses select/poll/epoll loop. I'd say that's an addition for
the 1.1 release of the new API, not necessarily for 1.0.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 07/31/2009 01:10 PM, Gordon Sim wrote:
> Attached is a patch for a higher level, protocol independent messaging
> api from c++ as last discussed some months ago. I have not had much time
> to spend on this since then and so there is not a huge amount of change
> (mainly moving to a better namespace and adjusting for the revised
> include directory layout).
>Looks good, overall I like the flow of the examples.

We shoudl ensure that Variant::Map and Variant::List provide the full 
std::map/sequence API.

map_sender.cpp:
	Message message;
         message.getContent()["id"] = 987654321;

Why does a message default to having a map content?  How about:

   Map map;
   map["id"] = ...
   message.setContent(map);

A map is a valuable data structure that deserves to exist independently of 
Message, and a Message
shouldn't be predjudiced towards one particular type of content. In particular 
there's no way to
implement getContent()[] if the message contains binary data.

         Variant::List colours;

We should make it simple & efficient to interop with std:: collections here, e.g.:

   std::vector<string> colours;
   map["colours"] = Variant::rangeList(colours);

Where rangeList returns a templated wrapper for a pair of iterators that can be 
inserted
into a map.

map_receiver.cpp:

print functions: I think all our types including Variant and Map should have 
std::ostream op<<, I'd push
the print code into that and use << in the example. Also the type returned by 
getContent() should have
an ostream op so you can say:

         cout << message.getContent();

queue_listener.cpp:

This looks messy:
         listener.subscribed(session.subscribe("message_queue", listener));

Informing the listener can be done as part of the implementation of 
session.subscribe(), it doesn't
need to be left to the user.

     	
We need to sort out our threading model in this new API. The critical
thing is to allow many sessions to be served by the same thread or
thread pool pool. E.g. in TSX case it looks like it helps to have a
session per queue for large numbers of messages, but for small numbers
the excessive threading (with 4000 queues) makes performance much
worse. What you really want is 4000 sessions and a thread pool of
<number of cpus> threads that dispatch them all.

We also want to integrate that with our own client side poller, and of
course keep the current model for backwards compatibility.

How about providing two alternatives:
Session {
   void dispatch(); // Calling thread dispatches session.
   // OR
   void activate(); // Session is dispatched by qpid's own thread pool. Does not 
block.
   void deactivate(); // Stop dispatching in qpid thread pool, blocks till 
current operation completes.
   void wait(); // Wait for activated session to be closed, or the last 
subscription to be cancelled.
}

I think that covers the majority of cases. The other case that has
been mentioned is providing a selectable fd to dispatch to qpid from
someone elses select/poll/epoll loop. I'd say that's an addition for
the 1.1 release of the new API, not necessarily for 1.0.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org