You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Alan Conway <ac...@redhat.com> on 2009/08/04 22:23:06 UTC

Re: c++ messaging api: a map message example

On 07/31/2009 01:10 PM, Gordon Sim wrote:
> Attached is a patch for a higher level, protocol independent messaging
> api from c++ as last discussed some months ago. I have not had much time
> to spend on this since then and so there is not a huge amount of change
> (mainly moving to a better namespace and adjusting for the revised
> include directory layout).
>Looks good, overall I like the flow of the examples.

We shoudl ensure that Variant::Map and Variant::List provide the full 
std::map/sequence API.

map_sender.cpp:
	Message message;
         message.getContent()["id"] = 987654321;

Why does a message default to having a map content?  How about:

   Map map;
   map["id"] = ...
   message.setContent(map);

A map is a valuable data structure that deserves to exist independently of 
Message, and a Message
shouldn't be predjudiced towards one particular type of content. In particular 
there's no way to
implement getContent()[] if the message contains binary data.

         Variant::List colours;

We should make it simple & efficient to interop with std:: collections here, e.g.:

   std::vector<string> colours;
   map["colours"] = Variant::rangeList(colours);

Where rangeList returns a templated wrapper for a pair of iterators that can be 
inserted
into a map.

map_receiver.cpp:

print functions: I think all our types including Variant and Map should have 
std::ostream op<<, I'd push
the print code into that and use << in the example. Also the type returned by 
getContent() should have
an ostream op so you can say:

         cout << message.getContent();

queue_listener.cpp:

This looks messy:
         listener.subscribed(session.subscribe("message_queue", listener));

Informing the listener can be done as part of the implementation of 
session.subscribe(), it doesn't
need to be left to the user.

     	
We need to sort out our threading model in this new API. The critical
thing is to allow many sessions to be served by the same thread or
thread pool pool. E.g. in TSX case it looks like it helps to have a
session per queue for large numbers of messages, but for small numbers
the excessive threading (with 4000 queues) makes performance much
worse. What you really want is 4000 sessions and a thread pool of
<number of cpus> threads that dispatch them all.

We also want to integrate that with our own client side poller, and of
course keep the current model for backwards compatibility.

How about providing two alternatives:
Session {
   void dispatch(); // Calling thread dispatches session.
   // OR
   void activate(); // Session is dispatched by qpid's own thread pool. Does not 
block.
   void deactivate(); // Stop dispatching in qpid thread pool, blocks till 
current operation completes.
   void wait(); // Wait for activated session to be closed, or the last 
subscription to be cancelled.
}

I think that covers the majority of cases. The other case that has
been mentioned is providing a selectable fd to dispatch to qpid from
someone elses select/poll/epoll loop. I'd say that's an addition for
the 1.1 release of the new API, not necessarily for 1.0.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/06/2009 04:31 AM, Gordon Sim wrote:
> Alan Conway wrote:
>> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>>> This looks messy:
>>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>>
>>>> Informing the listener can be done as part of the implementation of
>>>> session.subscribe(), it doesn't
>>>> need to be left to the user.
>>>
>>> I'll have a think about this.
>>
>> My main objection to the line above is the fact that listener is
>> mentioned twice. What does it mean if you use two different listeners?
>> I think its an error, in which case the API should be modified to make
>> that error impossibl, e.g. by doing the whole thing atomically in
>> sesision.susbcribe. This also avoids the error where the user forgets
>> to call listener.subscribed(). I'm assuming here that part of the
>> semantics of MessageListener is that it will get a call to
>> subscribed() each time it is subscribed.
>
> The Listener::subscribed() call in this example is really part of the
> example code, not the API, as with the existing direct_listener example
> which it emulates.

Good point, maybe just clarify that by breaking the line:

Subscription subscription  = sesssion.subscribe(...);
listener.subscribed(subscription);

to make it (sort of) clearer that listener.subscribed is not really part of 
subscribing, it's something extra the demo does.

The Session::subscribe() call that takes a subclass
> of MessageListener is part of the API itself; the listener there is
> simply a callback through which to notify the application of received
> messages.
>
> Whether or not the MessageListener should contain a callback used to
> notify it of its use for a particular subscription is something I'd need
> to think about a bit more. I want to revisit the details of the use of
> listeners anyway (e.g. should the listener actually be set on a receiver
> instance, as in the python equivalent?).
>

