You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/09/30 13:59:45 UTC

svn commit: r1177619 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bro...

Author: gtully
Date: Fri Sep 30 11:59:44 2011
New Revision: 1177619

URL: http://svn.apache.org/viewvc?rev=1177619&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3519 - Allow JMSRedelivered flag to survive a restart. Add transactedIndividualAck flag to connection factory and rewriteOnRedelivery to KahaDBPersistenceAdapter. These combine to persist the redelivery status on a rollback

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Sep 30 11:59:44 2011
@@ -193,6 +193,7 @@ public class ActiveMQConnection implemen
     private long consumerFailoverRedeliveryWaitPeriod;
     private final Scheduler scheduler;
     private boolean messagePrioritySupported=true;
+    private boolean transactedIndividualAck = false;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -2399,6 +2400,15 @@ public class ActiveMQConnection implemen
         this.checkForDuplicates = checkForDuplicates;
     }
 
+
+    public boolean isTransactedIndividualAck() {
+        return transactedIndividualAck;
+    }
+
+    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
+        this.transactedIndividualAck = transactedIndividualAck;
+    }
+
     /**
      * Removes any TempDestinations that this connection has cached, ignoring
      * any exceptions generated because the destination is in use as they should

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Fri Sep 30 11:59:44 2011
@@ -119,6 +119,7 @@ public class ActiveMQConnectionFactory e
     private boolean checkForDuplicates = true;
     private ClientInternalExceptionListener clientInternalExceptionListener;
     private boolean messagePrioritySupported = true;
+    private boolean transactedIndividualAck = false;
 
     // /////////////////////////////////////////////
     //
@@ -325,6 +326,7 @@ public class ActiveMQConnectionFactory e
         connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
         connection.setCheckForDuplicates(isCheckForDuplicates());
         connection.setMessagePrioritySupported(isMessagePrioritySupported());
+        connection.setTransactedIndividualAck(isTransactedIndividualAck());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -707,6 +709,8 @@ public class ActiveMQConnectionFactory e
         props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
         props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
         props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
+        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
+
     }
 
     public boolean isUseCompression() {
@@ -1019,4 +1023,18 @@ public class ActiveMQConnectionFactory e
     public void setCheckForDuplicates(boolean checkForDuplicates) {
         this.checkForDuplicates = checkForDuplicates;
     }
+
+    public boolean isTransactedIndividualAck() {
+         return transactedIndividualAck;
+     }
+
+     /**
+      * When true, submit individual transacted acks immediately rather than with transaction completion.
+      * This allows the acks to represent delivery status, which can be persisted on rollback.
+      * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) set to true.
+      */
+     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
+         this.transactedIndividualAck = transactedIndividualAck;
+     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Sep 30 11:59:44 2011
@@ -152,6 +152,7 @@ public class ActiveMQMessageConsumer imp
     private long optimizeAckTimestamp = System.currentTimeMillis();
     private long optimizeAcknowledgeTimeOut = 0;
     private long failoverRedeliveryWaitPeriod = 0;
