You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by ChicagoBob123 <bo...@bobfx.com> on 2010/09/01 16:51:41 UTC

Newbie questions on NMS or CMS ActiveMQ consumers puzzle

The problem I have 2 consumers of one queue and each consumer only seems to
get 1/2 the messages. 

Here is some my consumer code. 
The concept is easy I browse the queue and remove any messages that belonged
to my application
based on an id contained in the message. Then I acknowledge which should
dequeue it (but it did not)

Later I tried the ConsumerReceiveThread() which just does a Receive which
dequeues the items but only gets 1/2 the items sent.. 

In my initialization  I started comsuming the queue with using
AcknowledgementMode.IndividualAcknowledge session flag which I had hoped
would allow me 
to remove only the items I was interested in. 
Why would I get only 1/2 the items? Also would I only get 1/3 the items if I
had 3 consumers? (I guess I should just try that) 


HELP ideas? thanks


//////////// Create the queue
 Uri connecturi = new Uri(ActiveMQURL.Text);
   Apache.NMS.ActiveMQ.ConnectionFactory connect = new
Apache.NMS.ActiveMQ.ConnectionFactory(connecturi);
   connection = connect.CreateConnection();
   connection.Start();
   session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
   destination = SessionUtil.GetDestination(session, QueueName.Text);

   // Create a consumer    
  consumer = session.CreateConsumer(destination);



///////// Consumer thread. 
  public void ConsumerBrowserThread()
   {
    String rec;

    String idtext = "id=\"";
    idtext = idtext + f.ID.Text + "\"";
    StayAlive = true;
    while (StayAlive)
     {
      IQueue q = SessionUtil.GetQueue(f.session, f.QueueName.Text);
      IQueueBrowser qb = f.session.CreateBrowser((IQueue)f.destination);
      System.Collections.IEnumerator me = qb.GetEnumerator();
      while (me.MoveNext())
      {
       IMessage msg = (IMessage)me.Current;
       if(msg == null)
        continue;
       ITextMessage message = (ITextMessage)msg;
       rec = message.Text;
       if (rec.Contains(idtext))
        {
         
         f.MessageRecieved.Text += rec;
         msg.Acknowledge();
        }  
      }
      qb.Close();           
     }
    Thread.Sleep(1000);
   }



public void ConsumerReceiveThread()
  {
   String rec;

   String idtext = "id=\"";
   idtext = idtext + f.ID.Text + "\"";
   StayAlive = true;
   while (StayAlive)
   {
    System.TimeSpan ts = new System.TimeSpan(0,0,1);
    IMessage msg = f.consumer.Receive(ts);
    if(msg != null)
     {
      ITextMessage message = (ITextMessage)msg;
      rec = message.Text;
      if (rec.Contains(idtext))
       {
         f.MessageRecieved.Text += rec;
        msg.Acknowledge();
       }
     }
    Thread.Sleep(1000);
   }
  }
-- 
View this message in context: http://activemq.2283324.n4.nabble.com/Newbie-questions-on-NMS-or-CMS-ActiveMQ-consumers-puzzle-tp2403331p2403331.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Newbie questions on NMS or CMS ActiveMQ consumers puzzle

Posted by James Green <ja...@gmail.com>.
AIUI unless your client is operating in an "auto-ack" mode, the client
must "ack" each message it consumes to remove it from the queue.

This means that if you do not acknowledge the message, it will remain
for the next client. Pretty simple and prevents a client reading a
message then immediately crashing, losing the message forever.

Topics are similar however the message is only acknowledges to prevent
the same identified consumer receiving the same message more than
once.

I'm still joining the party on message queues so someone chip in if
I'm wrong... :-)

James

On 1 September 2010 17:34, ChicagoBob123 <bo...@bobfx.com> wrote:
>
> Thanks for the feedback!
> I read this statement a couple of times from the link you sent and wondered
> if
> someone could elaborate further.
>
> "If a consumer receives a message and does not acknowledge it before closing
> "
> Closing what exactly? Closing the queue? The Session?
>
> The link also mentioned load balancing across consumers which seems exatcly
> like what I am experiencing, but what does this flag
> AcknowledgementMode.IndividualAcknowledge do for you then?
>
> Sorry I am so new and trying to get up to speed ASAP.
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Newbie-questions-on-NMS-or-CMS-ActiveMQ-consumers-puzzle-tp2403331p2403538.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Newbie questions on NMS or CMS ActiveMQ consumers puzzle

Posted by ChicagoBob123 <bo...@bobfx.com>.
Thanks for the feedback! 
I read this statement a couple of times from the link you sent and wondered
if 
someone could elaborate further. 

"If a consumer receives a message and does not acknowledge it before closing
" 
Closing what exactly? Closing the queue? The Session? 

The link also mentioned load balancing across consumers which seems exatcly
like what I am experiencing, but what does this flag
AcknowledgementMode.IndividualAcknowledge do for you then? 

Sorry I am so new and trying to get up to speed ASAP.
-- 
View this message in context: http://activemq.2283324.n4.nabble.com/Newbie-questions-on-NMS-or-CMS-ActiveMQ-consumers-puzzle-tp2403331p2403538.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Newbie questions on NMS or CMS ActiveMQ consumers puzzle

Posted by Timothy Bish <ta...@gmail.com>.
On Wed, 2010-09-01 at 07:51 -0700, ChicagoBob123 wrote:
> The problem I have 2 consumers of one queue and each consumer only seems to
> get 1/2 the messages. 
> 
> Here is some my consumer code. 
> The concept is easy I browse the queue and remove any messages that belonged
> to my application
> based on an id contained in the message. Then I acknowledge which should
> dequeue it (but it did not)
> 
> Later I tried the ConsumerReceiveThread() which just does a Receive which
> dequeues the items but only gets 1/2 the items sent.. 
> 
> In my initialization  I started comsuming the queue with using
> AcknowledgementMode.IndividualAcknowledge session flag which I had hoped
> would allow me 
> to remove only the items I was interested in. 
> Why would I get only 1/2 the items? Also would I only get 1/3 the items if I
> had 3 consumers? (I guess I should just try that) 
> 
> 
> HELP ideas? thanks

You might want to have a look at:
http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html

Regards

-- 
Tim Bish

Open Source Integration: http://fusesource.com
ActiveMQ in Action: http://www.manning.com/snyder/

Follow me on Twitter: http://twitter.com/tabish121
My Blog: http://timbish.blogspot.com/