Now that I'm thinking straight(er) I don't think we should extend the 
MessageListener API unless there's a very compelling reason. Its nice and simple 
as it is, long may it remain so.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/06/2009 04:31 AM, Gordon Sim wrote:
> Alan Conway wrote:
>> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>>> This looks messy:
>>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>>
>>>> Informing the listener can be done as part of the implementation of
>>>> session.subscribe(), it doesn't
>>>> need to be left to the user.
>>>
>>> I'll have a think about this.
>>
>> My main objection to the line above is the fact that listener is
>> mentioned twice. What does it mean if you use two different listeners?
>> I think its an error, in which case the API should be modified to make
>> that error impossibl, e.g. by doing the whole thing atomically in
>> sesision.susbcribe. This also avoids the error where the user forgets
>> to call listener.subscribed(). I'm assuming here that part of the
>> semantics of MessageListener is that it will get a call to
>> subscribed() each time it is subscribed.
>
> The Listener::subscribed() call in this example is really part of the
> example code, not the API, as with the existing direct_listener example
> which it emulates.

Good point, maybe just clarify that by breaking the line:

Subscription subscription  = sesssion.subscribe(...);
listener.subscribed(subscription);

to make it (sort of) clearer that listener.subscribed is not really part of 
subscribing, it's something extra the demo does.

The Session::subscribe() call that takes a subclass
> of MessageListener is part of the API itself; the listener there is
> simply a callback through which to notify the application of received
> messages.
>
> Whether or not the MessageListener should contain a callback used to
> notify it of its use for a particular subscription is something I'd need
> to think about a bit more. I want to revisit the details of the use of
> listeners anyway (e.g. should the listener actually be set on a receiver
> instance, as in the python equivalent?).
>

Now that I'm thinking straight(er) I don't think we should extend the 
MessageListener API unless there's a very compelling reason. Its nice and simple 
as it is, long may it remain so.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Alan Conway wrote:
> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>> This looks messy:
>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>
>>> Informing the listener can be done as part of the implementation of
>>> session.subscribe(), it doesn't
>>> need to be left to the user.
>>
>> I'll have a think about this.
> 
> My main objection to the line above is the fact that listener is 
> mentioned twice. What does it mean if you use two different listeners? I 
> think its an error, in which case the API should be modified to make 
> that error impossibl, e.g. by doing the whole thing atomically in 
> sesision.susbcribe. This also avoids the error where the user forgets to 
> call listener.subscribed(). I'm assuming here that part of the semantics 
> of MessageListener is that it will get a call to subscribed() each time 
> it is subscribed.

The Listener::subscribed() call in this example is really part of the 
example code, not the API, as with the existing direct_listener example 
which it emulates. The Session::subscribe() call that takes a subclass 
of MessageListener is part of the API itself; the listener there is 
simply a callback through which to notify the application of received 
messages.

Whether or not the MessageListener should contain a callback used to 
notify it of its use for a particular subscription is something I'd need 
to think about a bit more. I want to revisit the details of the use of 
listeners anyway (e.g. should the listener actually be set on a receiver 
instance, as in the python equivalent?).

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Alan Conway wrote:
> On 08/05/2009 02:45 AM, Gordon Sim wrote:
>>> This looks messy:
>>> listener.subscribed(session.subscribe("message_queue", listener));
>>>
>>> Informing the listener can be done as part of the implementation of
>>> session.subscribe(), it doesn't
>>> need to be left to the user.
>>
>> I'll have a think about this.
> 
> My main objection to the line above is the fact that listener is 
> mentioned twice. What does it mean if you use two different listeners? I 
> think its an error, in which case the API should be modified to make 
> that error impossibl, e.g. by doing the whole thing atomically in 
> sesision.susbcribe. This also avoids the error where the user forgets to 
> call listener.subscribed(). I'm assuming here that part of the semantics 
> of MessageListener is that it will get a call to subscribed() each time 
> it is subscribed.

The Listener::subscribed() call in this example is really part of the 
example code, not the API, as with the existing direct_listener example 
which it emulates. The Session::subscribe() call that takes a subclass 
of MessageListener is part of the API itself; the listener there is 
simply a callback through which to notify the application of received 
messages.

Whether or not the MessageListener should contain a callback used to 
notify it of its use for a particular subscription is something I'd need 
to think about a bit more. I want to revisit the details of the use of 
listeners anyway (e.g. should the listener actually be set on a receiver 
instance, as in the python equivalent?).

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/05/2009 02:45 AM, Gordon Sim wrote:
>> This looks messy:
>> listener.subscribed(session.subscribe("message_queue", listener));
>>
>> Informing the listener can be done as part of the implementation of
>> session.subscribe(), it doesn't
>> need to be left to the user.
>
> I'll have a think about this.