+    private boolean transactedIndividualAck = false;
 
     /**
      * Create a MessageConsumer
@@ -249,6 +250,7 @@ public class ActiveMQMessageConsumer imp
         }
         this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
         this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
+        this.transactedIndividualAck = session.connection.isTransactedIndividualAck();
         if (messageListener != null) {
             setMessageListener(messageListener);
         }
@@ -678,7 +680,7 @@ public class ActiveMQMessageConsumer imp
             synchronized (unconsumedMessages.getMutex()) {
                 if (inProgressClearRequiredFlag) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
+                        LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
                     }
                     // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
                     List<MessageDispatch> list = unconsumedMessages.removeAll();
@@ -833,11 +835,24 @@ public class ActiveMQMessageConsumer imp
                 deliveredMessages.addFirst(md);
             }
             if (session.getTransacted()) {
-                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                if (transactedIndividualAck) {
+                    immediateIndividualTransactedAck(md);
+                } else {
+                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                }
             }
         }
     }
-    
+
+    private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
+        // acks accumulate on the broker pending transaction completion to indicate
+        // delivery status
+        registerSync();
+        MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+        ack.setTransactionId(session.getTransactionContext().getTransactionId());
+        session.sendAck(ack);
+    }
+
     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
         if (unconsumedMessages.isClosed()) {
             return;
@@ -919,29 +934,7 @@ public class ActiveMQMessageConsumer imp
         // Don't acknowledge now, but we may need to let the broker know the
         // consumer got the message to expand the pre-fetch window
         if (session.getTransacted()) {
-            session.doStartTransaction();
-            if (!synchronizationRegistered) {
-                synchronizationRegistered = true;
-                session.getTransactionContext().addSynchronization(new Synchronization() {
-                    @Override
-                    public void beforeEnd() throws Exception {
-                        acknowledge();
-                        synchronizationRegistered = false;
-                    }
-
-                    @Override
-                    public void afterCommit() throws Exception {
-                        commit();
-                        synchronizationRegistered = false;
-                    }
-
-                    @Override
-                    public void afterRollback() throws Exception {
-                        rollback();
-                        synchronizationRegistered = false;
-                    }
-                });
-            }
+            registerSync();
         }
 
         deliveredCounter++;
@@ -976,6 +969,40 @@ public class ActiveMQMessageConsumer imp
         }
     }
 
+    private void registerSync() throws JMSException {
+        session.doStartTransaction();
+        if (!synchronizationRegistered) {
+            synchronizationRegistered = true;
+            session.getTransactionContext().addSynchronization(new Synchronization() {
+                @Override
+                public void beforeEnd() throws Exception {
+                    if (transactedIndividualAck) {
+                        clearDispatchList();
+                        waitForRedeliveries();
+                        synchronized(deliveredMessages) {
+                            rollbackOnFailedRecoveryRedelivery();
+                        }
+                    } else {
+                        acknowledge();
+                    }
+                    synchronizationRegistered = false;
+                }
+
+                @Override
+                public void afterCommit() throws Exception {
+                    commit();
+                    synchronizationRegistered = false;
+                }
+
+                @Override
+                public void afterRollback() throws Exception {
+                    rollback();
+                    synchronizationRegistered = false;
+                }
+            });
+        }
+    }
+
     /**
      * Acknowledge all the messages that have been delivered to the client up to
      * this point.
@@ -1284,7 +1311,11 @@ public class ActiveMQMessageConsumer imp
                                 poisonAck.setFirstMessageId(md.getMessage().getMessageId());
                                 session.sendAck(poisonAck);
                             } else {
-                                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                                if (transactedIndividualAck) {
+                                    immediateIndividualTransactedAck(md);
+                                } else {
+                                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                                }
                             }
                         }
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Sep 30 11:59:44 2011
@@ -593,17 +593,20 @@ public class BrokerService implements Se
             tempDataStore.stop();
             tempDataStore = null;
         }
-        stopper.stop(persistenceAdapter);
-        persistenceAdapter = null;
-        slave = true;
-        if (isUseJmx()) {
-            stopper.stop(getManagementContext());
-            managementContext = null;
+        try {
+            stopper.stop(persistenceAdapter);
+            persistenceAdapter = null;
+            slave = true;
+            if (isUseJmx()) {
+                stopper.stop(getManagementContext());
+                managementContext = null;
+            }
+            // Clear SelectorParser cache to free memory
+            SelectorParser.clearCache();
+        } finally {
+            stopped.set(true);
+            stoppedLatch.countDown();
         }
-        // Clear SelectorParser cache to free memory
-        SelectorParser.clearCache();
-        stopped.set(true);
-        stoppedLatch.countDown();
         if (masterConnectorURI == null) {
             // master start has not finished yet
             if (slaveStartSignal.getCount() == 1) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Sep 30 11:59:44 2011
@@ -223,32 +223,7 @@ public abstract class PrefetchSubscripti
                             node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                             removeList.add(node);
                         } else {
-                            // setup a Synchronization to remove nodes from the
-                            // dispatched list.
-                            context.getTransaction().addSynchronization(
-                                    new Synchronization() {
-
-                                        @Override
-                                        public void afterCommit()
-                                                throws Exception {
-                                            synchronized(dispatchLock) {
-                                                dequeueCounter++;
-                                                dispatched.remove(node);
-                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
-                                            }
-                                        }
-
-                                        @Override
-                                        public void afterRollback() throws Exception {
-                                            synchronized(dispatchLock) {
-                                                if (isSlave()) {
-                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
-                                                } else {
-                                                    // poisionAck will decrement - otherwise still inflight on client
-                                                }
-                                            }
-                                        }
-                                    });
+                            registerRemoveSync(context, node);
                         }
                         index++;
                         acknowledge(context, ack, node);
@@ -281,13 +256,17 @@ public abstract class PrefetchSubscripti
                 for (final MessageReference node : dispatched) {
                     MessageId messageId = node.getMessageId();
                     if (ack.getLastMessageId().equals(messageId)) {
-                        // this should never be within a transaction
-                        dequeueCounter++;
-                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
-                        destination = node.getRegionDestination();
-                        acknowledge(context, ack, node);
-                        dispatched.remove(node);
+                        // Don't remove the nodes until we are committed - immediateAck option
+                        if (!context.isInTransaction()) {
+                            dequeueCounter++;
+                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                            dispatched.remove(node);
+                        } else {
+                            registerRemoveSync(context, node);
+                        }
                         prefetchExtension = Math.max(0, prefetchExtension - 1);
+                        acknowledge(context, ack, node);
+                        destination = node.getRegionDestination();
                         callDispatchMatched = true;
                         break;
                     }
@@ -406,6 +385,35 @@ public abstract class PrefetchSubscripti
         }
     }
 
+    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
+        // setup a Synchronization to remove nodes from the
+        // dispatched list.
+        context.getTransaction().addSynchronization(
+                new Synchronization() {
+
+                    @Override
+                    public void afterCommit()
+                            throws Exception {
+                        synchronized(dispatchLock) {
+                            dequeueCounter++;
+                            dispatched.remove(node);
+                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                        }
+                    }
+
+                    @Override
+                    public void afterRollback() throws Exception {
+                        synchronized(dispatchLock) {
+                            if (isSlave()) {
+                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                            } else {
+                                // poisionAck will decrement - otherwise still inflight on client
+                            }
+                        }
+                    }
+                });
+    }
+
     /**
      * Checks an ack versus the contents of the dispatched list.
      * 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Sep 30 11:59:44 2011
@@ -500,6 +500,18 @@ public class KahaDBPersistenceAdapter im
         letter.setForceRecoverIndex(forceRecoverIndex);
     }
 
+    /**
+     * When true, persist the redelivery status so that the message redelivery flag can survive a broker failure.
+     * Used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) set to true.
+     */
+    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
+        letter.setRewriteOnRedelivery(rewriteOnRedelivery);
+    }
+
+    public boolean isRewriteOnRedelivery() {
+        return letter.isRewriteOnRedelivery();
+    }
+
     public KahaDBStore getStore() {
         return letter;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Sep 30 11:59:44 2011
@@ -63,6 +63,7 @@ import org.apache.activemq.usage.MemoryU
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.util.ByteSequence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.kahadb.journal.Location;
@@ -244,6 +245,57 @@ public class KahaDBStore extends Message
         super.doStop(stopper);
     }
 
