You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by hackingbear <ha...@gmail.com> on 2009/02/15 01:10:13 UTC

Prefetch=0 how to?

Hi,

I have two server components in my system. A gateway server and multiple
application server.

The gateway server communicates with external system and does the following:
1. receive tokens from the external system and place them in a queue (the
token queue.)
2. receive business requests from application servers and send them to the
external system

My application servers essentially do the following:

1. receive some user messages from somewhere
2. get a token from the token queue (by calling consumer.receive())
3. create and send a business request message from the token (and other
data)

The problem is that the tokens are expensive, costing money literally, and
so there are maybe only a few available at a time.

Now, for example, the gateway sends out two (and only two) tokens A and B;
there are two application servers running, each having a consumer on the
token queue. Server 2 is idle while Server 1 receives two user messages.

- Server 1 receives user message 1, and so try to get a token, get token A
and send out the business request. Everything is fine so far.
- Server 1 receives user message 2, and so try to get a token, get token B,
but get stuck.

What happen? Token B gets pre-fetched to Server 2's consumer even though
that server never actually need it. I have set the prefetch to 1.

Following http://activemq.apache.org/what-is-the-prefetch-limit-for.html, I
then attempt to create the consumer with prefetch=0

  queue = new ActiveMQQueue("TOKEN.QUEUE?consumer.prefetchSize=0");
  consumer = session.createConsumer(queue);

However, the effect is identical to prefetch=1: token B still get
pre-fetched to Server 2.

now, my solution is to write an async consumer which will, upon receiving a
token, check if there is any local request pending, if not, send it back to
the queue. the downside is, of course, the consumer would loop and resending
the tokens forever even if the servers are idle.

Any better solution?

Thanks

-- 
View this message in context: http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22018602.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Prefetch=0 how to?

Posted by Dejan Bosanac <de...@nighttale.net>.
I just ran the test a couple dozen times (against 5.2) and it always gives
me green with the following messages

receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
sender: sent 100-A
sender: sent 100-B
receiver1: got 100-A
sender: got back: 100-A
receiver1: got 100-B
sender: got back: 100-B

Can somebody else give it a try?

Cheers
--
Dejan Bosanac

Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net


On Sat, Feb 21, 2009 at 9:03 PM, hackingbear <ha...@gmail.com> wrote:

>
> The latest codes in the SVN still seems to receive from two receivers.
> Maybe
> I miss something?
>
> Anyway, I have written a test case which depends on nothing but AMQ. I was
> using 5.1 and this bug shows up in 5.1; I tried running it in 5.2 for
> several times, and it always works. [What prevents me from upgrading to 5.2
> is a strange performance hit I have not solved yet.]
>
> (Note: in 5.1, this can be reproduced quite reliably but not 100%; so you
> may to run a few times to reproduce it, also make sure to purge the queue
> using jmx.)
>
> The results:
>
> // AMQ 5.2
> receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> sender: sent 100-A
> sender: sent 100-B
> receiver1: got 100-A
> sender: got back: 100-A
> receiver1: got 100-B
> sender: got back: 100-B
>
> // AMQ 5.1
> receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> sender: sent 100-A
> sender: sent 100-B
> receiver1: got 100-A
> sender: got back: 100-A
> sender: got back: NOTHING
>
>
> The codes:
>
> import java.util.Properties;
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.ObjectMessage;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.naming.Context;
> import javax.naming.InitialContext;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.command.ActiveMQQueue;
>
> public class TestPrefetching extends TestCase {
>    private class Receiver {
>        private final String m_receiverID;
>        private Session m_session;
>        private MessageConsumer m_receiver;
>        private Destination m_queue;
>
>        public Receiver(String receiverID) {
>            try {
>                m_receiverID = receiverID;
>                m_session = s_connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                m_queue = new ActiveMQQueue(qp);
>                print(m_receiverID, "Start receiver to " + qp);
>                m_receiver = m_session.createConsumer(m_queue);
>            } catch (JMSException e) {
>                throw new RuntimeException(e);
>            }
>        }
>
>        public synchronized String getMsg(long waitFor) {
>            try {
>                ObjectMessage objMsg;
>                objMsg = waitFor == 0 ? (ObjectMessage) m_receiver.receive()
>                        : (ObjectMessage) m_receiver.receive(waitFor);
>                if (objMsg != null) {
>                    String msg = (String) objMsg.getObject();
>                    print(m_receiverID, "got " + msg);
>                    return msg;
>                }
>            } catch (JMSException e) {
>                throw new RuntimeException(e);
>            }
>            return "NOTHING";
>        }
>    }
>
>    static final String qp = "TEST_ONLY?consumer.prefetchSize=0";
>
>    static Connection s_connection;
>    static {
>        try {
>            System.setProperty("org.apache.activemq.UseDedicatedTaskRunner",
> "false");
>            Properties props = new Properties();
>            props.put(Context.PROVIDER_URL, "tcp://localhost:61616");
>            InitialContext ctx = new InitialContext(props);
>            ConnectionFactory cf = (ConnectionFactory)
> ctx.lookup("ConnectionFactory");
>            s_connection = cf.createConnection();
>            s_connection.start();
>        } catch (Exception e) {
>            e.printStackTrace();
>            throw new RuntimeException(e);
>        }
>    }
>
>    private static void print(String who, String msg) {
>        System.out.println(who + ": " + msg);
>        System.out.flush();
>    }
>
>    private Receiver m_receiver1;
>    private Receiver m_receiver2;
>
>    @Override
>    protected void setUp() throws Exception {
>        super.setUp();
>        m_receiver1 = new Receiver("receiver1");
>        m_receiver2 = new Receiver("receiver2");
>    }
>
>    public void testReplies() throws Exception {
>        Session session = s_connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>        Queue queue = new ActiveMQQueue(qp);
>        MessageProducer sender = session.createProducer(queue);
>
>        String payload = "100-A";
>        ObjectMessage msg = session.createObjectMessage();
>        msg.setObject(payload);
>        sender.send(msg);
>        print("sender", "sent " + payload);
>
>        payload = "100-B";
>        msg = session.createObjectMessage();
>        msg.setObject(payload);
>        sender.send(msg);
>        print("sender", "sent " + payload);
>
>        Object waiter = "WAITER";
>        synchronized (waiter) {
>            waiter.wait(2000); // wait
>        }
>        String reply = m_receiver1.getMsg(0);
>        print("sender", "got back: " + reply);
>        synchronized (waiter) {
>            waiter.wait(2000); // wait
>        }
>        reply = m_receiver1.getMsg(5000);
>        print("sender", "got back: " + reply);
>    }
> }
>
>
> --
> View this message in context:
> http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22139980.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: Prefetch=0 how to?

