You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/04/03 12:16:29 UTC

svn commit: r761597 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/t...

Author: gtully
Date: Fri Apr  3 10:16:28 2009
New Revision: 761597

URL: http://svn.apache.org/viewvc?rev=761597&view=rev
Log:
further tests and fixes related to failover, this time transactions and topics. prefetch and maxPageSize are relevant in the transacted. With redeliveries the prefetch needs to be less than half the transaction size and the maxPageSize needs to exceed the transaction span. more tests in AMQ-2149|https://issues.apache.org/activemq/browse/AMQ-2149 and some amendments to the fixes for that issue

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Apr  3 10:16:28 2009
@@ -1838,7 +1838,9 @@
         }
         for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
             ActiveMQSession s = i.next();
-            s.deliverAcks();
+            // deliverAcks at this point is too early as acks can arrive at the broker
+            // before redispatch of messages and hence be out or order
+            s.transportResumed();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Apr  3 10:16:28 2009
@@ -131,7 +131,7 @@
 
     private MessageAck pendingAck;
     private long lastDeliveredSequenceId;
-    
+
     private IOException failureError;
 
     /**
@@ -439,8 +439,8 @@
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
                 } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(getConsumerId() + " received message: " + md);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(getConsumerId() + " received message: " + md);
                     }
                     return md;
                 }
@@ -639,18 +639,20 @@
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
             if (session.isAutoAcknowledge()) {
-            	synchronized(deliveredMessages) {
-            		ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
-            		if (ack != null) {
-            			deliveredMessages.clear();
-            			ackCounter = 0;
+                synchronized(deliveredMessages) {
+                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                    if (ack != null) {
+                        deliveredMessages.clear();
+                        ackCounter = 0;
             		}
             	}
             } else if (pendingAck != null && pendingAck.isStandardAck()) {
                 ack = pendingAck;
+                pendingAck = null;
             }
             if (ack != null) {
                 final MessageAck ackToSend = ack;
+                
                 if (executorService == null) {
                     executorService = Executors.newSingleThreadExecutor();
                 }
@@ -840,8 +842,7 @@
     private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
 
         // Don't acknowledge now, but we may need to let the broker know the
-        // consumer got the message
-        // to expand the pre-fetch window
+        // consumer got the message to expand the pre-fetch window
         if (session.getTransacted()) {
             session.doStartTransaction();
             if (!synchronizationRegistered) {
@@ -865,19 +866,30 @@
             }
         }
 
-        // The delivered message list is only needed for the recover method
-        // which is only used with client ack.
         deliveredCounter++;
         
         MessageAck oldPendingAck = pendingAck;
         pendingAck = new MessageAck(md, ackType, deliveredCounter);
+        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
         if( oldPendingAck==null ) {
             pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
-        } else {
+        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
             pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
+        } else {
+            // old pending ack being superseded by ack of another type, if is is not a delivered
+            // ack and hence important, send it now so it is not lost.
+            if ( !oldPendingAck.isDeliveredAck()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                }
+                session.sendAck(oldPendingAck);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                }
+            }
         }
-        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
-
+        
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
             session.sendAck(pendingAck);
             pendingAck=null;
@@ -910,14 +922,14 @@
             }
             session.sendAck(ack);
             pendingAck = null;
-    
+            
             // Adjust the counters
-            deliveredCounter -= deliveredMessages.size();
+            deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
             additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
-    
-            if (!session.getTransacted()) {
+            
+            if (!session.getTransacted()) {  
                 deliveredMessages.clear();
-            }
+            } 
         }
     }
     
@@ -1073,9 +1085,12 @@
                     } else {
                         // ignore duplicate
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug(getConsumerId() + " Ignoring Duplicate: " + md.getMessage());
+                            LOG.debug(getConsumerId() + " ignoring duplicate: " + md.getMessage());
                         }
-                        acknowledge(md);
+                        // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
+                        // the normal ack will happen in the transaction.
+                        ackLater(md, session.isTransacted() ? 
+                                MessageAck.DELIVERED_ACK_TYPE : MessageAck.STANDARD_ACK_TYPE);
                     }
                 }
             }
@@ -1144,6 +1159,13 @@
         return lastDeliveredSequenceId;
     }
 
+    // on resumption re deliveries will percolate acks in their own good time
+    public void transportResumed() {
+        pendingAck = null; 
+        additionalWindowSize = 0;
+        deliveredCounter = 0;
+    }
+
 	public IOException getFailureError() {
 		return failureError;
 	}
@@ -1151,5 +1173,4 @@
 	public void setFailureError(IOException failureError) {
 		this.failureError = failureError;
 	}
-    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri Apr  3 10:16:28 2009
@@ -1669,8 +1669,8 @@
             msg.setConnection(connection);
             msg.onSend();
             msg.setProducerId(msg.getMessageId().getProducerId());
-            if (this.debug) {
-                LOG.debug(getSessionId() + " sending message: " + msg);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(getSessionId() + " sending message: " + msg);
             }
             if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                 this.connection.asyncSendPacket(msg);
@@ -1963,4 +1963,11 @@
         }
     }
 
+    public void transportResumed() {
+        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
+            ActiveMQMessageConsumer consumer = iter.next();
+            consumer.transportResumed();
+        }
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Fri Apr  3 10:16:28 2009
@@ -211,10 +211,11 @@
             if (localTransactionEventListener != null) {
                 localTransactionEventListener.beginEvent();
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Begin:" + transactionId);
+            }
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Being:" + transactionId);
-        }
+        
     }
 
     /**
@@ -234,7 +235,9 @@
         beforeEnd();
         if (transactionId != null) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Rollback:" + transactionId);
+                LOG.debug("Rollback: "  + transactionId
+                + " syncCount: " 
+                + (synchronizations != null ? synchronizations.size() : 0));
             }
 
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
@@ -268,7 +271,9 @@
         // Only send commit if the transaction was started.
         if (transactionId != null) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Commit:" + transactionId);
+                LOG.debug("Commit: "  + transactionId
+                        + " syncCount: " 
+                        + (synchronizations != null ? synchronizations.size() : 0));
             }
 
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Apr  3 10:16:28 2009
@@ -199,7 +199,7 @@
 
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         synchronized(pending) {
-        pending.addRecoveredMessage(message);
+            pending.addRecoveredMessage(message);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr  3 10:16:28 2009
@@ -393,10 +393,8 @@
                 }
             }
         }
-        if (callDispatchMatched && destination != null) {
-//            if (destination.isLazyDispatch()) {
-                destination.wakeup();
-//            }
+        if (callDispatchMatched && destination != null) {    
+            destination.wakeup();
             dispatchPending();
         } else {
             if (isSlave()) {
@@ -661,7 +659,7 @@
         if (node.getRegionDestination() != null) {
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+                node.getRegionDestination().getDestinationStatistics().getInflight().increment();       
             }
         }
         if (LOG.isTraceEnabled()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Apr  3 10:16:28 2009
@@ -1258,6 +1258,13 @@
         dispatchLock.lock();
         try{
             int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
+         
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(destination.getPhysicalName() + " toPageIn: "  + toPageIn + ", Inflight: "
+                        + destinationStatistics.getInflight().getCount()
+                        + ", pagedInMessages.size " + pagedInMessages.size());
+            }
+            
             toPageIn = Math.max(0, Math.min(toPageIn, getMaxPageSize()));
             if (isLazyDispatch()&& !force) {
                 // Only page in the minimum number of messages which can be dispatched immediately.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Fri Apr  3 10:16:28 2009
@@ -39,6 +39,8 @@
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Tracks the state of a connection so a newly established transport can be
@@ -47,6 +49,7 @@
  * @version $Revision$
  */
 public class ConnectionStateTracker extends CommandVisitorAdapter {
+    private static final Log LOG = LogFactory.getLog(ConnectionStateTracker.class);
 
     private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
 
@@ -135,8 +138,14 @@
     private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
         for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
             TransactionState transactionState = (TransactionState)iter.next();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("tx: " + transactionState.getId());
+            }
             for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
                 Command command = (Command)iterator.next();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("tx replay: " + command);