My main objection to the line above is the fact that listener is mentioned 
twice. What does it mean if you use two different listeners? I think its an 
error, in which case the API should be modified to make that error impossibl, 
e.g. by doing the whole thing atomically in sesision.susbcribe. This also avoids 
the error where the user forgets to call listener.subscribed(). I'm assuming 
here that part of the semantics of MessageListener is that it will get a call to 
subscribed() each time it is subscribed.

>> We need to sort out our threading model in this new API. The critical
>> thing is to allow many sessions to be served by the same thread or
>> thread pool pool.
>
> I agree that this is an important use case. I'll go in to this in more
> detail shortly.
>
>> We also want to integrate that with our own client side poller, and of
>> course keep the current model for backwards compatibility.
>>
>> How about providing two alternatives:
>> Session {
>> void dispatch(); // Calling thread dispatches session.
>> // OR
>> void activate(); // Session is dispatched by qpid's own thread pool.
>> Does not block.
>> void deactivate(); // Stop dispatching in qpid thread pool, blocks
>> till current operation completes.
>> void wait(); // Wait for activated session to be closed, or the last
>> subscription to be cancelled.
>> }
>>
>> I think that covers the majority of cases. The other case that has
>> been mentioned is providing a selectable fd to dispatch to qpid from
>> someone elses select/poll/epoll loop. I'd say that's an addition for
>> the 1.1 release of the new API, not necessarily for 1.0.
>
> Agree (on both points!). Thanks again for all the comments and suggestions!

The commnet on void wait(); above should say "wait for session to be 
deactivated, which occurs when session is closed, last subscription is cancelled 
or deactivate() is called on the session."

Thinking: if we auto-deactivate on subscription cancel, should we auto-activate 
on subscribe (if the sesssion was previously activated() rather than dispatched())?

Thinking some more: maintaining a thread pool in global variables is going to 
create a lot of start-up and shut down ordering issues. Since this is a new API 
I think it might be a good idea to introduce a qpid::Qpid object which can 
encapsulate the thread pool and any other context that is shared between all 
connections. So the examples would go

main() {
   Qpid qpid;
   Connection c = qpid.open(...);
   // Qpid dtor cleans up here.
}

That lets us do all our init & shut-down inside main and avoids problems of 
static ctor/dtor ordering. That also gives a place to configure things like the 
client-side thread pool size etc.

   qpid.setWorkerThreads(10); // 10 worker threads in the pool.

We could also provide built-in argc/argv processing:
main (int argc, char**argv) {
   Qpid qpid(argc, argv);
   // Here argc/argv have been modified to remove all --qpid arguments.
}

That gives an out-of-the-box way for customers to write configurable clients. 
For the client side I think we'd prefix all the options with --qpid- to avoid 
clashes with user options. e.g. --qpid-worker-threads etc.

Above ideas shamelessly stolen from CORBA. There is a significant maintenance 
advantage to keeping it all in main() in terms of lost time chasing sporadic 
shutdown bugs.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Alan Conway <ac...@redhat.com>.
On 08/05/2009 02:45 AM, Gordon Sim wrote:
>> This looks messy:
>> listener.subscribed(session.subscribe("message_queue", listener));
>>
>> Informing the listener can be done as part of the implementation of
>> session.subscribe(), it doesn't
>> need to be left to the user.
>
> I'll have a think about this.

My main objection to the line above is the fact that listener is mentioned 
twice. What does it mean if you use two different listeners? I think its an 
error, in which case the API should be modified to make that error impossibl, 
e.g. by doing the whole thing atomically in sesision.susbcribe. This also avoids 
the error where the user forgets to call listener.subscribed(). I'm assuming 
here that part of the semantics of MessageListener is that it will get a call to 
subscribed() each time it is subscribed.

>> We need to sort out our threading model in this new API. The critical
>> thing is to allow many sessions to be served by the same thread or
>> thread pool pool.
>
> I agree that this is an important use case. I'll go in to this in more
> detail shortly.
>
>> We also want to integrate that with our own client side poller, and of
>> course keep the current model for backwards compatibility.
>>
>> How about providing two alternatives:
>> Session {
>> void dispatch(); // Calling thread dispatches session.
>> // OR
>> void activate(); // Session is dispatched by qpid's own thread pool.
>> Does not block.
>> void deactivate(); // Stop dispatching in qpid thread pool, blocks
>> till current operation completes.
>> void wait(); // Wait for activated session to be closed, or the last
>> subscription to be cancelled.
>> }
>>
>> I think that covers the majority of cases. The other case that has
>> been mentioned is providing a selectable fd to dispatch to qpid from
>> someone elses select/poll/epoll loop. I'd say that's an addition for
>> the 1.1 release of the new API, not necessarily for 1.0.
>
> Agree (on both points!). Thanks again for all the comments and suggestions!

