You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Javier Posado <Ja...@teracode.com> on 2010/06/17 16:17:57 UTC

possible bug with durable topic transaction redelivery

hi,

i've created a test to check if a message can be consumed after it was 
rollbacked and after reconnection using durable topic transaction and 
jdbc persistence. i would appreciate if someone could verify it.


package org.apache.activemq;

import java.util.ArrayList;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import junit.framework.TestCase;

import org.apache.activemq.broker.BrokerService;
import 
org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;

public class JmsDurableTopicTransactionRedeliverTest extends TestCase {

private static final Log LOG = 
LogFactory.getLog(JmsDurableTopicTransactionRedeliverTest.class);

     private String serverUri = "vm://localhost?create=false";
     protected BrokerService brokerService;
     protected ConnectionFactory connectionFactory;
     protected Connection connection;
     protected Session session;
     protected MessageConsumer consumer;
     protected MessageProducer producer;
     protected Destination destination;
     protected boolean topic = true;


     /**
      * Sends a batch of messages and validates the rollbacked message 
exists after
      * reconnection.
      *
      * @throws Exception
      */
     public void testReceiveRollback() throws Exception {
         Message[] outbound = new Message[] 
{session.createTextMessage("First Message"), 
session.createTextMessage("Second Message")};

         // lets consume any outstanding messages from prev test runs
         beginTx();
         while (consumer.receive(1000) != null) {
         }
         commitTx();

         // sent both messages
         beginTx();
         producer.send(outbound[0]);
         producer.send(outbound[1]);
         commitTx();

         LOG.info("Sent 0: " + outbound[0]);
         LOG.info("Sent 1: " + outbound[1]);

         ArrayList<Message> messages = new ArrayList<Message>();
         beginTx();
         Message message = consumer.receive(1000);
         messages.add(message);
         assertEquals(outbound[0], message);
         commitTx();


         beginTx();
         message = consumer.receive(1000);
         assertNotNull(message);
         assertEquals(outbound[1], message);
         rollbackTx();
         LOG.info("Rollback 1: " + message);


         reconnect();

         // Consume again.. the prev message
         beginTx();
         message = consumer.receive(9000);
         assertNotNull("Should have re-received the message again!", 
message);
         messages.add(message);
         commitTx();

         Message inbound[] = new Message[messages.size()];
         messages.toArray(inbound);
         assertTextMessagesEqual("Rollback did not work", outbound, 
inbound);
     }

     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
         EmbeddedDataSource dataSource = new EmbeddedDataSource();
         dataSource.setDatabaseName("derbyDb");
         dataSource.setCreateDatabase("create");
         jdbc.setDataSource(dataSource);
         jdbc.deleteAllMessages();
         broker.setPersistenceAdapter(jdbc);
         return broker;
     }

     protected void setUp() throws Exception {
         super.setUp();
         brokerService = createBroker();
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(getDefaultPolicy());
         brokerService.setDestinationPolicy(policyMap);
         brokerService.start();

         connectionFactory = createConnectionFactory();
         reconnect();
     }

     protected PolicyEntry getDefaultPolicy() {
         PolicyEntry policy = new PolicyEntry();
         policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
         policy.setSubscriptionRecoveryPolicy(new 
FixedCountSubscriptionRecoveryPolicy());
         return policy;
     }

     protected void tearDown() throws Exception {
         brokerService.stop();
         brokerService = null;
         super.tearDown();

     }

     protected void commitTx() throws Exception {
         session.commit();
     }

     protected void rollbackTx() throws Exception {
         session.rollback();
     }

     public ConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory(serverUri);
     }

     public Connection createConnection(ConnectionFactory cf) throws 
JMSException {
         Connection connection = cf.createConnection();
         if (getClass().getName() != null) {
             connection.setClientID(getClass().getName());
         }
         return connection;
     }


     /**
      * Recreates the connection.
      *
      * @throws JMSException
      */
     protected void reconnect() throws Exception {

         if (connection != null) {
             // Close the prev connection.
             connection.close();
         }
         session = null;
         connection = createConnection(connectionFactory);
         reconnectSession();
         connection.start();
     }

     /**
      * Recreates the connection.
      *
      * @throws JMSException
      */
     protected void reconnectSession() throws JMSException {
         if (session != null) {
             session.close();
         }

         session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
         destination = session.createTopic("TOPIC." + 
getClass().getName() + "." + getName());
         producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
         consumer = session.createDurableSubscriber((Topic)destination, 
"testsub");
     }




     public MessageConsumer createConsumer(Session session, Destination 
destination) throws JMSException {

         return session.createDurableSubscriber((Topic)destination, 
"testsub");
     }

     protected void assertTextMessagesEqual(String messsage, Message[] 
firstSet, Message[] secondSet) throws JMSException {
         assertEquals("Message count does not match: " + messsage, 
firstSet.length, secondSet.length);

         for (int i = 0; i < secondSet.length; i++) {
             TextMessage m1 = (TextMessage)firstSet[i];
             TextMessage m2 = (TextMessage)secondSet[i];
             assertTextMessageEqual("Message " + (i + 1) + " did not 
match : ", m1, m2);
         }
     }

     protected void assertTextMessageEqual(String message, TextMessage 
m1, TextMessage m2) throws JMSException {
         assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 
+ "}", m1 == null ^ m2 == null);

         if (m1 == null) {
             return;
         }

         assertEquals(message, m1.getText(), m2.getText());
     }

     protected void beginTx() throws Exception {
         //no-op for local tx
     }
}