You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Rodrigo S de Castro <ro...@terra.com.br> on 2006/04/15 21:41:23 UTC

Redelivery problem with MessageListener and Session rollback

Hi,

1) I have a MessageListener implementation that rolls back the session if
something goes wrong. I expected the message to be redelivered to this
listener, but that does not happen. It is never redelivered.

This code does NOT work:

    private class MessageListenerTest implements MessageListener {
    	private Session session;
    	public int counter = 0;
    	
    	public MessageListenerTest(ActiveMQMessageConsumer session) {
    		this.session = session;
    	}
    	
		public void onMessage(Message message) {			
			try {
				System.out.println("Message: " + message);
				counter++;
				if (counter <= 2) {
					System.out.println("ROLLBACK");
					session.rollback();
				} else {
					System.out.println("COMMIT");
					message.acknowledge();
					session.commit();
				}					
			} catch(JMSException e) {
				System.err.println("Error when rolling back transaction");
			}
		}    	
    }


2) The only I managed to make it redeliver is to pass a reference to
MessageConsumer to the MessageListener implementation, cast it to
ActiveMQMessageConsumer and call its rollback method.

The code below DOES work:

    private class MessageListenerTest implements MessageListener {
    	private ActiveMQMessageConsumer consumer;
    	public int counter = 0;
    	
    	public MessageListenerTest(ActiveMQMessageConsumer consumer) {
    		this.consumer = consumer;
    	}
    	
		public void onMessage(Message message) {			
			try {
				System.out.println("Message: " + message);
				counter++;
				if (counter <= 2) {
					System.out.println("ROLLBACK");
					session.rollback();
				} else {
					System.out.println("COMMIT");
					message.acknowledge();
					session.commit();
				}					
			} catch(JMSException e) {
				System.err.println("Error when rolling back transaction");
			}
		}    	
    }

It this right? I think that session.rollback() should work as well.

Below a JUnit code that shows this problem:


import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQMessage;

public class MessageListenerRedeliveryTest extends TestCase {

    private Connection connection;
    
    protected void setUp() throws Exception {
        connection = createConnection();
    }

    /**
     * @see junit.framework.TestCase#tearDown()
     */
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    protected RedeliveryPolicy getRedeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(1000);
        redeliveryPolicy.setBackOffMultiplier((short) 5);
        redeliveryPolicy.setMaximumRedeliveries(10);
        redeliveryPolicy.setUseExponentialBackOff(true);
        return redeliveryPolicy;
    }
    
    protected Connection createConnection() throws Exception{
        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        factory.setRedeliveryPolicy(getRedeliveryPolicy());
        return factory.createConnection();
    }
    
    private class ConsumerMessageListenerTest implements MessageListener {
    	private ActiveMQMessageConsumer consumer;
    	public int counter = 0;
    	
    	public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
    		this.consumer = consumer;
    	}
    	
		public void onMessage(Message message) {			
			try {
				System.out.println("Message: " + message);
				counter++;
				if (counter <= 2) {
					System.out.println("ROLLBACK");
					consumer.rollback();
				} else {
					System.out.println("COMMIT");
					message.acknowledge();
					consumer.commit();
				}					
			} catch(JMSException e) {
				System.err.println("Error when rolling back transaction");
			}
		}    	
    }    
    
    private class SessionMessageListenerTest implements MessageListener {
    	private Session session;
    	public int counter = 0;
    	
    	public SessionMessageListenerTest(Session session) {
    		this.session = session;
    	}
    	
		public void onMessage(Message message) {			
			try {
				System.out.println("Message: " + message);
				counter++;
				if (counter <= 2) {
					System.out.println("ROLLBACK");
					session.rollback();
				} else {
					System.out.println("COMMIT");
					message.acknowledge();
					session.commit();
				}					
			} catch(JMSException e) {
				System.err.println("Error when rolling back transaction");
			}
		}    	
    }
    
    public void testQueueRollbackMessageListener() throws JMSException {
        connection.start();

        Session session = connection.createSession(true,
Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-"+getName());
        MessageProducer producer = createProducer(session, queue);
        Message message = createTextMessage(session);
        producer.send(message);
        session.commit();

        MessageConsumer consumer = session.createConsumer(queue);

        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
        mc.setRedeliveryPolicy(getRedeliveryPolicy());

        SessionMessageListenerTest listener = new
SessionMessageListenerTest(session);
        consumer.setMessageListener(listener);
        
        // redelivery works with the code below
        /*
        ConsumerMessageListenerTest listener = new
ConsumerMessageListenerTest(session);
        consumer.setMessageListener(listener);
         */
        
        try {
        	Thread.sleep(7000);
        } catch(InterruptedException e) {
        	
        }        
        assertEquals(2, listener.counter);
        
        producer.send(createTextMessage(session));
        session.commit();
        
        try {
        	Thread.sleep(2000);
        } catch(InterruptedException e) {
        	// ignore
        }
        assertEquals(3, listener.counter);
        
        session.close();
    }    

    private TextMessage createTextMessage(Session session) throws
JMSException {
        return session.createTextMessage("Hello");
    }
   
    private MessageProducer createProducer(Session session, Destination
queue) throws JMSException {
         MessageProducer producer = session.createProducer(queue);
         producer.setDeliveryMode(getDeliveryMode());
         return producer;
    }
    
    protected int getDeliveryMode() {
        return DeliveryMode.PERSISTENT;
    }
}




