You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by Naveen Rawat <na...@in.effectsoft.com> on 2006/08/17 08:29:43 UTC

Queue hogging by a single consumer.

Hi all :) 

I am working with the binary version of ActiveMQ 4.0 broker and trying out 
the openwire cpp apis for asynchronous messaging.
[https://svn.apache.org/repos/asf/incubator/activemq/tags/activemq-4.0/openw 
ire-cpp] 

 

I wonder if I m resurrecting the mummies of issues already burnt. 

 

I am trying out having 3 consumers A, B and C listening on the same queue.
What I am trying to do is - 

	A, B, C sending on Q1, and consuming Z's response on Q2
	Consumer Z listening on Q1, responding back on Q2. 

	like this -
	A/B/C  >>  Q1  >>  Z  >>  Q2  >>  A/B/C
	 

Listening/responding to a single consumer is working well at present. BUT 
broker is spoofing up the responses from Z to the simultaneous consumers 
(either 2 or all three). Response for one consumer (A) is going to any of 
the other consumer (B/C). Same is happening for other consumers. Being 
prefetch size preset to 1000, the consumer that first manages session with 
the broker on a queue is getting all the messages (and if it gets 
terminated, the following one hogs the all and so on.) . 

As I m at presently testing, setting prefetch size to less (say 1), even 
dont solves the purpose as not giving it frequest quick requests (man Vs 
machine). Moreover as the hogging consumer is reading and acknowledging all 
the responses, the prefetch size of even 1 is not surpassed. 

I tried out "with no success" the way of grid processing of messages (using 
MessageGroups) as suggested in
	http://activemq.org/site/how-do-i-preserve-order-of-messages.html
Code relevant of this is as follows - 

	[........A/B/C....................................................
	producer = session->createProducer(Q1) ;
	producer->setPersistent(true) ;
	message = session->createBytesMessage() ;
	message->setJMSXGroupID("cheese") ;
	message->writeString("Hello World") ;
 	producer->send(message);
	.................................................................] 


	[ .........Z ' s OnMessage(message)...............................
	p<string> NGid;
	NGid = message->getJMSXGroupID();
	producer = session->createProducer(Q2) ;
	producer->setPersistent(true) ;
	reqMessage = session->createBytesMessage() ;
	reqMessage->setJMSXGroupID(NGid->c_str() );
	reqMessage->writeString("R E C E I V E D") ; 	//response string
	producer->send(reqMessage);
	.................................................................] 

Is there anymore needed in the code that I m loosing? 


I come to know that there are certain issues yet not resolved pertaining to 
the prefetch buffer initial size. Correct me please.
Will manipulation of prefetch buffer size help my cause? Please suggest a 
way otherwise. 

HELP ME.. 


			THANKS IN ADVANCE 

Regards,
Navin

Re: Queue hogging by a single consumer.

Posted by James Strachan <ja...@gmail.com>.
More details here along with example code in the Lingo project...
http://activemq.org/site/how-should-i-implement-request-response-with-jms.html

here's some pseudocode...

// client side
Destination tempDest = session.createTemporaryQueue();
session.createConsumer(tempDest);

// send a request..
message.setJMSReplyTo(tempDest);
message.setJMSCorrelationID(myCorrelationID);
producer.send(message);


// server side
public void onMessage(Message request) {

Message response = session.createMessage();
response.setJMSCorrelationID(request.getJMSCorrelationID())

producer.send(request.getJMSReplyTo(), response)
}



