Posted to users@activemq.apache.org by Scott the Red <sc...@yahoo.com> on 2007/10/25 08:48:36 UTC

c++ client doesn't receive on all topic consumers

I'm creating a C++ adapter class to represent a message consumer connecting
to ActiveMQ.

Unfortunately, the consumers are not reliably receiving messages.  Some
receive OK, but others have a problem where the first consumer of a
particular message doesn't receive.  However, any additional consumers of
the same type do.

I also see that a process sending a message in one thread won't receive the
message in another thread.  The sender and consumer use different
connections, and the consumer's "noLocal" is false.

The consumers are listening on topics using message selectors.  The topics
and connection have no special configuration. 

The adapter is based on the "standard" example at
http://activemq.apache.org/cms/example.html



The connection code looks like this (mostly it's just a copy of the
example):

void MyConsumer::start()
{
        std::string brokerURI =
                "tcp://127.0.0.1:61616"
                "?wireFormat=openwire"
                "&transport.useAsyncSend=true";

        // Create a ConnectionFactory
        ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );

        // Create a Connection
        connection = connectionFactory->createConnection();
        delete connectionFactory;
        connection->start();

        connection->setExceptionListener(this);

        // Create a Session
        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

        // Create the destination (Topic or Queue)
        if( useTopic ) {
            destination = session->createTopic( this->destinationName );
        } else {
            destination = session->createQueue( this->destinationName );
        }

        // create a message selector:
        std::stringstream selector;
        selector << "messageType='" << this->messageType << "'";

        // Create a MessageConsumer from the Session to the Topic or Queue
        consumer = session->createConsumer( destination, selector.str() );

        consumer->setMessageListener( this );

        // Indicate we are ready for messages.
        latch.countDown();
}

/*virtual*/ void onMessage( const Message* message ) 
{
   ...
}
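
The selector in start() splices messageType straight into the expression. As a minimal sketch, assuming messageType could ever contain a single quote: JMS-style selectors use SQL-92 string literals, where an embedded quote is written as two quotes. buildTypeSelector is a hypothetical helper for illustration, not part of ActiveMQ-CPP, and nothing in this thread suggests quoting was the cause of the missed messages.

```cpp
#include <string>

// Hypothetical helper (not an ActiveMQ-CPP API): builds the selector
// messageType='<value>', doubling any single quote inside the value
// as SQL-92 string literals require.
std::string buildTypeSelector( const std::string& messageType )
{
    std::string selector = "messageType='";
    for( std::string::size_type i = 0; i < messageType.size(); ++i ) {
        if( messageType[i] == '\'' ) {
            selector += '\'';   // escape the quote by doubling it
        }
        selector += messageType[i];
    }
    selector += '\'';
    return selector;
}
```

The result could be passed where selector.str() is used above, for example session->createConsumer( destination, buildTypeSelector( this->messageType ) ).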



Debugging and output show that onMessage() is never called for the
non-receiving consumers.

Java-based processes show that the input messages are being sent correctly.



Anyone have any ideas?


Is there a way to configure the topic for higher reliability?


Thanks!





-- 
View this message in context: http://www.nabble.com/c%2B%2B-client-doesn%27t-receive-on-all-topic-consumers-tf4688946s2354.html#a13401180
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


RE: c++ client doesn't receive on all topic consumers

Posted by "Mittler, Nathan" <na...@sensis.com>.
Hmm .. Sounds fishy.  

Anyway, I'm glad you got things working.  Thanks for the update!

> -----Original Message-----
> From: Scott the Red [mailto:scott_samek@yahoo.com] 
> Sent: Thursday, October 25, 2007 3:22 PM
> To: users@activemq.apache.org
> Subject: Re: c++ client doesn't receive on all topic consumers
> 
> 
> Hmmmmm.....  I seem to have fixed the problem by merely 
> moving the ActiveMQ connection out of my consumer's 
> constructor, and into a separate "start"
> method.  Everything seems to be working properly now.  There 
> don't appear to be any variable lifecycle issues (like a 
> local or argument being referenced after it goes out of 
> scope).  And onMessage() should never have been called before 
> the constructor exited...  
> 
> At any rate, I think I'll file it under "not looking the gift 
> horse in the mouth", and be happy it works.
> 
> thanks!
> 
> 

Re: c++ client doesn't receive on all topic consumers

Posted by Scott the Red <sc...@yahoo.com>.
Hmmmmm.....  I seem to have fixed the problem by merely moving the ActiveMQ
connection out of my consumer's constructor, and into a separate "start"
method.  Everything seems to be working properly now.  There don't appear to
be any variable lifecycle issues (like a local or argument being referenced
after it goes out of scope).  And onMessage() should never have been
called before the constructor exited...  
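
The thread never establishes why this move helped. One C++ rule that may be relevant, offered as an assumption rather than a diagnosis: a callback that fires while a constructor is still running sees the object as the class currently being constructed, so a virtual onMessage() overridden in a derived class is not reached yet. The sketch below uses no ActiveMQ-CPP code; BaseConsumer and DerivedConsumer are hypothetical stand-ins, and start() simulates a message arriving immediately after the listener is registered.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for an adapter base class that registers itself
// as a listener. Not ActiveMQ-CPP code.
struct BaseConsumer {
    std::vector<std::string>& log;

    BaseConsumer( std::vector<std::string>& l, bool connectInCtor ) : log( l ) {
        if( connectInCtor ) {
            start();   // callback fires while only the base part exists
        }
    }
    virtual ~BaseConsumer() {}

    // Simulates "connect, register listener, first message arrives".
    void start() { onMessage(); }

    virtual void onMessage() { log.push_back( "base" ); }
};

struct DerivedConsumer : BaseConsumer {
    DerivedConsumer( std::vector<std::string>& l, bool connectInCtor )
        : BaseConsumer( l, connectInCtor ) {}

    virtual void onMessage() { log.push_back( "derived" ); }
};
```

Constructing DerivedConsumer with connectInCtor set to true records "base", because the call happens inside the base constructor. Constructing it with false and calling start() afterwards records "derived". Whether the adapter in this thread had such a class hierarchy is not stated.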

At any rate, I think I'll file it under "not looking the gift horse in the
mouth", and be happy it works.

thanks!




Re: c++ client doesn't receive on all topic consumers

Posted by Scott the Red <sc...@yahoo.com>.

nmittler wrote:
> 
> Which version of ActiveMQ-CPP are you using and against which ActiveMQ
> broker version?  Also, platform might be helpful.
> 

ActiveMQ 4.1.1
ActiveMQ CPP 2.1 
Fedora Linux 



nmittler wrote:
> 
> Could it be that this first consumer was started at or after the time the
> message was sent?  Consumers won't get messages they missed unless they are
> retroactive consumers (http://activemq.apache.org/retroactive-consumer.html)
> 

No, I start the process with the consumers, wait for it to tell me it's
ready, and then send the messages from an external program.  In the case of
the single process talking to itself, I sleep before starting the producer.
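
The "wait for it to tell me it's ready" step corresponds to the latch.countDown() call at the end of start(). A minimal sketch of such a readiness latch using only the C++11 standard library; ReadyLatch is a hypothetical name for illustration and is not the latch class that ships with ActiveMQ-CPP.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot readiness latch (not the ActiveMQ-CPP class).
// await() blocks until countDown() has been called `count` times.
class ReadyLatch {
public:
    explicit ReadyLatch( int count ) : count_( count ) {}

    // Called by each consumer once its listener is registered.
    void countDown() {
        std::lock_guard<std::mutex> lock( mutex_ );
        if( count_ > 0 && --count_ == 0 ) {
            ready_.notify_all();
        }
    }

    // Called by the producer side before sending the first message.
    void await() {
        std::unique_lock<std::mutex> lock( mutex_ );
        ready_.wait( lock, [this] { return count_ == 0; } );
    }

    // Number of countDown() calls still outstanding.
    int remaining() {
        std::lock_guard<std::mutex> lock( mutex_ );
        return count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    int count_;
};
```

With one latch shared by all consumers, the producer thread would call await() instead of sleeping, which removes the guess about how long consumer setup takes.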



nmittler wrote:
> 
> That shouldn't be a problem.  Our example does this:
> http://activemq.apache.org/cms/example.html.
> 
> To help isolate the problem, you might try not using selectors to see if you
> get all the messages you expect.
> 

I tried removing the selector.  No luck.  



nmittler wrote:
> 
> Are your java consumers using selectors as well, or do they simply receive
> everything?  Also, what is the lifetime of the java consumers?  Do you
> typically start them before you start the C++ consumers?
> 

The Java consumers seem to work both with and without the selectors.  They
receive the messages that the C++ client is missing.  I've generally started
the Java consumers before the C++ ones.


nmittler wrote:
> 
> I'm not convinced this is a reliability issue.  We have flooded our
> consumers pretty hard in our testing.  How many messages are you sending per
> second?  How many producers and consumers?  Is there just a single topic or
> do you have several?
> 

It could easily be an issue with how I'm managing the consumers.

At this phase, I'm sending just a very few messages.  They all go to a
single topic, and I use a messageType property to differentiate the contents
of the message.


As far as I can see, the main ways my code deviates from the example are
that:

1) it uses a selector     (but removing the selector doesn't help)

2) scope - my adapter sets up the activemq consumer and then stores it in
a registry, where it sits and waits to be unregistered.

3) the example runs its consumer in a thread.  But I think that's just for
the example's purposes so that the producers and consumer can both work in
the same process.  I believe activemq-cpp is creating all the threads it
needs when I create the consumer.


Thanks much!



Re: c++ client doesn't receive on all topic consumers

Posted by Nathan Mittler <na...@gmail.com>.
Which version of ActiveMQ-CPP are you using and against which ActiveMQ
broker version?  Also, platform might be helpful.


> I'm creating a C++ adapter class to represent a message consumer connecting
> to ActiveMQ.
>
> Unfortunately, the consumers are not reliably receiving messages.  Some
> receive OK, but others have a problem where the first consumer of a
> particular message doesn't receive.  However, any additional consumers of
> the same type do.


Could it be that this first consumer was started at or after the time the
message was sent?  Consumers won't get messages they missed unless they are
retroactive consumers (http://activemq.apache.org/retroactive-consumer.html)

> I also see that a process sending a message in one thread won't receive the
> message in another thread.  The sender and consumer use different
> connections, and the consumer's "noLocal" is false.


That shouldn't be a problem.  Our example does this:
http://activemq.apache.org/cms/example.html.

> The consumers are listening on topics using message selectors.  The topics
> and connection have no special configuration.


To help isolate the problem, you might try not using selectors to see if you
get all the messages you expect.

> The adapter is based on the "standard" example at
> http://activemq.apache.org/cms/example.html
>
>
>
> The connection code looks like this (mostly it's just a copy of the
> example):
>
> void MyConsumer::start()
> {
>
>         std::string brokerURI =
>                 "tcp://127.0.0.1:61616"
>                 "?wireFormat=openwire"
>                 "&transport.useAsyncSend=true";
>
>
>         // Create a ConnectionFactory
>         ActiveMQConnectionFactory* connectionFactory =
>                    new ActiveMQConnectionFactory( brokerURI );
>
>         // Create a Connection
>         connection = connectionFactory->createConnection();
>         delete connectionFactory;
>         connection->start();
>
>         connection->setExceptionListener(this);
>
>         // Create a Session
>         session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>
>         // Create the destination (Topic or Queue)
>         if( useTopic ) {
>             destination = session->createTopic( this->destinationName );
>         } else {
>             destination = session->createQueue( this->destinationName );
>         }
>
>         // create a message selector:
>         std::stringstream selector;
>         selector << "messageType='" << this->messageType << "'";
>
>         // Create a MessageConsumer from the Session to the Topic or Queue
>         consumer = session->createConsumer( destination, selector.str() );
>
>         consumer->setMessageListener( this );
>
>         // Indicate we are ready for messages.
>         latch.countDown();
> }
>
> /*virtual*/ void onMessage( const Message* message )
> {
>    ...
> }
>
>
>
> Debugging and output show that onMessage() is never called for the
> non-receiving consumers.
>
> Java-based processes show that the input messages are being sent correctly.


Are your java consumers using selectors as well, or do they simply receive
everything?  Also, what is the lifetime of the java consumers?  Do you
typically start them before you start the C++ consumers?

> Anyone have any ideas?
>
>
> Is there a way to configure the topic for higher reliability?


I'm not convinced this is a reliability issue.  We have flooded our
consumers pretty hard in our testing.  How many messages are you sending per
second?  How many producers and consumers?  Is there just a single topic or
do you have several?

> Thanks!
>

Regards,
Nate