Posted to dev@activemq.apache.org by "Brad Willard (JIRA)" <ji...@apache.org> on 2010/07/14 21:41:52 UTC

[jira] Commented: (AMQ-2745) Deadlock or Performance Bottleneck when reading messages with Correlation

    [ https://issues.apache.org/activemq/browse/AMQ-2745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=60662#action_60662 ] 

Brad Willard commented on AMQ-2745:
-----------------------------------

I have created two classes, test.PutMessages and test.ReadMessages, that demonstrate this problem.  Steps to reproduce:

1) Start a 5.3.0 broker

2) Start two message readers for two different correlations on the same queue
java -cp <yourclasspath> test.ReadMessages tcp://localhost:61616 TestQueue ForReader1
java -cp <yourclasspath> test.ReadMessages tcp://localhost:61616 TestQueue ForReader2

3) Start two message producers for the two different correlations
java -cp <yourclasspath> test.PutMessages tcp://localhost:61616 TestQueue ForReader1
java -cp <yourclasspath> test.PutMessages tcp://localhost:61616 TestQueue ForReader2

4) Looking at the output of the readers you started in step 2, you will see that each one reads the messages for its own correlation, with a time on the broker of about 1ms.

5) Stop the reader ForReader1.  You will notice that the program ForReader2 is unaffected.  Messages with correlation "ForReader1" back up on the queue, and the program ForReader2 continues reading normally.

6) Stop all classes, and stop the 5.3.0 broker.  Start a 5.3.2 broker.

7) Repeat steps 2-5 against the 5.3.2 broker.  This time you'll notice that once you stop ForReader1, ForReader2 is affected, which it shouldn't be.  ForReader2 will basically stop being able to read messages until you start ForReader1 again.  ForReader2 will occasionally get messages, but incredibly slowly, and performance is ruined.
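
Comparing the two broker versions by eye means scanning the "Message waited" lines printed by ReadMessages.  As a rough sketch only, a small accumulator like the one below could be fed the brokerTime value computed in onMessage, so that each run ends with a count, average and maximum to compare.  The class name BrokerWaitStats and its methods are hypothetical and not part of the attached test classes, and the sample values in main are made up for illustration; they are not measurements.

```java
/**
 * Hypothetical helper (not part of the original test classes): accumulates
 * broker wait times so two runs can be compared by count, average and maximum.
 */
public class BrokerWaitStats {

    private long count;
    private long totalMillis;
    private long maxMillis;

    /** Record one broker wait time (out time minus in time), in milliseconds. */
    public synchronized void record(long waitMillis) {
        count++;
        totalMillis += waitMillis;
        if (waitMillis > maxMillis) {
            maxMillis = waitMillis;
        }
    }

    public synchronized long count() {
        return count;
    }

    public synchronized long max() {
        return maxMillis;
    }

    /** Average wait in milliseconds, or 0.0 if nothing has been recorded. */
    public synchronized double average() {
        return count == 0 ? 0.0 : (double) totalMillis / count;
    }

    public static void main(String[] args) {
        BrokerWaitStats stats = new BrokerWaitStats();
        long[] samples = {1, 2, 1, 4}; // illustrative values only, not measured
        for (long sample : samples) {
            stats.record(sample);
        }
        System.out.println("count=" + stats.count()
                + " avg=" + stats.average() + "ms"
                + " max=" + stats.max() + "ms");
    }
}
```

In ReadMessages this would amount to calling record(brokerTime) next to the existing println; the methods are synchronized because the listener runs on a different thread from whatever prints the summary.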

package test;

import java.net.*;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 *
 * @author bwillard
 */
public class PutMessages extends Thread {

    final private MessageProducer producer;
    final private String correlationID;
    final private Session session;

    public PutMessages(URI uri, String queueName, String correlationID) throws Exception {

        this.correlationID = correlationID;

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = factory.createConnection();
        connection.start();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);

        producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    public void run() {

        ObjectMessage message;
        String text;
        long counter = 0;

        while (true) {

            try {

                // sleep() is a static method of Thread; pause briefly between sends
                Thread.sleep(5);

                counter++;

                message = session.createObjectMessage();
                message.setJMSCorrelationID(correlationID);
                text = "Message " + counter + " for consumer " + correlationID;
                message.setObject(text);

                producer.send(message);

            } catch (Exception exc) {
                System.err.println("Error sending message");
                exc.printStackTrace(System.err);
            }

        }
    }

    public static void main(String[] args) {

        try {

            URI uri = URI.create(args[0]);
            String queueName = args[1];
            String correlationID = args[2];

            new PutMessages(uri, queueName, correlationID).start();

        } catch (Exception exc) {
            exc.printStackTrace();
        }

    }
}







package test;

import java.net.*;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 *
 * @author bwillard
 */
public class ReadMessages implements MessageListener {

    final private MessageConsumer consumer;
    final private String correlationID;
    final private Session session;

    public ReadMessages(URI uri, String queueName, String correlationID) throws Exception {

        this.correlationID = correlationID;

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = factory.createConnection();
        connection.start();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);

        consumer = session.createConsumer(queue, "JMSCorrelationID='" + correlationID + "'");
        consumer.setMessageListener(this);
    }

    public void onMessage(Message msg) {
        long inTime, outTime, brokerTime;

        try {

            if (msg instanceof ObjectMessage) {
                ObjectMessage txt = (ObjectMessage) msg;
                inTime = txt.getLongProperty("JMSActiveMQBrokerInTime");
                outTime = txt.getLongProperty("JMSActiveMQBrokerOutTime");

                brokerTime = outTime - inTime;

                System.out.println("Message waited " + brokerTime + "ms : " + txt.getObject().toString());
            }

        } catch (Exception exc) {
            System.err.println("Error reading message");
            exc.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {

            URI uri = URI.create(args[0]);
            String queueName = args[1];
            String correlationID = args[2];

            new ReadMessages(uri, queueName, correlationID);

        } catch (Exception exc) {
            exc.printStackTrace();
        }

    }
}
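
ReadMessages builds its selector by concatenating the correlation ID between single quotes.  That works for IDs such as ForReader1, but JMS selector string literals follow SQL-92 rules, so an ID containing a single quote would need that quote doubled.  A minimal sketch of a helper that does this is below; the class name CorrelationSelector and the method forCorrelationId are hypothetical and are not part of ActiveMQ or of the attached classes.

```java
/**
 * Hypothetical helper (not part of the original test classes): builds the
 * JMSCorrelationID selector string used when creating the consumer.
 */
public class CorrelationSelector {

    /**
     * Returns a selector matching exactly one correlation ID.
     * Single quotes inside the ID are doubled, as selector string
     * literals require.
     */
    public static String forCorrelationId(String correlationId) {
        if (correlationId == null) {
            throw new IllegalArgumentException("correlationId must not be null");
        }
        return "JMSCorrelationID='" + correlationId.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        // Plain ID: same string that ReadMessages builds by concatenation
        System.out.println(forCorrelationId("ForReader1"));
        // ID containing a quote: the quote is doubled inside the literal
        System.out.println(forCorrelationId("O'Brien"));
    }
}
```

The result would be passed as the second argument to session.createConsumer(queue, selector), in place of the inline concatenation.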






> Deadlock or Performance Bottleneck when reading messages with Correlation
> -------------------------------------------------------------------------
>
>                 Key: AMQ-2745
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2745
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.1
>         Environment: Java 64-bit, Windows 2008 Server
>            Reporter: Brad Willard
>            Priority: Minor
>             Fix For: 5.4.1
>
>
> We have a situation where we are posting messages to a queue with two different correlation ids, specifically intended to reach two different clients who subscribe with message selectors for the appropriate correlation.  The clients are reading with message listeners.  When one client stops reading, the expected behavior, and the behavior we saw on 5.3.0, is that the messages with the correlation for the stopped client will back up on the queue and will not affect the performance of the second client, which is still reading the messages with the other correlation.  With our memory config, messages can back up into the hundreds of thousands before we notice any performance impact on the active client.
> However, this is not the case in 5.3.1.  With 5.3.1, once enough messages back up for the stopped client, the active client's performance suddenly drops drastically, from 20 ms reads to 30,000 ms reads.  We will see this within a few hundred messages.  I believe there is some kind of deadlock or buffering bottleneck that was introduced on the client side.