On 8/18/06, ahamad <ar...@indiatimes.com> wrote:
>
>
> >
> James.Strachan wrote:
> >
> >>On 8/17/06, Naveen Rawat <na...@in.effectsoft.com> wrote:
> >>> Hi James,
> >>>
> >>> Thanks for your response.
> >>>
> >>>
> >>> > Are you trying to implement request-response with A, B, C making
> >>> > requests on Z and getting the response? Or can A, B, C process any
> >>> > message from Z?
> >>>
> >>>
> >>> Exactly the first case.
> >>> A, B, C making requests on Z and getting the response from Z
> >>>
> >>>
> >>>
> >> >> I'm not sure if your issue is that say A doesn't see the responses for
> >> >> its request (if thats the case use either 3 queues, use temporary
> >> > >queues for the responses or use a selector and a correlationID on the
> >> >> request & response) - or is it that you have a small number of
> >> > >responses from Z and they are being hogged by one consumer - in which
> >> >> case setting a small prefetch and a round robin dispatch policy will
> >> >> fix this.
> >>>
> >>>
> >> >Its that,  A doesn't see the responses for its requests made.
> >>>
> >> >I would really appreciate if I can get some help stuff on -
> >>  >       1) Creating, destroying and maintaining data in temporary
> >> queues.
> >>  >       2) Setting selector and correlationID in messages.
> >
> >>Details here
> >
> >>http://incubator.apache.org/activemq/how-should-i-implement-request-response-with-jms.html
> >
> >>for 1) just call session.createTemporaryQueue() and set that queue on
> >>the Message.setJMSReplyTo() property so that services can reply to
> >>your temporary queue. They are deleted when A terminates so there's no
> >>issue with maintaining data.
> >
> >  Message.setJMSReplyTo( destination) API wants to set the destination so
> > please tell me How I can set the destination for that and How Sender gets
> > this responce without set the MessageListener  at the sender site. Please
> > help me ...
> >
> >
> >>for 2) just add a JMSCorrelationID() to the request messages you send
> >>as requests. You can then use a selector such as "JMSCorrelationID =
> >>'abc'" on the consumer for responses so that responses are filtered to
> >>only return A's results etc
> >
> >>The contract of Z whichever option you go with is the to copy the
> >>JMSCorrelationID property from the request to the response message and
> >>send the response message to the request.getJMSReployTo() destination
> >
> >>--
> >
> >>James
> >>-------
> >>http://radio.weblogs.com/0112098/
> >
> >>
>
> Thanks
> Arashad Ahmad
> --
> View this message in context: http://www.nabble.com/Queue-hogging-by-a-single-consumer.-tf2119797.html#a5866866
> Sent from the ActiveMQ - Dev forum at Nabble.com.
>
>


-- 

James
-------
http://radio.weblogs.com/0112098/

Re: Queue hogging by a single consumer.

Posted by ahamad <ar...@indiatimes.com>.

>
James.Strachan wrote:
> 
>>On 8/17/06, Naveen Rawat <na...@in.effectsoft.com> wrote:
>>> Hi James,
>>>
>>> Thanks for your response.
>>>
>>>
>>> > Are you trying to implement request-response with A, B, C making
>>> > requests on Z and getting the response? Or can A, B, C process any
>>> > message from Z?
>>>
>>>
>>> Exactly the first case.
>>> A, B, C making requests on Z and getting the response from Z
>>>
>>>
>>>
>> >> I'm not sure if your issue is that say A doesn't see the responses for
>> >> its request (if thats the case use either 3 queues, use temporary
>> > >queues for the responses or use a selector and a correlationID on the
>> >> request & response) - or is it that you have a small number of
>> > >responses from Z and they are being hogged by one consumer - in which
>> >> case setting a small prefetch and a round robin dispatch policy will
>> >> fix this.
>>>
>>>
>> >Its that,  A doesn't see the responses for its requests made.
>>>
>> >I would really appreciate if I can get some help stuff on -
>>  >       1) Creating, destroying and maintaining data in temporary
>> queues.
>>  >       2) Setting selector and correlationID in messages.
> 
>>Details here
> 
>>http://incubator.apache.org/activemq/how-should-i-implement-request-response-with-jms.html
> 
>>for 1) just call session.createTemporaryQueue() and set that queue on
>>the Message.setJMSReplyTo() property so that services can reply to
>>your temporary queue. They are deleted when A terminates so there's no
>>issue with maintaining data.
> 
>  Message.setJMSReplyTo( destination) API wants to set the destination so
> please tell me How I can set the destination for that and How Sender gets
> this responce without set the MessageListener  at the sender site. Please
> help me ...
> 
> 
>>for 2) just add a JMSCorrelationID() to the request messages you send
>>as requests. You can then use a selector such as "JMSCorrelationID =
>>'abc'" on the consumer for responses so that responses are filtered to
>>only return A's results etc
> 
>>The contract of Z whichever option you go with is the to copy the
>>JMSCorrelationID property from the request to the response message and
>>send the response message to the request.getJMSReployTo() destination
> 
>>-- 
> 
>>James
>>-------
>>http://radio.weblogs.com/0112098/
> 
>>