--
View this message in context: http://www.nabble.com/Redelivery-problem-with-MessageListener-and-Session-rollback-t1455413.html#a3933378
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Redelivery problem with MessageListener and Session rollback

Posted by Rodrigo S de Castro <ro...@terra.com.br>.
Hi Hiram,

Thanks for your answer and sorry for taking so long to answer. Attached 
you will find two tests:

1 - Class MessageListenerRedeliveryTestActiveMQ3: tests redelivery in 
ActiveMQ 3 when message is rolled back in a MessageListener. It shows 
that redelivery policy (including initial timeout as well as back off 
mode) are not followed. The only setting that works in the policy is the 
maximum number of tries. Tested with 3.2.2

2 - Class MessageListenerRedeliveryTestActiveMQ4: tests redelivery in 
ActiveMQ 4 when message is rolled back in a MessageListener (same as 
above). Redelivery policy does not work at all in this version when the 
message is rolled back in MessageListener. It also tests the number of 
retries as well as if the initial timeout and back off mode. Tested with 
ActiveMQ 4 RC2.

I would like to contribute these classes to Apache ActiveMQ. I will be 
happy to hear comments and make any necessary adjustments in this code.

Cheers,

Rodrigo


Hiram Chirino wrote:
> Hi Rodrigo,
>
> Is that junit test a patch you want to contibute to Apache?  If so,
> I'll add it to our test suite asap.
>
> Regards,
> Hiram
>
> On 4/15/06, Rodrigo S de Castro <ro...@terra.com.br> wrote:
>   
>> Hi,
>>
>> 1) I have a MessageListener implementation that rolls back the session if
>> something goes wrong. I expected the message to be redelivered to this
>> listener, but that does not happen. It is never redelivered.
>>
>> This code does NOT work:
>>
>>     private class MessageListenerTest implements MessageListener {
>>         private Session session;
>>         public int counter = 0;
>>
>>         public MessageListenerTest(ActiveMQMessageConsumer session) {
>>                 this.session = session;
>>         }
>>
>>                 public void onMessage(Message message) {
>>                         try {
>>                                 System.out.println("Message: " + message);
>>                                 counter++;
>>                                 if (counter <= 2) {
>>                                         System.out.println("ROLLBACK");
>>                                         session.rollback();
>>                                 } else {
>>                                         System.out.println("COMMIT");
>>                                         message.acknowledge();
>>                                         session.commit();
>>                                 }
>>                         } catch(JMSException e) {
>>                                 System.err.println("Error when rolling back transaction");
>>                         }
>>                 }
>>     }
>>
>>
>> 2) The only I managed to make it redeliver is to pass a reference to
>> MessageConsumer to the MessageListener implementation, cast it to
>> ActiveMQMessageConsumer and call its rollback method.
>>
>> The code below DOES work:
>>
>>     private class MessageListenerTest implements MessageListener {
>>         private ActiveMQMessageConsumer consumer;
>>         public int counter = 0;
>>
>>         public MessageListenerTest(ActiveMQMessageConsumer consumer) {
>>                 this.consumer = consumer;
>>         }
>>
>>                 public void onMessage(Message message) {
>>                         try {
>>                                 System.out.println("Message: " + message);
>>                                 counter++;
>>                                 if (counter <= 2) {
>>                                         System.out.println("ROLLBACK");
>>                                         session.rollback();
>>                                 } else {
>>                                         System.out.println("COMMIT");
>>                                         message.acknowledge();
>>                                         session.commit();
>>                                 }
>>                         } catch(JMSException e) {
>>                                 System.err.println("Error when rolling back transaction");
>>                         }
>>                 }
>>     }
>>
>> It this right? I think that session.rollback() should work as well.
>>
>> Below a JUnit code that shows this problem:
>>     
[...]



Re: Redelivery problem with MessageListener and Session rollback

Posted by Hiram Chirino <hi...@hiramchirino.com>.
Hi Rodrigo,

Is that junit test a patch you want to contibute to Apache?  If so,
I'll add it to our test suite asap.

Regards,
Hiram

