Posted to users@activemq.apache.org by Nascent <om...@yahoo.com> on 2009/06/11 17:26:23 UTC

ActiveMQ-CPP and Advisory messages

Scenario: 
I have a consumer written in C++ that uses ActiveMQ-CPP to communicate with
ActiveMQ. It implements a callback interface where a client can request
on-change updates for data points. The consumer then sends a response message
to the original request from the client any time the data point updates. The
response messages go to a temporary queue (reply queue basically) that is
associated with the client.

Problem:
I am trying to figure out a way for the consumer to know that the client has
gone away so it can stop sending updates and free up resources associated
with that client. I cannot always trust that the client will send some kind
of unsubscribe message. I am looking for a way that the consumer can know
authoritatively that the client has either disconnected from ActiveMQ or
that the temporary/reply queue for that client is no longer valid. It seems
like ActiveMQ's Advisory messages are the solution; however, I don't see how
they can be used through ActiveMQ-CPP since the messages use Java objects as
the body. 

Questions:
Is there any way to make use of the advisory messages through ActiveMQ-CPP? 

Is there perhaps a better way to handle the scenario listed above that has
less overhead? I have considered using a topic that all data point updates
would go to and then clients could consume with a selector to get only the
updates they care about. However, in my case, there are potentially hundreds
of data points updating every second and clients are usually only interested
in a handful of them. So it seems inefficient to me to have all these data
point updates going to a topic where less than 5% of them may actually be
consumed.

Thanks in advance!
-- 
View this message in context: http://www.nabble.com/ActiveMQ-CPP-and-Advisory-messages-tp23983516p23983516.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: ActiveMQ-CPP and Advisory messages

Posted by Timothy Bish <ta...@gmail.com>.
On Sun, 2009-06-14 at 14:38 -0700, Nascent wrote:
> Tim,
> 
> Thanks for the reply. The tutorial looks great. I have a few more questions
> though.
> 
> Specifically, I am trying to use topic://ActiveMQ.Advisory.TempQueue but
> there doesn't seem to be any msg properties that I can leverage and I have
> no visibility into the body of the messages that come across this topic.
> According to ActiveMQ documentation
> (http://activemq.apache.org/advisory-message.html) the properties and data
> structure are null for messages on this topic. Is this correct? I would
> think a message on this topic would at the very least have the temporary
> queue name and whether it was just created or destroyed. This is the kind of
> information I am looking for. I want to know if a client is gone by knowing
> when its temporary (reply) queue goes away. 
> 
> I have a simple consumer (TestAdvisory) that subscribes to
> "topic://ActiveMQ.Advisory.>". Below is output from this consumer. I start
> the TestAdvisory consumer and then start another test application that
> connects to ActiveMQ and immediately creates a temporary queue to act as its
> reply queue. Here is the output from TestAdvisory when I start the
> application and then stop it:
> 
> Output:
> {TestAdvisory started}
> {test application started}
> 
> Message received on ActiveMQ.Advisory.Connection (msg type = Unknown)
> Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
> Property: originBrokerName = localhost
> Property: originBrokerURL = vm://localhost
> 
> Message received on ActiveMQ.Advisory.TempQueue (msg type = Unknown)
> Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
> Property: originBrokerName = localhost
> Property: originBrokerURL = vm://localhost
> 
> Message received on
> ActiveMQ.Advisory.Consumer.Queue.b74906ba-b930-df85-100a-abec448e4189:0 (msg
> type = Unknown)
> Property: consumerCount = 1
> Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
> Property: originBrokerName = localhost
> Property: originBrokerURL = vm://localhost
> 
> {test application stopped}
> 
> Message received on ActiveMQ.Advisory.TempQueue (msg type = Unknown)
> Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
> Property: originBrokerName = localhost
> Property: originBrokerURL = vm://localhost
> 
> Message received on ActiveMQ.Advisory.Connection (msg type = Unknown)
> Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
> Property: originBrokerName = localhost
> Property: originBrokerURL = vm://localhost
> 
> {TestAdvisory stopped}
> 
> As you can see from the output, I am iterating over the message properties
> and displaying them. I am also getting the destination from the message and
> I try to determine the message type by doing dynamic_cast checks against
> sub-classes of cms::Message. I assume the two messages that come across
> topic://ActiveMQ.Advisory.TempQueue are advising of the temporary queue of
> the application being created and destroyed but I can't verify that with
> code since there are no useful message properties and also since the message
> type is unknown (I am assuming it is an object message).
> 
> So after all that... my question is, how can I get useful information out of
> the messages on ActiveMQ.Advisory.TempQueue? 

From the 5.3 Broker code it looks like there should be a DestinationInfo
object in the data structure element of the ActiveMQMessage that is
received.  You can get to this in CMS but it's a bit tricky, especially
with the 2.x code.  I will try and add another example that shows how to
access this from the 3.0 code when I have a chance.  The DestinationInfo
object would show a remove or add operation on the destination in
question which is how you would know when they have been created or
destroyed.
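
The cleanup that the add/remove operation makes possible can be sketched without the client library. This is only an illustration under stated assumptions: TempQueueEvent, DestinationOp and SubscriptionRegistry are hypothetical names, not ActiveMQ-CPP types. In a real consumer the queue name and operation would be read from the DestinationInfo described above.

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <string>

// Hypothetical stand-in for the add/remove information that a
// DestinationInfo is described as carrying; not an ActiveMQ-CPP type.
enum class DestinationOp { Add, Remove };

struct TempQueueEvent {
    std::string queueName;  // name of the client's temporary reply queue
    DestinationOp op;       // whether the queue was created or destroyed
};

// Tracks which data points each client (keyed by reply queue) asked for.
class SubscriptionRegistry {
public:
    void subscribe(const std::string& replyQueue, const std::string& dataPoint) {
        subscriptions_[replyQueue].insert(dataPoint);
    }

    // Returns true if the event caused a client's subscriptions to be dropped.
    bool onTempQueueEvent(const TempQueueEvent& event) {
        if (event.op != DestinationOp::Remove) {
            return false;  // creation events need no cleanup
        }
        return subscriptions_.erase(event.queueName) > 0;
    }

    bool hasClient(const std::string& replyQueue) const {
        return subscriptions_.count(replyQueue) > 0;
    }

    std::size_t clientCount() const { return subscriptions_.size(); }

private:
    std::map<std::string, std::set<std::string> > subscriptions_;
};
```

The registry is keyed by reply queue name because that is the one identifier the request and the removal notice would have in common. A remove event for an unknown queue is simply ignored.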

Regards
Tim.

> 
> From everything I have read about CMS, the answer is you can't because CMS
> can't handle Java Object messages. I understand if this is the case and can
> look at alternatives but I just want to make sure I am not missing something
> here.
> 
> 
> 
> 
> 
> If interested, here is the code in TestAdvisory that generated the above
> output (compiled against ActiveMQ-CPP v2.2.2):
> void TestAdvisory::handleMessage(cms::Message* p_message)
> {
>         string msgType = "Unknown";
>         if (dynamic_cast<cms::BytesMessage*>(p_message))
>         {
>                 msgType = "Bytes";
>         }
>         else if (dynamic_cast<cms::MapMessage*>(p_message))
>         {
>                 msgType = "Map";
>         }
>         //else if (dynamic_cast<cms::ObjectMessage*>(p_message))
>         //{
>                 //msgType = "Object";
>         //}
>         else if (dynamic_cast<cms::TextMessage*>(p_message))
>         {
>                 msgType = "Text";
>         }
> 
>         const cms::Destination *dest = p_message->getCMSDestination();
>         cout << endl << "Message received on " + dest->toProviderString() +
> " (msg type = " << msgType << ")" << endl;
> 
>         vector<pair<string, string> > destProps =
> dest->getCMSProperties().toArray();
>         for (vector<pair<string, string> >::iterator itr =
> destProps.begin(); itr != destProps.end(); itr++)
>         {
>                 cout << "Dest Property: " << itr->first << " = " <<
> itr->second << endl;
>         }
> 
>         vector<string> properties = p_message->getPropertyNames();
>         for (vector<string>::iterator itr = properties.begin(); itr !=
> properties.end(); itr++)
>         {
>                 try
>                 {
>                         cout << "Property: " << *itr << " = " <<
> p_message->getStringProperty(*itr) << endl;
>                 }
>                 catch (...)
>                 {
>                         cout << "Property: " << *itr << " = " <<
> p_message->getIntProperty(*itr) << endl;
>                 }
>         }
> }
> 
-- 
Tim Bish
http://fusesource.com
http://timbish.blogspot.com/




Re: ActiveMQ-CPP and Advisory messages

Posted by Timothy Bish <ta...@gmail.com>.
On Sun, 2009-06-14 at 14:38 -0700, Nascent wrote:
> Tim,
> 
> Thanks for the reply. The tutorial looks great. I have a few more questions
> though.
> 
> Specifically, I am trying to use topic://ActiveMQ.Advisory.TempQueue but
> there doesn't seem to be any msg properties that I can leverage and I have
> no visibility into the body of the messages that come across this topic.
> According to ActiveMQ documentation
> (http://activemq.apache.org/advisory-message.html) the properties and data
> structure are null for messages on this topic. Is this correct? I would
> think a message on this topic would at the very least have the temporary
> queue name and whether it was just created or destroyed. This is the kind of
> information I am looking for. I want to know if a client is gone by knowing
> when its temporary (reply) queue goes away. 
> 

I have updated the tutorial to demonstrate how you can monitor the
Broker for temp destinations being created and destroyed.  Here's the
link again:

http://activemq.apache.org/cms/handling-advisory-messages.html

Note that you will be a bit more challenged to try and do this with the
2.x code and I'd really advise you to move to 3.0 for this.

There are also new examples in the trunk code under examples/advisories
for this as well.

Regards
Tim.

-- 
Tim Bish
http://fusesource.com
http://timbish.blogspot.com/




Re: ActiveMQ-CPP and Advisory messages

Posted by Nascent <om...@yahoo.com>.
Tim,

Thanks for the reply. The tutorial looks great. I have a few more questions
though.

Specifically, I am trying to use topic://ActiveMQ.Advisory.TempQueue but
there doesn't seem to be any msg properties that I can leverage and I have
no visibility into the body of the messages that come across this topic.
According to ActiveMQ documentation
(http://activemq.apache.org/advisory-message.html) the properties and data
structure are null for messages on this topic. Is this correct? I would
think a message on this topic would at the very least have the temporary
queue name and whether it was just created or destroyed. This is the kind of
information I am looking for. I want to know if a client is gone by knowing
when its temporary (reply) queue goes away. 

I have a simple consumer (TestAdvisory) that subscribes to
"topic://ActiveMQ.Advisory.>". Below is output from this consumer. I start
the TestAdvisory consumer and then start another test application that
connects to ActiveMQ and immediately creates a temporary queue to act as its
reply queue. Here is the output from TestAdvisory when I start the
application and then stop it:

Output:
{TestAdvisory started}
{test application started}

Message received on ActiveMQ.Advisory.Connection (msg type = Unknown)
Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
Property: originBrokerName = localhost
Property: originBrokerURL = vm://localhost

Message received on ActiveMQ.Advisory.TempQueue (msg type = Unknown)
Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
Property: originBrokerName = localhost
Property: originBrokerURL = vm://localhost

Message received on ActiveMQ.Advisory.Consumer.Queue.b74906ba-b930-df85-100a-abec448e4189:0 (msg type = Unknown)
Property: consumerCount = 1
Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
Property: originBrokerName = localhost
Property: originBrokerURL = vm://localhost

{test application stopped}

Message received on ActiveMQ.Advisory.TempQueue (msg type = Unknown)
Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
Property: originBrokerName = localhost
Property: originBrokerURL = vm://localhost

Message received on ActiveMQ.Advisory.Connection (msg type = Unknown)
Property: originBrokerId = ID:<host>-58822-1245008598443-0:0
Property: originBrokerName = localhost
Property: originBrokerURL = vm://localhost

{TestAdvisory stopped}

As you can see from the output, I am iterating over the message properties
and displaying them. I am also getting the destination from the message and
I try to determine the message type by doing dynamic_cast checks against
sub-classes of cms::Message. I assume the two messages that come across
topic://ActiveMQ.Advisory.TempQueue are advising of the temporary queue of
the application being created and destroyed but I can't verify that with
code since there are no useful message properties and also since the message
type is unknown (I am assuming it is an object message).
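
For illustration, the destination names in the output above are enough to tell the advisory categories apart, even though they cannot distinguish a create from a destroy. The following is a minimal sketch that assumes the name arrives without a "topic://" prefix, as it does in the output. AdvisoryKind, AdvisoryTopic and classifyAdvisoryTopic are hypothetical names and not part of CMS.

```cpp
#include <string>

// Hypothetical categories, limited to the advisory topics seen in the output.
enum class AdvisoryKind { Connection, TempQueue, QueueConsumer, Other };

struct AdvisoryTopic {
    AdvisoryKind kind;
    std::string subject;  // queue name for QueueConsumer, otherwise empty
};

// Classifies an advisory by the name of the topic it arrived on.
AdvisoryTopic classifyAdvisoryTopic(const std::string& topicName) {
    static const std::string consumerPrefix = "ActiveMQ.Advisory.Consumer.Queue.";

    if (topicName == "ActiveMQ.Advisory.Connection") {
        return {AdvisoryKind::Connection, ""};
    }
    if (topicName == "ActiveMQ.Advisory.TempQueue") {
        return {AdvisoryKind::TempQueue, ""};
    }
    // Consumer advisories carry the watched queue's name after the prefix.
    if (topicName.compare(0, consumerPrefix.size(), consumerPrefix) == 0) {
        return {AdvisoryKind::QueueConsumer, topicName.substr(consumerPrefix.size())};
    }
    return {AdvisoryKind::Other, ""};
}
```

A handler could call this on the string returned by dest->toProviderString() and branch on the result before looking at properties such as consumerCount.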

So after all that... my question is, how can I get useful information out of
the messages on ActiveMQ.Advisory.TempQueue? 

From everything I have read about CMS, the answer is you can't because CMS
can't handle Java Object messages. I understand if this is the case and can
look at alternatives but I just want to make sure I am not missing something
here.





If interested, here is the code in TestAdvisory that generated the above
output (compiled against ActiveMQ-CPP v2.2.2):
void TestAdvisory::handleMessage(cms::Message* p_message)
{
        // Work out the concrete CMS message type via dynamic_cast.
        string msgType = "Unknown";
        if (dynamic_cast<cms::BytesMessage*>(p_message))
        {
                msgType = "Bytes";
        }
        else if (dynamic_cast<cms::MapMessage*>(p_message))
        {
                msgType = "Map";
        }
        //else if (dynamic_cast<cms::ObjectMessage*>(p_message))
        //{
                //msgType = "Object";
        //}
        else if (dynamic_cast<cms::TextMessage*>(p_message))
        {
                msgType = "Text";
        }

        // Print the destination the message arrived on.
        const cms::Destination *dest = p_message->getCMSDestination();
        cout << endl << "Message received on " << dest->toProviderString()
             << " (msg type = " << msgType << ")" << endl;

        // Print any properties attached to the destination itself.
        vector<pair<string, string> > destProps = dest->getCMSProperties().toArray();
        for (vector<pair<string, string> >::iterator itr = destProps.begin();
             itr != destProps.end(); ++itr)
        {
                cout << "Dest Property: " << itr->first << " = " << itr->second << endl;
        }

        // Print every message property, trying string first and then int.
        vector<string> properties = p_message->getPropertyNames();
        for (vector<string>::iterator itr = properties.begin();
             itr != properties.end(); ++itr)
        {
                try
                {
                        cout << "Property: " << *itr << " = "
                             << p_message->getStringProperty(*itr) << endl;
                }
                catch (...)
                {
                        cout << "Property: " << *itr << " = "
                             << p_message->getIntProperty(*itr) << endl;
                }
        }
}

-- 
View this message in context: http://www.nabble.com/ActiveMQ-CPP-and-Advisory-messages-tp23983516p24026003.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: ActiveMQ-CPP and Advisory messages

Posted by Timothy Bish <ta...@gmail.com>.
On Thu, 2009-06-11 at 08:26 -0700, Nascent wrote:
> Scenario: 
> I have a consumer written in C++ that uses ActiveMQ-CPP to communicate with
> ActiveMQ. It implements a callback interface where a client can request
> on-change updates for data points. The consumer then does a response message
> to the original request from the client any time the data point updates. The
> response messages go to a temporary queue (reply queue basically) that is
> associated with the client.
> 
> Problem:
> I am trying to figure out a way for the consumer to know that the client has
> gone away so it can stop sending updates and free up resources associated
> with that client. I cannot always trust that the client will send some kind
> of unsubscribe message. I am looking for a way that the consumer can know
> authoritatively that the client has either disconnected from ActiveMQ or
> that the temporary/reply queue for that client is no longer valid. It seems
> like ActiveMQ's Advisory messages are the solution however I don't see how
> they can be used through ActiveMQ-CPP since the messages use Java objects as
> the body. 
> 
> Questions:
> Is there any way to make use of the advisory messages through ActiveMQ-CPP? 
> 

Since this seemed like a question that might confront many ActiveMQ-CPP
users I've created a Tutorial on it on the CMS Wiki.  Here's the link:

http://activemq.apache.org/cms/handling-advisory-messages.html

The tutorial is a work in progress, so comments are welcome, if you see
something that needs more explanation let me know.

I also created an example Producer and Consumer that show the basics of
using the advisory messages.  You will need to download the latest SVN
source to see the code; it's in the src/examples/advisories folder.

Regards
Tim.

> Is there perhaps a better way to handle the scenario listed above that has
> less overhead? I have considered using a topic that all data point updates
> would go to and then clients could consume with a selector to get only the
> updates they care about. However, in my case, there are potentially hundreds
> of data points updating every second and clients are usually only interested
> in a handful of them. So it seems inefficient to me to have all these data
> point updates going to a topic where less than 5% of them may actually be
> consumed.
> 
> Thanks in advance!
-- 
Tim Bish
http://fusesource.com
http://timbish.blogspot.com/