+    void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
+        Location location;
+        this.indexLock.writeLock().lock();
+        try {
+              location = findMessageLocation(key, destination);
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+
+        if (location != null) {
+            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
+            Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
+
+            message.incrementRedeliveryCounter();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
+            }
+            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
+            addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
+
+            final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
+
+            this.indexLock.writeLock().lock();
+            try {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(destination, tx);
+                        Long sequence = sd.messageIdIndex.get(tx, key);
+                        MessageKeys keys = sd.orderIndex.get(tx, sequence);
+                        sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
+                    }
+                });
+            } finally {
+                this.indexLock.writeLock().unlock();
+            }
+        }
+    }
+
+    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
+        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
+            public Location execute(Transaction tx) throws IOException {
+                StoredDestination sd = getStoredDestination(destination, tx);
+                Long sequence = sd.messageIdIndex.get(tx, key);
+                if (sequence == null) {
+                    return null;
+                }
+                return sd.orderIndex.get(tx, sequence).location;
+            }
+        });
+    }
+
     protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
         StoreQueueTask task = null;
         synchronized (store.asyncTaskMap) {
@@ -390,16 +442,7 @@ public class KahaDBStore extends Message
             Location location;
             indexLock.readLock().lock();
             try {
-                location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
-                    public Location execute(Transaction tx) throws IOException {
-                        StoredDestination sd = getStoredDestination(dest, tx);
-                        Long sequence = sd.messageIdIndex.get(tx, key);
-                        if (sequence == null) {
-                            return null;
-                        }
-                        return sd.orderIndex.get(tx, sequence).location;
-                    }
-                });
+                location = findMessageLocation(key, dest);
             }finally {
                 indexLock.readLock().unlock();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri Sep 30 11:59:44 2011
@@ -407,7 +407,7 @@ public class KahaDBTransactionStore impl
                         return message;
                     }
                     @Override
