You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/07 17:38:59 UTC

svn commit: r712189 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/ test/java/org/apache/activemq/transport/discovery/

Author: gtully
Date: Fri Nov  7 08:38:51 2008
New Revision: 712189

URL: http://svn.apache.org/viewvc?rev=712189&view=rev
Log:
reapplying patch from AMQ-1957 with small ammendment for transactional behaviour, reverting changes that fixed broken tests as tests now pass

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=712189&r1=712188&r2=712189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Nov  7 08:38:51 2008
@@ -200,13 +200,13 @@
                     }
                     if (inAckRange) {
                         // Don't remove the nodes until we are committed.
-                        removeList.add(node);
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
                             if (!this.getConsumerInfo().isBrowser()) {
                                 node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                             }
                             node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                            removeList.add(node);
                         } else {
                             // setup a Synchronization to remove nodes from the
                             // dispatched list.
@@ -217,6 +217,7 @@
                                                 throws Exception {
                                             synchronized(dispatchLock) {
                                                 dequeueCounter++;
+                                                dispatched.remove(node);
                                                 node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                                 prefetchExtension--;
@@ -224,9 +225,7 @@
                                         }
 
                                         public void afterRollback() throws Exception {
-                                        	// Need to put it back in the front.
                                             synchronized(dispatchLock) {
-                                        	    dispatched.add(0, node);
                                             	// ActiveMQ workaround for AMQ-1730 - Please Ignore next line
                                                 node.incrementRedeliveryCounter();
                                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
@@ -307,11 +306,9 @@
                 }
             } else if (ack.isRedeliveredAck()) {
                 // Message was re-delivered but it was not yet considered to be
-                // a
-                // DLQ message.
+                // a DLQ message.
                 // Acknowledge all dispatched messages up till the message id of
-                // the
-                // acknowledgment.
+                // the ack.
                 boolean inAckRange = false;
                 for (final MessageReference node : dispatched) {
                     MessageId messageId = node.getMessageId();
@@ -396,9 +393,8 @@
                                 + ack + ") was not in the dispatch list: "
                                 + dispatched);
             } else {
-                LOG
-                        .debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
-                                + ack);
+                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+                        + ack);
             }
         }
     }
@@ -442,7 +438,7 @@
             throw new JMSException("Unmatched acknowledege: " + ack
                     + "; Could not find Message-ID " + lastAckedMsg
                     + " in dispatched-list (end of ack)");
-        if (ack.getMessageCount() != checkCount && ack.isStandardAck()) {
+        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
             throw new JMSException("Unmatched acknowledege: " + ack
                     + "; Expected message count (" + ack.getMessageCount()
                     + ") differs from count in dispatched-list (" + checkCount
@@ -571,7 +567,7 @@
                                 break;
                             }
                             
-                            // Synchronize between dispatched list and remove of messageg from pending list
+                            // Synchronize between dispatched list and remove of message from pending list
                             // related to remove subscription action
                             synchronized(dispatchLock) {
                                 pending.remove();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=712189&r1=712188&r2=712189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri Nov  7 08:38:51 2008
@@ -639,10 +639,14 @@
         Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
 
-        // no commit so will auto rollback and get redispatched to redisptachConsumer
+        // no commit so will auto rollback and get re-dispatched to redisptachConsumer
         session.close();
                 
-        assertNotNull(redispatchConsumer.receive(1000));
+        Message msg = redispatchConsumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getJMSRedelivered());
+        // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much!
+        assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
         redispatchSession.commit();
         
         assertNull(redispatchConsumer.receive(500));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=712189&r1=712188&r2=712189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Fri Nov  7 08:38:51 2008
@@ -35,8 +35,8 @@
             Connection connection = factory.createConnection();
             connection.setClientID("test");
             fail("Did not fail to connect as expected.");
-        } catch ( JMSException expected ) {  
-            assertTrue("reason is  java.net.UnknownHostException", expected.getCause() instanceof  java.net.UnknownHostException);
+        } catch ( JMSException expected ) { 
+            assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause() instanceof java.io.IOException);
         }
     }
 }