On 4/15/06, Rodrigo S de Castro <ro...@terra.com.br> wrote:
>
> Hi,
>
> 1) I have a MessageListener implementation that rolls back the session if
> something goes wrong. I expected the message to be redelivered to this
> listener, but that does not happen. It is never redelivered.
>
> This code does NOT work:
>
>     private class MessageListenerTest implements MessageListener {
>         private Session session;
>         public int counter = 0;
>
>         public MessageListenerTest(ActiveMQMessageConsumer session) {
>                 this.session = session;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         session.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         session.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
>
> 2) The only I managed to make it redeliver is to pass a reference to
> MessageConsumer to the MessageListener implementation, cast it to
> ActiveMQMessageConsumer and call its rollback method.
>
> The code below DOES work:
>
>     private class MessageListenerTest implements MessageListener {
>         private ActiveMQMessageConsumer consumer;
>         public int counter = 0;
>
>         public MessageListenerTest(ActiveMQMessageConsumer consumer) {
>                 this.consumer = consumer;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         session.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         session.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
> It this right? I think that session.rollback() should work as well.
>
> Below a JUnit code that shows this problem:
>
>
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.RedeliveryPolicy;
> import org.apache.activemq.command.ActiveMQMessage;
>
> public class MessageListenerRedeliveryTest extends TestCase {
>
>     private Connection connection;
>
>     protected void setUp() throws Exception {
>         connection = createConnection();
>     }
>
>     /**
>      * @see junit.framework.TestCase#tearDown()
>      */
>     protected void tearDown() throws Exception {
>         if (connection != null) {
>             connection.close();
>             connection = null;
>         }
>     }
>
>     protected RedeliveryPolicy getRedeliveryPolicy() {
>         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>         redeliveryPolicy.setInitialRedeliveryDelay(1000);
>         redeliveryPolicy.setBackOffMultiplier((short) 5);
>         redeliveryPolicy.setMaximumRedeliveries(10);
>         redeliveryPolicy.setUseExponentialBackOff(true);
>         return redeliveryPolicy;
>     }
>
>     protected Connection createConnection() throws Exception{
>         ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
>         factory.setRedeliveryPolicy(getRedeliveryPolicy());
>         return factory.createConnection();
>     }
>
>     private class ConsumerMessageListenerTest implements MessageListener {
>         private ActiveMQMessageConsumer consumer;
>         public int counter = 0;
>
>         public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
>                 this.consumer = consumer;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         consumer.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         consumer.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
>     private class SessionMessageListenerTest implements MessageListener {
>         private Session session;
>         public int counter = 0;
>
>         public SessionMessageListenerTest(Session session) {
>                 this.session = session;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         session.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         session.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
>     public void testQueueRollbackMessageListener() throws JMSException {
>         connection.start();
>
>         Session session = connection.createSession(true,
> Session.CLIENT_ACKNOWLEDGE);
>         Queue queue = session.createQueue("queue-"+getName());
>         MessageProducer producer = createProducer(session, queue);
>         Message message = createTextMessage(session);
>         producer.send(message);
>         session.commit();
>
>         MessageConsumer consumer = session.createConsumer(queue);
>
>         ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
>         mc.setRedeliveryPolicy(getRedeliveryPolicy());
>
>         SessionMessageListenerTest listener = new
> SessionMessageListenerTest(session);
>         consumer.setMessageListener(listener);
>
>         // redelivery works with the code below
>         /*
>         ConsumerMessageListenerTest listener = new
> ConsumerMessageListenerTest(session);
>         consumer.setMessageListener(listener);
>          */
>
>         try {
>                 Thread.sleep(7000);
>         } catch(InterruptedException e) {
>
>         }
>         assertEquals(2, listener.counter);
>
>         producer.send(createTextMessage(session));
>         session.commit();
>
>         try {
>                 Thread.sleep(2000);
>         } catch(InterruptedException e) {
>                 // ignore
>         }
>         assertEquals(3, listener.counter);
>
>         session.close();
>     }
>
>     private TextMessage createTextMessage(Session session) throws
> JMSException {
>         return session.createTextMessage("Hello");
>     }
>
>     private MessageProducer createProducer(Session session, Destination
> queue) throws JMSException {
>          MessageProducer producer = session.createProducer(queue);
>          producer.setDeliveryMode(getDeliveryMode());
>          return producer;
>     }
>
>     protected int getDeliveryMode() {
>         return DeliveryMode.PERSISTENT;
>     }
> }
>
>
>
>
> --
> View this message in context: http://www.nabble.com/Redelivery-problem-with-MessageListener-and-Session-rollback-t1455413.html#a3933378
> Sent from the ActiveMQ - User forum at Nabble.com.
>
>


--
Regards,
Hiram