The commnet on void wait(); above should say "wait for session to be 
deactivated, which occurs when session is closed, last subscription is cancelled 
or deactivate() is called on the session."

Thinking: if we auto-deactivate on subscription cancel, should we auto-activate 
on subscribe (if the sesssion was previously activated() rather than dispatched())?

Thinking some more: maintaining a thread pool in global variables is going to 
create a lot of start-up and shut down ordering issues. Since this is a new API 
I think it might be a good idea to introduce a qpid::Qpid object which can 
encapsulate the thread pool and any other context that is shared between all 
connections. So the examples would go

main() {
   Qpid qpid;
   Connection c = qpid.open(...);
   // Qpid dtor cleans up here.
}

That lets us do all our init & shut-down inside main and avoids problems of 
static ctor/dtor ordering. That also gives a place to configure things like the 
client-side thread pool size etc.

   qpid.setWorkerThreads(10); // 10 worker threads in the pool.

We could also provide built-in argc/argv processing:
main (int argc, char**argv) {
   Qpid qpid(argc, argv);
   // Here argc/argv have been modified to remove all --qpid arguments.
}

That gives an out-of-the-box way for customers to write configurable clients. 
For the client side I think we'd prefix all the options with --qpid- to avoid 
clashes with user options. e.g. --qpid-worker-threads etc.

Above ideas shamelessly stolen from CORBA. There is a significant maintenance 
advantage to keeping it all in main() in terms of lost time chasing sporadic 
shutdown bugs.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Thanks for all the great suggestions, Alan! Comments inline...

Alan Conway wrote:
> On 07/31/2009 01:10 PM, Gordon Sim wrote:
>> Attached is a patch for a higher level, protocol independent messaging
>> api from c++ as last discussed some months ago. I have not had much time
>> to spend on this since then and so there is not a huge amount of change
>> (mainly moving to a better namespace and adjusting for the revised
>> include directory layout).
>> Looks good, overall I like the flow of the examples.
> 
> We shoudl ensure that Variant::Map and Variant::List provide the full 
> std::map/sequence API.

At present Variant::Map and Variant::List are just typedefs for 
std::map<std::string, Variant> and std::list<Variant> respectively.

> map_sender.cpp:
>     Message message;
>         message.getContent()["id"] = 987654321;
> 
> Why does a message default to having a map content?

It doesn't, but if there is no defined content and you ask for a map, 
then you get a map.

    message.getContent().asList();
    message.getContent()["id"] = 987654321;  //ERROR!

Here the second line would fail with an exception as the content cannot 
be converted to a map (which is required by the [] operator).

>  How about:
> 
>   Map map;
>   map["id"] = ...
>   message.setContent(map);

You can do that already (where Map is a Variant::Map).

> A map is a valuable data structure that deserves to exist independently 
> of Message, and a Message
> shouldn't be predjudiced towards one particular type of content. 

Agree on both points.

> In 
> particular there's no way to
> implement getContent()[] if the message contains binary data.

Agreed. At present the [] operator will throw an exception in that case. 
It is really just a shortcut for message.asMap()[key], and is perhaps 
unnecessary.

> 
>         Variant::List colours;
> 
> We should make it simple & efficient to interop with std:: collections 
> here, e.g.:

Yes I agree; good suggestion!

> 
>   std::vector<string> colours;
>   map["colours"] = Variant::rangeList(colours);
> 
> Where rangeList returns a templated wrapper for a pair of iterators that 
> can be inserted
> into a map.
> 
> map_receiver.cpp:
> 
> print functions: I think all our types including Variant and Map should 
> have std::ostream op<<, I'd push
> the print code into that and use << in the example.

Yes, that would be good.

> Also the type 
> returned by getContent() should have
> an ostream op so you can say:
> 
>         cout << message.getContent();

Agree again.

> queue_listener.cpp:
> 
> This looks messy:
>         listener.subscribed(session.subscribe("message_queue", listener));
> 
> Informing the listener can be done as part of the implementation of 
> session.subscribe(), it doesn't
> need to be left to the user.

I'll have a think about this.

> We need to sort out our threading model in this new API. The critical
> thing is to allow many sessions to be served by the same thread or
> thread pool pool. 

I agree that this is an important use case. I'll go in to this in more 
detail shortly.

