You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:19:28 UTC

svn commit: r900390 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/policy/ test/java/org/apache/activemq/transport/failover/

Author: gtully
Date: Mon Jan 18 13:19:27 2010
New Revision: 900390

URL: http://svn.apache.org/viewvc?rev=900390&view=rev
Log:
merge -c 898797 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2566, inflight out of sync after delivery to dlq. Also tidy failover transaction test from: https://issues.apache.org/activemq/browse/AMQ-2560

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
      - copied unchanged from r898797, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jan 18 13:19:27 2010
@@ -199,7 +199,7 @@
             }
         }
         if (LOG.isTraceEnabled()) {
-            LOG.info("ack:" + ack);
+            LOG.trace("ack:" + ack);
         }
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
@@ -241,7 +241,11 @@
 
                                         public void afterRollback() throws Exception {
                                             synchronized(dispatchLock) {
-                                                node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                                                if (isSlave()) {
+                                                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                                                } else {
+                                                    // poisionAck will decrement - otherwise still inflight on client
+                                                }
                                             }
                                         }
                                     });
@@ -362,8 +366,7 @@
                     if (inAckRange) {
                         sendToDLQ(context, node);
                         node.getRegionDestination().getDestinationStatistics()
-                                .getInflight().increment();
-
+                                .getInflight().decrement();
                         removeList.add(node);
                         dequeueCounter++;
                         index++;

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java Mon Jan 18 13:19:27 2010
@@ -57,6 +57,7 @@
             assertMessage(msg, i);
             assertNotNull("Should be a DLQ message for loop: " + i, msg);
         }
+        session.commit();
     }
 
     protected void consumeAndRollback(int messageCounter) throws Exception {

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java Mon Jan 18 13:19:27 2010
@@ -31,6 +31,8 @@
 
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -169,13 +171,15 @@
         deliveryMode = DeliveryMode.NON_PERSISTENT;
         durableSubscriber = false;
         doTest();
+        validateConsumerPrefetch(this.getDestinationString(), 0);        
     }
-
+        
     public void testDurableQueueMessage() throws Exception {
         super.topic = false;
         deliveryMode = DeliveryMode.PERSISTENT;
         durableSubscriber = false;
         doTest();
+        validateConsumerPrefetch(this.getDestinationString(), 0);
     }
 
     public Destination getDestination() {
@@ -184,4 +188,16 @@
         }
         return destination;
     }
+    
+    private void validateConsumerPrefetch(String destination, long expectedCount) {
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
+            if (dest.getName().equals(destination)) {
+                DestinationStatistics stats = dest.getDestinationStatistics();
+                LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
+                assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", 
+                        expectedCount, stats.getInflight().getCount());      
+            }
+        }
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Mon Jan 18 13:19:27 2010
@@ -467,8 +467,8 @@
                     TimeUnit.SECONDS.sleep(7);
                     
                     // should not get a second message as there are two messages and two consumers
-                    // but with failover and unordered connection reinit it can get the second
-                    // message which will have a problem for the ack
+                    // but with failover and unordered connection restore it can get the second
+                    // message which could create a problem for a pending ack
                     msg = consumer1.receive(5000);
                     LOG.info("consumer1 second attempt got message: " + msg);
                     if (msg != null) {
@@ -503,10 +503,12 @@
         assertNull("should be nothing left for consumer1", msg);
         consumerSession1.commit();
         
-        // consumer2 should get other message
+        // consumer2 should get other message provided consumer1 did not get 2
         msg = consumer2.receive(5000);
         LOG.info("post: from consumer2 received: " + msg);
-        assertNotNull("got message on consumer2", msg);
+        if (receivedMessages.size() == 1) {
+            assertNotNull("got second message on consumer2", msg);
+        }
         consumerSession2.commit();
         
         for (Connection c: connections) {
@@ -532,7 +534,7 @@
         if (msg == null) {
             msg = sweeper.receive(5000);
         }
-        LOG.info("Received: " + msg);
+        LOG.info("Sweep received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
     }