You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by etiennet123 <et...@lambdasales.com> on 2016/07/20 18:25:10 UTC

Problem Consuming from queue with more than one consumer

Hi 

I am trying to create an app that consumes messages in a round-robin fashion.
However, only one consumer will work at a time.

Preferably I would like to create two separate apps to consume from the same
queue.

Here is the code I am trying to use:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;

namespace Consume2Sessions
{
    /// <summary>
    /// Test form that runs up to two consumers, each on its own thread and its
    /// own NMS session, against the same queue over one shared connection.
    /// A timer periodically shows how many messages each consumer has received.
    /// </summary>
    public partial class Consume2Sessions : Form
    {
        // Upper bound on how long one Receive call may block. Receiving with a
        // timeout (instead of blocking forever) lets the loop notice a stop
        // request even when the queue is empty.
        static readonly TimeSpan ReceiveTimeout = TimeSpan.FromSeconds(1);

        // Queue both consumers read from; the destination option asks for a
        // prefetch of 1 for each consumer.
        const String QueueUri = "queue://BIG.MONKEY?consumer.prefetchSize=1";

        // Per-consumer message counters: incremented on the worker threads,
        // read on the UI thread by timer1_Tick.
        Int32 ConsumeCount1 = 0;
        Int32 ConsumeCount2 = 0;

        Thread ConsumerThread1;
        Thread ConsumerThread2;

        // Written by the UI thread and read by the worker threads, hence volatile.
        volatile Boolean StopThread1 = false;
        volatile Boolean StopThread2 = false;

        Uri connecturi = null;
        IConnectionFactory factory = null;
        IConnection connection;

        /// <summary>
        /// Builds the form, then creates and starts the single connection that
        /// both consumer sessions share.
        /// </summary>
        public Consume2Sessions()
        {
            InitializeComponent();

            connecturi = new Uri("activemq:tcp://localhost:61616");
            // Alternative broker: new Uri("activemq:tcp://localhost:61617");

            factory = new NMSConnectionFactory(connecturi);

            connection = factory.CreateConnection();

            // Started once here; sessions created later deliver messages immediately.
            connection.Start();
        }

        /// <summary>Starts consumer 1 unless it is already running.</summary>
        private void btnStartConsumer1_Click(object sender, EventArgs e)
        {
            if (ConsumerThread1 != null && ConsumerThread1.IsAlive)
            {
                // A second thread would share the same stop flag and counter.
                return;
            }

            // Clear a stale stop request (Stop clicked while nothing was running).
            StopThread1 = false;

            ConsumerThread1 = new Thread(ConsumeThread1);
            // Background thread: must not keep the process alive after the form closes.
            ConsumerThread1.IsBackground = true;
            ConsumerThread1.Start();
        }

        /// <summary>Asks consumer 1 to stop after its current receive attempt.</summary>
        private void btnStopConsumer1_Click(object sender, EventArgs e)
        {
            StopThread1 = true;
        }

        /// <summary>Starts consumer 2 unless it is already running.</summary>
        private void btnStartConsumer2_Click(object sender, EventArgs e)
        {
            if (ConsumerThread2 != null && ConsumerThread2.IsAlive)
            {
                // A second thread would share the same stop flag and counter.
                return;
            }

            // Clear a stale stop request (Stop clicked while nothing was running).
            StopThread2 = false;

            ConsumerThread2 = new Thread(ConsumeThread2);
            // Background thread: must not keep the process alive after the form closes.
            ConsumerThread2.IsBackground = true;
            ConsumerThread2.Start();
        }

        /// <summary>Asks consumer 2 to stop after its current receive attempt.</summary>
        private void btnStopConsumer2_Click(object sender, EventArgs e)
        {
            StopThread2 = true;
        }

        /// <summary>Thread entry point for consumer 1.</summary>
        void ConsumeThread1()
        {
            try
            {
                ConsumeLoop(ref ConsumeCount1, delegate { return StopThread1; });
            }
            finally
            {
                // Re-arm the flag so the consumer can be started again.
                StopThread1 = false;
            }
        }

        /// <summary>Thread entry point for consumer 2.</summary>
        void ConsumeThread2()
        {
            try
            {
                ConsumeLoop(ref ConsumeCount2, delegate { return StopThread2; });
            }
            finally
            {
                // Re-arm the flag so the consumer can be started again.
                StopThread2 = false;
            }
        }

