You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by sreeramanathan <sr...@gmail.com> on 2012/11/04 12:48:41 UTC

Message consumed advisory does not send any message on successful consumption of message

I'm actually looking for an advisory or any other alternative support from
ActiveMQ, so that i could be notified when the MessageListener associated
with a Consumer finishes processing the message.

The MessageConsumed advisory does not send any message to the Listener that
has been setup (The advisory has been switched on in the PolicyEntry).

Please find the code snippet below:

public class SampleListener implements MessageListener {
    
    private Session session;

    public SampleListener(Session session) {
        this.session = session; 
    }

    public void onMessage(Message message) {
        try {
             // do something
             session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class SampleConsumer {

    private boolean stopConnection = false;

    public static void main(String[] args) {
        new SampleConsumer().start();
    }

    public void start() {
            ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
                Destination destination = session.createTopic("test");
                MessageConsumer messageConsumer =
session.createConsumer(destination);
                messageConsumer.setMessageListener(new
SampleListener(session));

                try {
                    synchronized (this) {
                        while (!stopConnection) {
                            wait();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    session.close();
                    connection.close();
                }

            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public void stop() {
        synchronized (this) {
            stopConnection = true;
            notify();
        }
    }
}


public class SampleProducer implements MessageListener {
  
    private boolean messageDelivered;

    @Test
    public void shouldTestSomething() throws JMSException,
InterruptedException {
        producerConnection = new
ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        producerConnection.start();
        Session session = producerConnection.createSession(true,
SESSION_TRANSACTED);

        Destination destination = session.createTopic("test");
        MessageConsumer advisoryConsumer =
session.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(destination));
        advisoryConsumer.setMessageListener(this);
        
        Message message = session.createTextMessage("Hi");
        Destination destination = session.createTopic("test");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(message);
        session.commit();

        synchronized (this) {
            while (!messageDelivered) {
                wait();
            }
        }
        
        session.close();

        // some assertions
    }

    public void onMessage(Message message) {
        // do something

        synchronized (this) {
            messageDelivered = true;
            notify();
        }
    }
}



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-consumed-advisory-does-not-send-any-message-on-successful-consumption-of-message-tp4658741.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.