+                }
                 transport.oneway(command);
             }
         }
@@ -359,23 +368,6 @@
         return null;
     }
 
-    public Response processMessageAck(MessageAck ack) {
-        if (trackTransactions && ack != null && ack.getTransactionId() != null) {
-            ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
-            if (connectionId != null) {
-                ConnectionState cs = connectionStates.get(connectionId);
-                if (cs != null) {
-                    TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
-                    if (transactionState != null) {
-                        transactionState.addCommand(ack);
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-        return null;
-    }
-
     public Response processBeginTransaction(TransactionInfo info) {
         if (trackTransactions && info != null && info.getTransactionId() != null) {
             ConnectionId connectionId = info.getConnectionId();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Fri Apr  3 10:16:28 2009
@@ -19,8 +19,10 @@
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.ConnectionContext;
@@ -101,8 +103,8 @@
                     ref.setMessageId(messageId);
                     container.add(ref);
                 }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(destination.getPhysicalName() + " add reference: " + messageId);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
                 }
             }
         } finally {
@@ -173,11 +175,11 @@
                             ackContainer.update(entry,tsa);
                         }
                     }
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(destination.getPhysicalName() + " remove: " + messageId);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
                     }
                 }else{
-                    if (ackContainer.isEmpty() || isUnreferencedBySubscribers(subscriberMessages, messageId)) {
+                    if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
                         // no message reference held        
                         removeMessage = true;
                         if (LOG.isDebugEnabled()) {
@@ -198,10 +200,11 @@
     //
     // see: https://issues.apache.org/activemq/browse/AMQ-2123
     private boolean isUnreferencedBySubscribers(
-            Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
+            String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
         boolean isUnreferenced = true;
-        for (TopicSubContainer container: subscriberContainers.values()) {
-            if (!container.isEmpty()) {
+        for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
+            if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) {
+                TopicSubContainer container = entry.getValue();
                 for (Iterator i = container.iterator(); i.hasNext();) {
                     ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
                     if (messageId.equals(ref.getMessageId())) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java Fri Apr  3 10:16:28 2009
@@ -45,6 +45,11 @@
     }
 
     public void commit(boolean onePhase) throws XAException, IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("commit: "  + xid
+                    + " syncCount: " + size());
+        }
+        
         // Get ready for commit.
         try {
             prePrepare();
@@ -79,6 +84,10 @@
 
     public void rollback() throws XAException, IOException {
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rollback: "  + xid
+                    + " syncCount: " + size());
+        }
         setState(Transaction.FINISHED_STATE);
         context.getTransactions().remove(xid);
         transactionStore.rollback(getTransactionId());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Fri Apr  3 10:16:28 2009
@@ -29,7 +29,6 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQDestination;
 
 
 public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java Fri Apr  3 10:16:28 2009
@@ -125,8 +125,10 @@
                                 sender.send(response);
                             }
                         } catch (JMSException e) {
-                            errorLatch.countDown();
-                            fail("Unexpected exception:" + e);
+                            if (working) {
+                                errorLatch.countDown();
+                                fail("Unexpected exception:" + e);
+                            }
                         }
                     }
                 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=761597&r1=761596&r2=761597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Fri Apr  3 10:16:28 2009
@@ -39,6 +39,8 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.util.LoggingBrokerPlugin;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
@@ -55,20 +57,26 @@
 
     private static final Log LOG = LogFactory.getLog(AMQ2149Test.class);
 
-    private static final long BROKER_STOP_PERIOD = 20 * 1000;
-
     private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
-    private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR
+    private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR
         +")?maxReconnectDelay=1000&useExponentialBackOff=false";
         
     private final String SEQ_NUM_PROPERTY = "seqNum";
 
     final int MESSAGE_LENGTH_BYTES = 75 * 1024;
-    final int MAX_TO_SEND  = 1500;
     final long SLEEP_BETWEEN_SEND_MS = 3;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
     final Object brokerLock = new Object();
-         
+    
+    private static final long DEFAULT_BROKER_STOP_PERIOD = 20 * 1000;
+    private static final long DEFAULT_NUM_TO_SEND = 1500;
+    
+    long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
+    long numtoSend = DEFAULT_NUM_TO_SEND;
+    String brokerURL = DEFAULT_BROKER_URL;
+    
+    int numBrokerRestarts = 0;
+    final static int MAX_BROKER_RESTARTS = 4;
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
@@ -100,12 +108,17 @@
     
     public void setUp() throws Exception {
         dataDirFile = new File("target/"+ getName());
+        numtoSend = DEFAULT_NUM_TO_SEND;
+        brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
+        brokerURL = DEFAULT_BROKER_URL;
     }
     
     public void tearDown() throws Exception {
         synchronized(brokerLock) {
-            broker.stop();
-            broker.waitUntilStopped();
+            if (broker!= null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
         }
         exceptions.clear();
     }
@@ -130,15 +143,18 @@
         private final MessageConsumer messageConsumer;
 
         private volatile long nextExpectedSeqNum = 0;
-        
+                
+        private final boolean transactional;
+
         private String lastId = null;
 
-        public Receiver(javax.jms.Destination dest) throws JMSException {
+        public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException {
             this.dest = dest;
-            connection = new ActiveMQConnectionFactory(BROKER_URL)
+            this.transactional = transactional;
+            connection = new ActiveMQConnectionFactory(brokerURL)
                     .createConnection();
             connection.setClientID(dest.toString());
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
             if (ActiveMQDestination.transform(dest).isTopic()) {
                 messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
             } else {
@@ -161,6 +177,11 @@
                 final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
                 if ((seqNum % 500) == 0) {
                     LOG.info(dest + " received " + seqNum);
+                    
+                    if (transactional) {
+                        LOG.info("committing..");
+                        session.commit();
+                    }
                 }
                 if (seqNum != nextExpectedSeqNum) {
                     LOG.warn(dest + " received " + seqNum
@@ -196,7 +217,7 @@
 
         public Sender(javax.jms.Destination dest) throws JMSException {
             this.dest = dest;
-            connection = new ActiveMQConnectionFactory(BROKER_URL)
+            connection = new ActiveMQConnectionFactory(brokerURL)
                     .createConnection();
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             messageProducer = session.createProducer(dest);
@@ -206,7 +227,7 @@
 
         public void run() {
             final String longString = buildLongString();
-            while (nextSequenceNumber < MAX_TO_SEND) {
+            while (nextSequenceNumber < numtoSend) {
                 try {
                     final Message message = session
                             .createTextMessage(longString);
@@ -214,6 +235,11 @@
                             nextSequenceNumber);
                     ++nextSequenceNumber;
                     messageProducer.send(message);
+                    
+                    if ((nextSequenceNumber % 500) == 0) {
+                        LOG.info(dest + " sent " + nextSequenceNumber);
+                    }
+                        
                 } catch (Exception e) {
                     LOG.error(dest + " send error", e);
                     exceptions.add(e);
@@ -296,8 +322,7 @@
         
         verifyStats(true);
     }
-    
-    
+        
     public void testTopicOrderWithRestart() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
@@ -317,6 +342,54 @@
         verifyStats(true);
     }
 
+    public void testQueueTransactionalOrderWithRestart() throws Exception {
+        doTestTransactionalOrderWithRestart(ActiveMQDestination.QUEUE_TYPE);
+    }
+    
+    public void testTopicTransactionalOrderWithRestart() throws Exception {
+        doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE);
+    }
+    
+    public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
+        
+        // with transactions there may be lots of re deliveries, in the case
+        // or a commit every 500 messages there could be up to 500 re deliveries
+        // In order to ensure these are acked and don't block new message receipt,
+        // the prefetch should be less than double the commit window.
+        // In addition there needs to be sufficient memory to available to dispatch
+        // transaction size + redeliveries - so 2*transaction size
+        brokerURL = DEFAULT_BROKER_URL + "&jms.prefetchPolicy.all=240";
+        numtoSend = 15000;
+        brokerStopPeriod = 30 * 1000;
+            
+        final PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMaxPageSize(500);
+        policyMap.setDefaultEntry(policy);
+    
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();
+                broker.setDestinationPolicy(policyMap);
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.setDestinationPolicy(policyMap);
+            }
+        });
+        
+        try {
+            verifyOrderedMessageReceipt(destinationType, 1, true);
+        } finally {
+            timer.cancel();
+        }
+        
+        verifyStats(true);
+    }
+
 
     // no need to run this unless there are issues with the other restart tests
   
@@ -356,8 +429,11 @@
         for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
             DestinationStatistics stats = dest.getDestinationStatistics();
             if (brokerRestarts) {
-                assertTrue("qneue/dequeue match for: " + dest.getName(),
-                        stats.getEnqueues().getCount() <= stats.getDequeues().getCount());
+                // all bets are off w.r.t stats as there may be duplicate sends and duplicate
+                // dispatches, all of which will be suppressed - either by the reference store
+                // not allowing duplicate references or consumers acking duplicates
+                LOG.info("with restart: not asserting qneue/dequeue stat match for: " + dest.getName()
+                        + " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount());
             } else {
                 assertEquals("qneue/dequeue match for: " + dest.getName(),
                         stats.getEnqueues().getCount(), stats.getDequeues().getCount());   
@@ -386,29 +462,37 @@
                         exceptions.add(e);
                     }
                 }
-                // do it again
-                try {
-                    timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
-                } catch (IllegalStateException ignore_alreadyCancelled) {   
+                if (++numBrokerRestarts < MAX_BROKER_RESTARTS) {
+                    // do it again
+                    try {
+                        timer.schedule(new RestartTask(), brokerStopPeriod);
+                    } catch (IllegalStateException ignore_alreadyCancelled) {   
+                    }
+                } else {
+                    LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
                 }
             } 
         }
-        timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
+        timer.schedule(new RestartTask(), brokerStopPeriod);
+    }
+    
+    private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
+        verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false);
     }
     
     private void verifyOrderedMessageReceipt() throws Exception {
-        verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE);
+        verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false);
     }
     
-    private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
-        
+    private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception {
+                
         Vector<Thread> threads = new Vector<Thread>();
         Vector<Receiver> receivers = new Vector<Receiver>();
         
-        for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
+        for (int i = 0; i < concurrentPairs; ++i) {
             final javax.jms.Destination destination =
                     ActiveMQDestination.createDestination("test.dest." + i, destinationType);
-            receivers.add(new Receiver(destination));
+            receivers.add(new Receiver(destination, transactional));
             Thread thread = new Thread(new Sender(destination));
             thread.start();
             threads.add(thread);
@@ -426,7 +510,7 @@
         
         while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
             Receiver receiver = receivers.firstElement();
-            if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND || !exceptions.isEmpty()) {
+            if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) {
                 receiver.close();
                 receivers.remove(receiver);
             }