-                    public Future run(ConnectionContext ctx) throws IOException {
+                    public Future<Object> run(ConnectionContext ctx) throws IOException {
                         return destination.asyncAddTopicMessage(ctx, message);
                     }
 
@@ -454,7 +454,7 @@ public class KahaDBTransactionStore impl
 
         if (ack.isInTransaction()) {
             if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
-                destination.removeAsyncMessage(context, ack);
+                destination.removeMessage(context, ack);
             } else {
                 Tx tx = getTx(ack.getTransactionId());
                 tx.add(new RemoveMessageCommand(context) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Sep 30 11:59:44 2011
@@ -94,7 +94,7 @@ import org.apache.kahadb.util.VariableMa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
+public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 
     protected BrokerService brokerService;
 
@@ -224,6 +224,7 @@ public class MessageDatabase extends Ser
     private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
     protected boolean forceRecoverIndex = false;
     private final Object checkpointThreadLock = new Object();
+    private boolean rewriteOnRedelivery = false;
 
     public MessageDatabase() {
     }
@@ -400,24 +401,27 @@ public class MessageDatabase extends Ser
 
     public void close() throws IOException, InterruptedException {
         if( opened.compareAndSet(true, false)) {
-            this.indexLock.writeLock().lock();
             try {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, true);
-                    }
-                });
-                pageFile.unload();
-                metadata = new Metadata();
+                this.indexLock.writeLock().lock();
+                try {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            checkpointUpdate(tx, true);
+                        }
+                    });
+                    pageFile.unload();
+                    metadata = new Metadata();
+                } finally {
+                    this.indexLock.writeLock().unlock();
+                }
+                journal.close();
+                synchronized (checkpointThreadLock) {
+                    checkpointThread.join();
+                }
             } finally {
-                this.indexLock.writeLock().unlock();
+                lockFile.unlock();
+                lockFile=null;
             }
-            journal.close();
-            synchronized (checkpointThreadLock) {
-                checkpointThread.join();
-            }
-            lockFile.unlock();
-            lockFile=null;
         }
     }
 
@@ -804,6 +808,14 @@ public class MessageDatabase extends Ser
         return store(data, false, null,null);
     }
 