Posted by hackingbear <ha...@gmail.com>.
The latest codes in the SVN still seems to receive from two receivers. Maybe
I miss something?

Anyway, I have written a test case which depends on nothing but AMQ. I was
using 5.1 and this bug shows up in 5.1; I tried running it in 5.2 for
several times, and it always works. [What prevents me from upgrading to 5.2
is a strange performance hit I have not solved yet.]

(Note: in 5.1, this can be reproduced quite reliably but not 100%; so you
may to run a few times to reproduce it, also make sure to purge the queue
using jmx.)

The results:

// AMQ 5.2
receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
sender: sent 100-A
sender: sent 100-B
receiver1: got 100-A
sender: got back: 100-A
receiver1: got 100-B
sender: got back: 100-B

// AMQ 5.1
receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
sender: sent 100-A
sender: sent 100-B
receiver1: got 100-A
sender: got back: 100-A
sender: got back: NOTHING


The codes:

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import junit.framework.TestCase;

import org.apache.activemq.command.ActiveMQQueue;

public class TestPrefetching extends TestCase {
    private class Receiver {
        private final String m_receiverID;
        private Session m_session;
        private MessageConsumer m_receiver;
        private Destination m_queue;

        public Receiver(String receiverID) {
            try {
                m_receiverID = receiverID;
                m_session = s_connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                m_queue = new ActiveMQQueue(qp);
                print(m_receiverID, "Start receiver to " + qp);
                m_receiver = m_session.createConsumer(m_queue);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        public synchronized String getMsg(long waitFor) {
            try {
                ObjectMessage objMsg;
                objMsg = waitFor == 0 ? (ObjectMessage) m_receiver.receive()
                        : (ObjectMessage) m_receiver.receive(waitFor);
                if (objMsg != null) {
                    String msg = (String) objMsg.getObject();
                    print(m_receiverID, "got " + msg);
                    return msg;
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
            return "NOTHING";
        }
    }

    static final String qp = "TEST_ONLY?consumer.prefetchSize=0";

    static Connection s_connection;
    static {
        try {
            System.setProperty("org.apache.activemq.UseDedicatedTaskRunner",
"false");
            Properties props = new Properties();
            props.put(Context.PROVIDER_URL, "tcp://localhost:61616");
            InitialContext ctx = new InitialContext(props);
            ConnectionFactory cf = (ConnectionFactory)
ctx.lookup("ConnectionFactory");
            s_connection = cf.createConnection();
            s_connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private static void print(String who, String msg) {
        System.out.println(who + ": " + msg);
        System.out.flush();
    }

    private Receiver m_receiver1;
    private Receiver m_receiver2;

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        m_receiver1 = new Receiver("receiver1");
        m_receiver2 = new Receiver("receiver2");
    }

    public void testReplies() throws Exception {
        Session session = s_connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        Queue queue = new ActiveMQQueue(qp);
        MessageProducer sender = session.createProducer(queue);

        String payload = "100-A";
        ObjectMessage msg = session.createObjectMessage();
        msg.setObject(payload);
        sender.send(msg);
        print("sender", "sent " + payload);

        payload = "100-B";
        msg = session.createObjectMessage();
        msg.setObject(payload);
        sender.send(msg);
        print("sender", "sent " + payload);

        Object waiter = "WAITER";
        synchronized (waiter) {
            waiter.wait(2000); // wait
        }
        String reply = m_receiver1.getMsg(0);
        print("sender", "got back: " + reply);
        synchronized (waiter) {
            waiter.wait(2000); // wait
        }
        reply = m_receiver1.getMsg(5000);
        print("sender", "got back: " + reply);
    }
}


-- 
View this message in context: http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22139980.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Prefetch=0 how to?

Posted by hackingbear <ha...@gmail.com>.
Thanks, 

I haven't get arround to try this but my test differs from two aspects:
1) I use dedicate session for each consumer
2) I receive from only one consumer twice. There is no problem receiving
from two consumers. But when receiving from one consumer twice, the second
time blocks forever. So the code is more like:

    public void testTwoConsumers() throws Exception {
        Session sessionA = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer1 = sessionA.createConsumer(queue);
        Session sessionB = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer2 = sessionB.createConsumer(queue);

        Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

        MessageProducer producer = session.createProducer(queue);
        producer.send(session.createTextMessage("Msg1"));
        producer.send(session.createTextMessage("Msg2"));

        // now lets receive it
        TextMessage answer = (TextMessage)consumer1.receive();
        assertEquals("Should have received a message!", answer.getText(),
"Msg1");
        answer = (TextMessage)consumer1.receive(); // blocked here
        assertEquals("Should have received a message!", answer.getText(),
"Msg2");

//        answer = (TextMessage)consumer2.receiveNoWait();
//        assertNull("Should have not received a message!", answer);
    }


Dejan Bosanac wrote:
> 
> Hi,
> 
> just added a test that tries to reproduce your problem (testTwoConsumers)
> 
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?view=markup
> 
> and everything seems to be working fine. Can you try modifying it to
> reproroduce the problem? BTW. What version are you using?
> 
> 
> Cheers
> --
> Dejan Bosanac
> 
> Open Source Integration - http://fusesource.com/
> ActiveMQ in Action - http://www.manning.com/snyder/
> Blog - http://www.nighttale.net
> 
> 
> On Sun, Feb 15, 2009 at 1:12 AM, hackingbear <ha...@gmail.com>
> wrote:
> 
>>
>> sorry, a small correction:
>>
>>
>> hackingbear wrote:
>> >
>> > - Server 1 receives user message 2, and so try to get a token, get
>> token
>> > B, but get stuck.
>> >
>>
>> I meant:
>>
>> - Server 1 receives user message 2, and so try to get a token, but get
>> stuck. token B never arrives
>>
>> --
>> View this message in context:
>> http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22018620.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -----
> Dejan Bosanac
> 
> Open Source Integration - http://fusesource.com/
> ActiveMQ in Action - http://www.manning.com/snyder/
> Blog - http://www.nighttale.net
> 

-- 
View this message in context: http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22072916.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Prefetch=0 how to?

Posted by Dejan Bosanac <de...@nighttale.net>.
Hi,

just added a test that tries to reproduce your problem (testTwoConsumers)

http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java?view=markup

and everything seems to be working fine. Can you try modifying it to
reproroduce the problem? BTW. What version are you using?


Cheers
--
Dejan Bosanac

Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net


On Sun, Feb 15, 2009 at 1:12 AM, hackingbear <ha...@gmail.com> wrote:

>
> sorry, a small correction:
>
>
> hackingbear wrote:
> >
> > - Server 1 receives user message 2, and so try to get a token, get token
> > B, but get stuck.
> >
>
> I meant:
>
> - Server 1 receives user message 2, and so try to get a token, but get
> stuck. token B never arrives
>
> --
> View this message in context:
> http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22018620.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>

Re: Prefetch=0 how to?

Posted by hackingbear <ha...@gmail.com>.
sorry, a small correction:


hackingbear wrote:
> 
> - Server 1 receives user message 2, and so try to get a token, get token
> B, but get stuck.
> 

I meant:

- Server 1 receives user message 2, and so try to get a token, but get
stuck. token B never arrives

-- 
View this message in context: http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22018620.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.