Thanks 
Arashad Ahmad
-- 
View this message in context: http://www.nabble.com/Queue-hogging-by-a-single-consumer.-tf2119797.html#a5866866
Sent from the ActiveMQ - Dev forum at Nabble.com.


Re: Queue hogging by a single consumer.

Posted by James Strachan <ja...@gmail.com>.
Are you trying to implement request-response with A, B, C making
requests on Z and getting the response? Or can A, B, C process any
message from Z?

I'm not sure if your issue is that say A doesn't see the responses for
its request (if thats the case use either 3 queues, use temporary
queues for the responses or use a selector and a correlationID on the
request & response) - or is it that you have a small number of
responses from Z and they are being hogged by one consumer - in which
case setting a small prefetch and a round robin dispatch policy will
fix this.

On 8/17/06, Naveen Rawat <na...@in.effectsoft.com> wrote:
>
> Hi all :)
>
> I am working with the binary version of ActiveMQ 4.0 broker and trying out
> the openwire cpp apis for asynchronous messaging.
> [https://svn.apache.org/repos/asf/incubator/activemq/tags/activemq-4.0/openw
> ire-cpp]
>
>
>
> I wonder if I m resurrecting the mummies of issues already burnt.
>
>
>
> I am trying out having 3 consumers A, B and C listening on the same queue.
> What I am trying to do is -
>
>         A, B, C sending on Q1, and consuming Z's response on Q2
>         Consumer Z listening on Q1, responding back on Q2.
>
>         like this -
>         A/B/C  >>  Q1  >>  Z  >>  Q2  >>  A/B/C
>
>
> Listening/responding to a single consumer is working well at present. BUT
> broker is spoofing up the responses from Z to the simultaneous consumers
> (either 2 or all three). Response for one consumer (A) is going to any of
> the other consumer (B/C). Same is happening for other consumers. Being
> prefetch size preset to 1000, the consumer that first manages session with
> the broker on a queue is getting all the messages (and if it gets
> terminated, the following one hogs the all and so on.) .
>
> As I m at presently testing, setting prefetch size to less (say 1), even
> dont solves the purpose as not giving it frequest quick requests (man Vs
> machine). Moreover as the hogging consumer is reading and acknowledging all
> the responses, the prefetch size of even 1 is not surpassed.
>
> I tried out "with no success" the way of grid processing of messages (using
> MessageGroups) as suggested in
>         http://activemq.org/site/how-do-i-preserve-order-of-messages.html
> Code relevant of this is as follows -
>
>         [........A/B/C....................................................
>         producer = session->createProducer(Q1) ;
>         producer->setPersistent(true) ;
>         message = session->createBytesMessage() ;
>         message->setJMSXGroupID("cheese") ;
>         message->writeString("Hello World") ;
>         producer->send(message);
>         .................................................................]
>
>
>         [ .........Z ' s OnMessage(message)...............................
>         p<string> NGid;
>         NGid = message->getJMSXGroupID();
>         producer = session->createProducer(Q2) ;
>         producer->setPersistent(true) ;
>         reqMessage = session->createBytesMessage() ;
>         reqMessage->setJMSXGroupID(NGid->c_str() );
>         reqMessage->writeString("R E C E I V E D") ;    //response string
>         producer->send(reqMessage);
>         .................................................................]
>
> Is there anymore needed in the code that I m loosing?
>
>
> I come to know that there are certain issues yet not resolved pertaining to
> the prefetch buffer initial size. Correct me please.
> Will manipulation of prefetch buffer size help my cause? Please suggest a
> way otherwise.
>
> HELP ME..
>
>
>                         THANKS IN ADVANCE
>
> Regards,
> Navin
>


-- 

James
-------
http://radio.weblogs.com/0112098/