You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Rodrigo S de Castro <ro...@terra.com.br> on 2006/04/15 21:41:23 UTC
Redelivery problem with MessageListener and Session rollback
Hi,
1) I have a MessageListener implementation that rolls back the session if
something goes wrong. I expected the message to be redelivered to this
listener, but that does not happen. It is never redelivered.
This code does NOT work:
private class MessageListenerTest implements MessageListener {
private Session session;
public int counter = 0;
public MessageListenerTest(ActiveMQMessageConsumer session) {
this.session = session;
}
public void onMessage(Message message) {
try {
System.out.println("Message: " + message);
counter++;
if (counter <= 2) {
System.out.println("ROLLBACK");
session.rollback();
} else {
System.out.println("COMMIT");
message.acknowledge();
session.commit();
}
} catch(JMSException e) {
System.err.println("Error when rolling back transaction");
}
}
}
2) The only I managed to make it redeliver is to pass a reference to
MessageConsumer to the MessageListener implementation, cast it to
ActiveMQMessageConsumer and call its rollback method.
The code below DOES work:
private class MessageListenerTest implements MessageListener {
private ActiveMQMessageConsumer consumer;
public int counter = 0;
public MessageListenerTest(ActiveMQMessageConsumer consumer) {
this.consumer = consumer;
}
public void onMessage(Message message) {
try {
System.out.println("Message: " + message);
counter++;
if (counter <= 2) {
System.out.println("ROLLBACK");
session.rollback();
} else {
System.out.println("COMMIT");
message.acknowledge();
session.commit();
}
} catch(JMSException e) {
System.err.println("Error when rolling back transaction");
}
}
}
It this right? I think that session.rollback() should work as well.
Below a JUnit code that shows this problem:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQMessage;
public class MessageListenerRedeliveryTest extends TestCase {
private Connection connection;
protected void setUp() throws Exception {
connection = createConnection();
}
/**
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
}
protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setBackOffMultiplier((short) 5);
redeliveryPolicy.setMaximumRedeliveries(10);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
}
protected Connection createConnection() throws Exception{
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
factory.setRedeliveryPolicy(getRedeliveryPolicy());
return factory.createConnection();
}
private class ConsumerMessageListenerTest implements MessageListener {
private ActiveMQMessageConsumer consumer;
public int counter = 0;
public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
this.consumer = consumer;
}
public void onMessage(Message message) {
try {
System.out.println("Message: " + message);
counter++;
if (counter <= 2) {
System.out.println("ROLLBACK");
consumer.rollback();
} else {
System.out.println("COMMIT");
message.acknowledge();
consumer.commit();
}
} catch(JMSException e) {
System.err.println("Error when rolling back transaction");
}
}
}
private class SessionMessageListenerTest implements MessageListener {
private Session session;
public int counter = 0;
public SessionMessageListenerTest(Session session) {
this.session = session;
}
public void onMessage(Message message) {
try {
System.out.println("Message: " + message);
counter++;
if (counter <= 2) {
System.out.println("ROLLBACK");
session.rollback();
} else {
System.out.println("COMMIT");
message.acknowledge();
session.commit();
}
} catch(JMSException e) {
System.err.println("Error when rolling back transaction");
}
}
}
public void testQueueRollbackMessageListener() throws JMSException {
connection.start();
Session session = connection.createSession(true,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-"+getName());
MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session);
producer.send(message);
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy());
SessionMessageListenerTest listener = new
SessionMessageListenerTest(session);
consumer.setMessageListener(listener);
// redelivery works with the code below
/*
ConsumerMessageListenerTest listener = new
ConsumerMessageListenerTest(session);
consumer.setMessageListener(listener);
*/
try {
Thread.sleep(7000);
} catch(InterruptedException e) {
}
assertEquals(2, listener.counter);
producer.send(createTextMessage(session));
session.commit();
try {
Thread.sleep(2000);
} catch(InterruptedException e) {
// ignore
}
assertEquals(3, listener.counter);
session.close();
}
private TextMessage createTextMessage(Session session) throws
JMSException {
return session.createTextMessage("Hello");
}
private MessageProducer createProducer(Session session, Destination
queue) throws JMSException {
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(getDeliveryMode());
return producer;
}
protected int getDeliveryMode() {
return DeliveryMode.PERSISTENT;
}
}
--
View this message in context: http://www.nabble.com/Redelivery-problem-with-MessageListener-and-Session-rollback-t1455413.html#a3933378
Sent from the ActiveMQ - User forum at Nabble.com.
Re: Redelivery problem with MessageListener and Session rollback
Posted by Rodrigo S de Castro <ro...@terra.com.br>.
Hi Hiram,
Thanks for your answer and sorry for taking so long to answer. Attached
you will find two tests:
1 - Class MessageListenerRedeliveryTestActiveMQ3: tests redelivery in
ActiveMQ 3 when message is rolled back in a MessageListener. It shows
that redelivery policy (including initial timeout as well as back off
mode) are not followed. The only setting that works in the policy is the
maximum number of tries. Tested with 3.2.2
2 - Class MessageListenerRedeliveryTestActiveMQ4: tests redelivery in
ActiveMQ 4 when message is rolled back in a MessageListener (same as
above). Redelivery policy does not work at all in this version when the
message is rolled back in MessageListener. It also tests the number of
retries as well as if the initial timeout and back off mode. Tested with
ActiveMQ 4 RC2.
I would like to contribute these classes to Apache ActiveMQ. I will be
happy to hear comments and make any necessary adjustments in this code.
Cheers,
Rodrigo
Hiram Chirino wrote:
> Hi Rodrigo,
>
> Is that junit test a patch you want to contibute to Apache? If so,
> I'll add it to our test suite asap.
>
> Regards,
> Hiram
>
> On 4/15/06, Rodrigo S de Castro <ro...@terra.com.br> wrote:
>
>> Hi,
>>
>> 1) I have a MessageListener implementation that rolls back the session if
>> something goes wrong. I expected the message to be redelivered to this
>> listener, but that does not happen. It is never redelivered.
>>
>> This code does NOT work:
>>
>> private class MessageListenerTest implements MessageListener {
>> private Session session;
>> public int counter = 0;
>>
>> public MessageListenerTest(ActiveMQMessageConsumer session) {
>> this.session = session;
>> }
>>
>> public void onMessage(Message message) {
>> try {
>> System.out.println("Message: " + message);
>> counter++;
>> if (counter <= 2) {
>> System.out.println("ROLLBACK");
>> session.rollback();
>> } else {
>> System.out.println("COMMIT");
>> message.acknowledge();
>> session.commit();
>> }
>> } catch(JMSException e) {
>> System.err.println("Error when rolling back transaction");
>> }
>> }
>> }
>>
>>
>> 2) The only I managed to make it redeliver is to pass a reference to
>> MessageConsumer to the MessageListener implementation, cast it to
>> ActiveMQMessageConsumer and call its rollback method.
>>
>> The code below DOES work:
>>
>> private class MessageListenerTest implements MessageListener {
>> private ActiveMQMessageConsumer consumer;
>> public int counter = 0;
>>
>> public MessageListenerTest(ActiveMQMessageConsumer consumer) {
>> this.consumer = consumer;
>> }
>>
>> public void onMessage(Message message) {
>> try {
>> System.out.println("Message: " + message);
>> counter++;
>> if (counter <= 2) {
>> System.out.println("ROLLBACK");
>> session.rollback();
>> } else {
>> System.out.println("COMMIT");
>> message.acknowledge();
>> session.commit();
>> }
>> } catch(JMSException e) {
>> System.err.println("Error when rolling back transaction");
>> }
>> }
>> }
>>
>> It this right? I think that session.rollback() should work as well.
>>
>> Below a JUnit code that shows this problem:
>>
[...]
Re: Redelivery problem with MessageListener and Session rollback
Posted by Hiram Chirino <hi...@hiramchirino.com>.
Hi Rodrigo,
Is that junit test a patch you want to contibute to Apache? If so,
I'll add it to our test suite asap.
Regards,
Hiram
On 4/15/06, Rodrigo S de Castro <ro...@terra.com.br> wrote:
>
> Hi,
>
> 1) I have a MessageListener implementation that rolls back the session if
> something goes wrong. I expected the message to be redelivered to this
> listener, but that does not happen. It is never redelivered.
>
> This code does NOT work:
>
> private class MessageListenerTest implements MessageListener {
> private Session session;
> public int counter = 0;
>
> public MessageListenerTest(ActiveMQMessageConsumer session) {
> this.session = session;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> session.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> session.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back transaction");
> }
> }
> }
>
>
> 2) The only I managed to make it redeliver is to pass a reference to
> MessageConsumer to the MessageListener implementation, cast it to
> ActiveMQMessageConsumer and call its rollback method.
>
> The code below DOES work:
>
> private class MessageListenerTest implements MessageListener {
> private ActiveMQMessageConsumer consumer;
> public int counter = 0;
>
> public MessageListenerTest(ActiveMQMessageConsumer consumer) {
> this.consumer = consumer;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> session.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> session.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back transaction");
> }
> }
> }
>
> It this right? I think that session.rollback() should work as well.
>
> Below a JUnit code that shows this problem:
>
>
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.RedeliveryPolicy;
> import org.apache.activemq.command.ActiveMQMessage;
>
> public class MessageListenerRedeliveryTest extends TestCase {
>
> private Connection connection;
>
> protected void setUp() throws Exception {
> connection = createConnection();
> }
>
> /**
> * @see junit.framework.TestCase#tearDown()
> */
> protected void tearDown() throws Exception {
> if (connection != null) {
> connection.close();
> connection = null;
> }
> }
>
> protected RedeliveryPolicy getRedeliveryPolicy() {
> RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
> redeliveryPolicy.setInitialRedeliveryDelay(1000);
> redeliveryPolicy.setBackOffMultiplier((short) 5);
> redeliveryPolicy.setMaximumRedeliveries(10);
> redeliveryPolicy.setUseExponentialBackOff(true);
> return redeliveryPolicy;
> }
>
> protected Connection createConnection() throws Exception{
> ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
> factory.setRedeliveryPolicy(getRedeliveryPolicy());
> return factory.createConnection();
> }
>
> private class ConsumerMessageListenerTest implements MessageListener {
> private ActiveMQMessageConsumer consumer;
> public int counter = 0;
>
> public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
> this.consumer = consumer;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> consumer.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> consumer.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back transaction");
> }
> }
> }
>
> private class SessionMessageListenerTest implements MessageListener {
> private Session session;
> public int counter = 0;
>
> public SessionMessageListenerTest(Session session) {
> this.session = session;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> session.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> session.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back transaction");
> }
> }
> }
>
> public void testQueueRollbackMessageListener() throws JMSException {
> connection.start();
>
> Session session = connection.createSession(true,
> Session.CLIENT_ACKNOWLEDGE);
> Queue queue = session.createQueue("queue-"+getName());
> MessageProducer producer = createProducer(session, queue);
> Message message = createTextMessage(session);
> producer.send(message);
> session.commit();
>
> MessageConsumer consumer = session.createConsumer(queue);
>
> ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
> mc.setRedeliveryPolicy(getRedeliveryPolicy());
>
> SessionMessageListenerTest listener = new
> SessionMessageListenerTest(session);
> consumer.setMessageListener(listener);
>
> // redelivery works with the code below
> /*
> ConsumerMessageListenerTest listener = new
> ConsumerMessageListenerTest(session);
> consumer.setMessageListener(listener);
> */
>
> try {
> Thread.sleep(7000);
> } catch(InterruptedException e) {
>
> }
> assertEquals(2, listener.counter);
>
> producer.send(createTextMessage(session));
> session.commit();
>
> try {
> Thread.sleep(2000);
> } catch(InterruptedException e) {
> // ignore
> }
> assertEquals(3, listener.counter);
>
> session.close();
> }
>
> private TextMessage createTextMessage(Session session) throws
> JMSException {
> return session.createTextMessage("Hello");
> }
>
> private MessageProducer createProducer(Session session, Destination
> queue) throws JMSException {
> MessageProducer producer = session.createProducer(queue);
> producer.setDeliveryMode(getDeliveryMode());
> return producer;
> }
>
> protected int getDeliveryMode() {
> return DeliveryMode.PERSISTENT;
> }
> }
>
>
>
>
> --
> View this message in context: http://www.nabble.com/Redelivery-problem-with-MessageListener-and-Session-rollback-t1455413.html#a3933378
> Sent from the ActiveMQ - User forum at Nabble.com.
>
>
--
Regards,
Hiram