> We also want to integrate that with our own client side poller, and of
> course keep the current model for backwards compatibility.
> 
> How about providing two alternatives:
> Session {
>   void dispatch(); // Calling thread dispatches session.
>   // OR
>   void activate(); // Session is dispatched by qpid's own thread pool. 
> Does not block.
>   void deactivate(); // Stop dispatching in qpid thread pool, blocks 
> till current operation completes.
>   void wait(); // Wait for activated session to be closed, or the last 
> subscription to be cancelled.
> }
> 
> I think that covers the majority of cases. The other case that has
> been mentioned is providing a selectable fd to dispatch to qpid from
> someone elses select/poll/epoll loop. I'd say that's an addition for
> the 1.1 release of the new API, not necessarily for 1.0.

Agree (on both points!). Thanks again for all the comments and suggestions!

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: c++ messaging api: a map message example

Posted by Gordon Sim <gs...@redhat.com>.
Thanks for all the great suggestions, Alan! Comments inline...

Alan Conway wrote:
> On 07/31/2009 01:10 PM, Gordon Sim wrote:
>> Attached is a patch for a higher level, protocol independent messaging
>> api from c++ as last discussed some months ago. I have not had much time
>> to spend on this since then and so there is not a huge amount of change
>> (mainly moving to a better namespace and adjusting for the revised
>> include directory layout).
>> Looks good, overall I like the flow of the examples.
> 
> We shoudl ensure that Variant::Map and Variant::List provide the full 
> std::map/sequence API.

At present Variant::Map and Variant::List are just typedefs for 
std::map<std::string, Variant> and std::list<Variant> respectively.

> map_sender.cpp:
>     Message message;
>         message.getContent()["id"] = 987654321;
> 
> Why does a message default to having a map content?

It doesn't, but if there is no defined content and you ask for a map, 
then you get a map.

    message.getContent().asList();
    message.getContent()["id"] = 987654321;  //ERROR!

Here the second line would fail with an exception as the content cannot 
be converted to a map (which is required by the [] operator).

>  How about:
> 
>   Map map;
>   map["id"] = ...
>   message.setContent(map);

You can do that already (where Map is a Variant::Map).

> A map is a valuable data structure that deserves to exist independently 
> of Message, and a Message
> shouldn't be predjudiced towards one particular type of content. 

Agree on both points.

> In 
> particular there's no way to
> implement getContent()[] if the message contains binary data.

Agreed. At present the [] operator will throw an exception in that case. 
It is really just a shortcut for message.asMap()[key], and is perhaps 
unnecessary.

> 
>         Variant::List colours;
> 
> We should make it simple & efficient to interop with std:: collections 
> here, e.g.:

Yes I agree; good suggestion!

> 
>   std::vector<string> colours;
>   map["colours"] = Variant::rangeList(colours);
> 
> Where rangeList returns a templated wrapper for a pair of iterators that 
> can be inserted
> into a map.
> 
> map_receiver.cpp:
> 
> print functions: I think all our types including Variant and Map should 
> have std::ostream op<<, I'd push
> the print code into that and use << in the example.

Yes, that would be good.

> Also the type 
> returned by getContent() should have
> an ostream op so you can say:
> 
>         cout << message.getContent();

Agree again.

> queue_listener.cpp:
> 
> This looks messy:
>         listener.subscribed(session.subscribe("message_queue", listener));
> 
> Informing the listener can be done as part of the implementation of 
> session.subscribe(), it doesn't
> need to be left to the user.

I'll have a think about this.

> We need to sort out our threading model in this new API. The critical
> thing is to allow many sessions to be served by the same thread or
> thread pool pool. 

I agree that this is an important use case. I'll go in to this in more 
detail shortly.

> We also want to integrate that with our own client side poller, and of
> course keep the current model for backwards compatibility.
> 
> How about providing two alternatives:
> Session {
>   void dispatch(); // Calling thread dispatches session.
>   // OR
>   void activate(); // Session is dispatched by qpid's own thread pool. 
> Does not block.
>   void deactivate(); // Stop dispatching in qpid thread pool, blocks 
> till current operation completes.
>   void wait(); // Wait for activated session to be closed, or the last 
> subscription to be cancelled.
> }
> 
> I think that covers the majority of cases. The other case that has
> been mentioned is providing a selectable fd to dispatch to qpid from
> someone elses select/poll/epoll loop. I'd say that's an addition for
> the 1.1 release of the new API, not necessarily for 1.0.

Agree (on both points!). Thanks again for all the comments and suggestions!

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org