        /// <summary>
        /// Opens a session and a consumer on the shared connection and receives
        /// messages until <paramref name="stopRequested"/> returns true.
        /// </summary>
        /// <param name="counter">Counter incremented once per received message.</param>
        /// <param name="stopRequested">Polled between receive attempts; true ends the loop.</param>
        private void ConsumeLoop(ref Int32 counter, Func<Boolean> stopRequested)
        {
            using (ISession session = connection.CreateSession())
            {
                IDestination destination = SessionUtil.GetDestination(session, QueueUri);

                using (IMessageConsumer consumer = session.CreateConsumer(destination))
                {
                    while (!stopRequested())
                    {
                        try
                        {
                            // Null means the timeout elapsed with nothing to
                            // receive; only real messages are counted.
                            IMessage message = consumer.Receive(ReceiveTimeout);
                            if (message != null)
                            {
                                Interlocked.Increment(ref counter);
                            }
                        }
                        catch (Exception ex)
                        {
                            // Keep the consumer alive (best effort) but make the
                            // failure visible, and pause so a persistent error
                            // does not turn into a busy loop.
                            System.Diagnostics.Trace.TraceError("Consumer receive failed: " + ex);
                            Thread.Sleep(ReceiveTimeout);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Signals both consumers to stop and closes the shared connection when
        /// the form is closed.
        /// </summary>
        protected override void OnFormClosed(FormClosedEventArgs e)
        {
            StopThread1 = true;
            StopThread2 = true;

            try
            {
                connection.Close();
            }
            catch (Exception ex)
            {
                // Closing is best effort during shutdown.
                System.Diagnostics.Trace.TraceError("Closing the connection failed: " + ex);
            }

            base.OnFormClosed(e);
        }

        /// <summary>Refreshes the two status labels with the current counts.</summary>
        private void timer1_Tick(object sender, EventArgs e)
        {
            lblStatus1.Text = ConsumeCount1.ToString();
            lblStatus2.Text = ConsumeCount2.ToString();
        }

    }
}




--
View this message in context: http://activemq.2283324.n4.nabble.com/Problem-Consuming-from-queue-with-more-than-one-consumer-tp4714196.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Problem Consuming from queue with more than one consumer

Posted by Tim Bain <tb...@alumni.duke.edu>.
OK, so you're looking for competing consumers, which queues give you by
default.  The caveat to that statement is that if you have a large prefetch
buffer (e.g. the default value of 1000) and one consumer connects before
the other one and there are messages waiting on the queue, up to the
prefetch size will be immediately dispatched to that first consumer, which
might leave none for the next consumer (which means they don't actually
compete in that scenario).

I see that you've tried to set the prefetch buffer size to 1, which is the
right approach to dealing with the problem, but have you confirmed that
it's actually having the desired effect?  You should be able to look at the
consumers via the web console or JConsole and see what prefetch size is
being used; if it's not the value you're trying to set it to, you know
you've got something not working right there.

Tim

On Wed, Jul 20, 2016 at 10:38 PM, etiennet123 <et...@lambdasales.com>
wrote:

> Hi Tim
>
> Thanks for the response.
>
> Perhaps round robin is not the best way to describe it.
>
> What I am trying to do is this. I have a producer that sends XML messages
> to
> the queue.
> This happens quite fast. I then have to consume theses messages, process
> these messages
> and send the result to a database. This takes a while like perhaps half a
> second.
> I want to have multiple consumers taking messages from the queue and
> processing them.
>
> In other words when it is finished processing the message it must fetch the
> next one.
>
> What I am experiencing at the moment is that both consumers start up fine.
> If I look on the
> ActiveMQ queues page it shows two consumers as well.
>
> The problem is that which ever one I start first consumes and the second
> one
> blocks until
> I stop this first consumer.
>
> Etienne
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Problem-Consuming-from-queue-with-more-than-one-consumer-tp4714196p4714212.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Problem Consuming from queue with more than one consumer

Posted by etiennet123 <et...@lambdasales.com>.
Hi Tim

Thanks for the response.

Perhaps round robin is not the best way to describe it.

What I am trying to do is this. I have a producer that sends XML messages to
the queue. This happens quite fast. I then have to consume these messages,
process them, and send the result to a database. This takes a while, perhaps
half a second.
I want to have multiple consumers taking messages from the queue and
processing them.

In other words when it is finished processing the message it must fetch the
next one.

What I am experiencing at the moment is that both consumers start up fine.
If I look on the 
ActiveMQ queues page it shows two consumers as well.

The problem is that whichever consumer I start first consumes, and the second
one blocks until I stop the first consumer.

Etienne





--
View this message in context: http://activemq.2283324.n4.nabble.com/Problem-Consuming-from-queue-with-more-than-one-consumer-tp4714196p4714212.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Problem Consuming from queue with more than one consumer

Posted by Tim Bain <tb...@alumni.duke.edu>.
What does "consume messages in a round robin fashion" mean to you?

What happens when only one works at a time?  What are the symptoms of the
failure when there are two consumers?

Tim

On Jul 20, 2016 3:28 PM, "etiennet123" <et...@lambdasales.com> wrote:

> Hi
>
> I am trying to create a app to consume messages in a round robin fashion.
> However only one will work at a time.
>
> Preferably I would like to create two seperate apps to consume from the
> same
> queue.
>
> Here is the code I am trying to use:
>
> using System;
> using System.Collections.Generic;
> using System.ComponentModel;
> using System.Data;
> using System.Drawing;
> using System.Linq;
> using System.Text;
> using System.Windows.Forms;
> using System.Threading;
> using Apache.NMS;
> using Apache.NMS.Util;
>
> namespace Consume2Sessions
> {
>     public partial class Consume2Sessions : Form
>     {
>         Int32 ConsumeCount1 = 0;
>         Int32 ConsumeCount2 = 0;
>
>         Thread ConsumerThread1;
>         Thread ConsumerThread2;
>
>         Boolean StopThread1 = false;
>         Boolean StopThread2 = false;
>
>         Uri connecturi = null;
>         IConnectionFactory factory = null;
>         IConnection connection;
>
>         public Consume2Sessions()
>         {
>             InitializeComponent();
>
>             connecturi = new Uri("activemq:tcp://localhost:61616");
>             //            Uri connecturi = new
> Uri("activemq:tcp://localhost:61617");
>
>
>             factory = new NMSConnectionFactory(connecturi);
>
>             connection = factory.CreateConnection();
>
>              connection.Start();
>
>         }
>
>         private void btnStartConsumer1_Click(object sender, EventArgs e)
>         {
>             ConsumerThread1 = new Thread(ConsumeThread1);
>             ConsumerThread1.Start();
>         }
>
>         private void btnStopConsumer1_Click(object sender, EventArgs e)
>         {
>             StopThread1 = true;
>         }
>
>         private void btnStartConsumer2_Click(object sender, EventArgs e)
>         {
>             ConsumerThread2 = new Thread(ConsumeThread2);
>             ConsumerThread2.Start();
>         }
>
>         private void btnStopConsumer2_Click(object sender, EventArgs e)
>         {
>             StopThread2 = true;
>         }
>
>         void ConsumeThread1()
>         {
>
>
>             //using (IConnection connection = factory.CreateConnection())
>             //{
>
>
>                 using (ISession session = connection.CreateSession())
>                 {
>
>                     IDestination destination =
> SessionUtil.GetDestination(session,
> "queue://BIG.MONKEY?consumer.prefetchSize=1");
>
>                     using (IMessageConsumer consumer =
> session.CreateConsumer(destination))
>                     {
>
>                         // Start the connection so that messages will be
> processed.
>                    //     connection.Start();
>
>                         while (!StopThread1)
>                         {
>                             try
>                             {
>
>                                 ITextMessage message = consumer.Receive()
> as
> ITextMessage;
>                                 // ProcessedCount++;
>                                 ConsumeCount1++;
>                                 // Thread.Sleep(1000);
>                             }
>                             catch (Exception ss)
>                             { }
>                         }
>
>                     }
>
>
>                 }
>             //}
>             StopThread1 = false;
>         }
>
>         void ConsumeThread2()
>         {
>
>            // using (IConnection connection = factory.CreateConnection())
>             using (ISession session = connection.CreateSession())
>             {
>
>                 IDestination destination =
> SessionUtil.GetDestination(session,
> "queue://BIG.MONKEY?consumer.prefetchSize=1");
>
>                 using (IMessageConsumer consumer =
> session.CreateConsumer(destination))
>                 {
>                     // Start the connection so that messages will be
> processed.
>                     connection.Start();
>
>                     while (!StopThread2)
>                     {
>                         try
>                         {
>
>                             ITextMessage message = consumer.Receive() as
> ITextMessage;
>                             // ProcessedCount++;
>                             ConsumeCount2++;
>                             // Thread.Sleep(1000);
>                         }
>                         catch (Exception dd)
>                         {
>
>
>                         }
>                     }
>
>                 }
>             }
>
>
>
>
>
>             StopThread2 = false;
>         }
>
>         private void timer1_Tick(object sender, EventArgs e)
>         {
>             lblStatus1.Text = ConsumeCount1.ToString();
>             lblStatus2.Text = ConsumeCount2.ToString();
>         }
>
>     }
> }
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Problem-Consuming-from-queue-with-more-than-one-consumer-tp4714196.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>