+    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
+        int size = data.serializedSizeFramed();
+        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+        os.writeByte(data.type().getNumber());
+        data.writeFramed(os);
+        return os.toByteSequence();
+    }
+
     /**
      * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
@@ -815,13 +827,9 @@ public class MessageDatabase extends Ser
             before.run();
         }
         try {
-            int size = data.serializedSizeFramed();
-            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-            os.writeByte(data.type().getNumber());
-            data.writeFramed(os);
-
+            ByteSequence sequence = toByteSequence(data);
             long start = System.currentTimeMillis();
-            Location location = journal.write(os.toByteSequence(), sync);
+            Location location = journal.write(sequence, sync);
             long start2 = System.currentTimeMillis();
             process(data, location, after);
             long end = System.currentTimeMillis();
@@ -1079,16 +1087,35 @@ public class MessageDatabase extends Ser
         }
     }
 
-    protected void process(KahaRollbackCommand command, Location location) {
+    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
+        List<Operation> updates = null;
         synchronized (inflightTransactions) {
-            List<Operation> tx = inflightTransactions.remove(key);
-            if (tx == null) {
-                preparedTransactions.remove(key);
+            updates = inflightTransactions.remove(key);
+            if (updates == null) {
+                updates = preparedTransactions.remove(key);
             }
         }
+        if (isRewriteOnRedelivery()) {
+            persistRedeliveryCount(updates);
+        }
     }
 
+    private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
+        if (updates != null) {
+            for (Operation operation : updates) {
+                operation.getCommand().visit(new Visitor() {
+                    @Override
+                    public void visit(KahaRemoveMessageCommand command) throws IOException {
+                        incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
+                    }
+                });
+            }
+        }
+    }
+
+   abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
+
     // /////////////////////////////////////////////////////////////////
     // These methods do the actual index updates.
     // /////////////////////////////////////////////////////////////////
@@ -1981,10 +2008,12 @@ public class MessageDatabase extends Ser
         return TransactionIdConversion.convert(transactionInfo);
     }
 
-    abstract class Operation {
+    abstract class Operation <T extends JournalCommand<T>> {
+        final T command;
         final Location location;
 
-        public Operation(Location location) {
+        public Operation(T command, Location location) {
+            this.command = command;
             this.location = location;
         }
 
@@ -1992,15 +2021,17 @@ public class MessageDatabase extends Ser
             return location;
         }
 
+        public T getCommand() {
+            return command;
+        }
+
         abstract public void execute(Transaction tx) throws IOException;
     }
 
-    class AddOpperation extends Operation {
-        final KahaAddMessageCommand command;
+    class AddOpperation extends Operation<KahaAddMessageCommand> {
 
         public AddOpperation(KahaAddMessageCommand command, Location location) {
-            super(location);
-            this.command = command;
+            super(command, location);
         }
 
         @Override
@@ -2008,27 +2039,18 @@ public class MessageDatabase extends Ser
             upadateIndex(tx, command, location);
         }
 
-        public KahaAddMessageCommand getCommand() {
-            return command;
-        }
     }
 
-    class RemoveOpperation extends Operation {
-        final KahaRemoveMessageCommand command;
+    class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
 
         public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
-            super(location);
-            this.command = command;
+            super(command, location);
         }
 
         @Override
         public void execute(Transaction tx) throws IOException {
             updateIndex(tx, command, location);
         }
-
-        public KahaRemoveMessageCommand getCommand() {
-            return command;
-        }
     }
 
     // /////////////////////////////////////////////////////////////////
@@ -2247,6 +2269,14 @@ public class MessageDatabase extends Ser
         this.databaseLockedWaitDelay = databaseLockedWaitDelay;
     }
 
+    public boolean isRewriteOnRedelivery() {
+        return rewriteOnRedelivery;
+    }
+
+    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
+        this.rewriteOnRedelivery = rewriteOnRedelivery;
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     // /////////////////////////////////////////////////////////////////

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java?rev=1177619&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java Fri Sep 30 11:59:44 2011
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.failover.FailoverTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedeliveryRestartTest extends BrokerRestartTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setRewriteOnRedelivery(true);
+        broker.addConnector("tcp://0.0.0.0:0");
+    }
+
+    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+
+        ConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + ")?jms.immediateAck=true");
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(10, queueName, connection);
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(queueName);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        TextMessage msg = null;
+        for (int i=0; i<5;i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        session.rollback();
+        consumer.close();
+
+        restartBroker();
+
+        // make failover aware of the restarted broker's auto-assigned port
+        ((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class)).add(true, broker.getTransportConnectors().get(0).getPublishableConnectString());
+
+        consumer = session.createConsumer(destination);
+        for (int i=0; i<5;i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+        }
+        session.commit();
+
+        // consume the rest that were not redeliveries
+        for (int i=0; i<5;i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        session.commit();
+
+        connection.close();
+    }
+
+    public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
+        ConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.immediateAck=true");
+        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(1, queueName, connection);
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(queueName);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        TextMessage msg = (TextMessage) consumer.receive(20000);
+        LOG.info("got: " + msg);
+        assertNotNull("got the message", msg);
+        assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+        assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+
+        // close the journal so the broker stops with an IOException on the next checkpoint, leaving a pending local transaction to recover
+        kahaDBPersistenceAdapter.getStore().getJournal().close();
+        broker.waitUntilStopped();
+
+        broker = createRestartedBroker();
+        broker.start();
+
+        connectionFactory =
+                new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.immediateAck=true");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = session.createConsumer(destination);
+        msg = (TextMessage) consumer.receive(10000);
+        assertNotNull("got the message again", msg);
+        assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
+        assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+
+        session.commit();
+        connection.close();
+    }
+
+    private void populateDestination(final int nbMessages,
+                                     final String destinationName, javax.jms.Connection connection)
+            throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+
+
+    public static Test suite() {
+        return suite(RedeliveryRestartTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java Fri Sep 30 11:59:44 2011
@@ -63,7 +63,6 @@ public class AMQ2736Test {
             store.close();
         } catch (Exception expectedLotsAsJournalBorked) {
         }
-        store.getLockFile().unlock();
 
         broker.stop();
         broker.waitUntilStopped();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java?rev=1177619&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java Fri Sep 30 11:59:44 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
+
+    public static Test suite() {
+        return suite(FailoverRedeliveryTransactionTest.class);
+    }
+
+    @Override
+    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
+        super.configureConnectionFactory(factory);
+        factory.setImmediateAck(true);
+    }
+
+    @Override
+    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
+        BrokerService created = super.createBroker(deleteAllMessagesOnStartup, bindAddress);
+        configurePersistenceAdapter(created);
+        return created;
+    }
+
+    private void configurePersistenceAdapter(BrokerService brokerService) throws IOException {
+        // setRewriteOnRedelivery is called on the KahaDB adapter, hence the downcast
+        ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setRewriteOnRedelivery(true);
+    }
+
+    @Override
+    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+        PersistenceAdapter adapter = super.setDefaultPersistenceAdapter(broker);
+        configurePersistenceAdapter(broker);
+        return adapter;
+    }
+
+    // no point rerunning these inherited tests with this configuration; override them as no-ops
+    @Override
+    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+    }
+
+    @Override
+    public void initCombosForTestFailoverCommitReplyLost() {
+    }
+
+    @Override
+    public void testFailoverCommitReplyLost() throws Exception {
+    }
+
+    @Override
+    public void initCombosForTestFailoverSendReplyLost() {
+    }
+
+    @Override
+    public void testFailoverSendReplyLost() throws Exception {
+    }
+
+    @Override
+    public void initCombosForTestFailoverConnectionSendReplyLost() {
+    }
+
+    @Override
+    public void testFailoverConnectionSendReplyLost() throws Exception {
+    }
+
+    @Override
+    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+    }
+
+    @Override
+    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Fri Sep 30 11:59:44 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.failover;
 
 import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.TestSupport;
@@ -114,9 +115,14 @@ public class FailoverTransactionTest ext
         return broker;
     }
 
+    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
+        // nothing to do by default; subclasses may override to customise the factory
+    }
+
     public void testFailoverProducerCloseBeforeTransaction() throws Exception {
         startCleanBroker();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -170,6 +176,7 @@ public class FailoverTransactionTest ext
         broker.start();
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -222,6 +229,7 @@ public class FailoverTransactionTest ext
 
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         connection = cf.createConnection();
         connection.start();
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -274,6 +282,7 @@ public class FailoverTransactionTest ext
         broker.start();
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -329,6 +338,7 @@ public class FailoverTransactionTest ext
 
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         connection = cf.createConnection();
         connection.start();
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -400,6 +410,7 @@ public class FailoverTransactionTest ext
         proxy.open();
 
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -450,6 +461,7 @@ public class FailoverTransactionTest ext
 
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         connection = cf.createConnection();
         connection.start();
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -466,6 +478,7 @@ public class FailoverTransactionTest ext
     public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
         startCleanBroker();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -489,6 +502,7 @@ public class FailoverTransactionTest ext
     public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
         startCleanBroker();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -521,6 +535,7 @@ public class FailoverTransactionTest ext
     public void testFailoverWithConnectionConsumer() throws Exception {
         startCleanBroker();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
 
@@ -573,6 +588,7 @@ public class FailoverTransactionTest ext
         // as failure depends on hash order of state tracker recovery, do a few times
         for (int i = 0; i < 3; i++) {
             try {
+                LOG.info("Iteration: " + i);
                 doTestFailoverConsumerAckLost(i);
             } finally {
                 stopBroker();
@@ -612,6 +628,7 @@ public class FailoverTransactionTest ext
 
         Vector<Connection> connections = new Vector<Connection>();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         connections.add(connection);
@@ -728,6 +745,7 @@ public class FailoverTransactionTest ext
 
         // after restart, ensure no dangling messages
         cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         connection = cf.createConnection();
         connection.start();
         Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -745,6 +763,7 @@ public class FailoverTransactionTest ext
         broker = createBroker(true);
         broker.start();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -782,6 +801,7 @@ public class FailoverTransactionTest ext
         broker = createBroker(true);
         broker.start();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -830,6 +850,7 @@ public class FailoverTransactionTest ext
         broker = createBroker(true);
         broker.start();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
+        configureConnectionFactory(cf);
         Connection connection = cf.createConnection();
         connection.start();
         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);