You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/05/22 12:46:20 UTC

svn commit: r777463 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/management/ test/java/org/apache/activemq/

Author: gtully
Date: Fri May 22 10:46:20 2009
New Revision: 777463

URL: http://svn.apache.org/viewvc?rev=777463&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2265, revert https://issues.apache.org/activemq/browse/AMQ-2262, inflight count still needs work

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 22 10:46:20 2009
@@ -779,8 +779,11 @@
             return;
         }
         if (messageExpired) {
-        	// do nothing since STANDARD_ACK will be sent
-            return;
+            synchronized (deliveredMessages) {
+                deliveredMessages.remove(md);
+            }
+            stats.getExpiredMessageCount().increment();
+            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
         } else {
             stats.onMessage();
             if (session.getTransacted()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri May 22 10:46:20 2009
@@ -217,9 +217,13 @@
         } else if (ack.isDeliveredAck()) {
             // Message was delivered but not acknowledged: update pre-fetch
             // counters.
-            dequeueCounter.addAndGet(ack.getMessageCount());
-            if (destination != null) {
-                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+            if (ack.isInTransaction()) {
+                if (destination != null) {
+                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+                }
+            } else {
+                // expired message - expired message in a transaction
+                dequeueCounter.addAndGet(ack.getMessageCount());
             }
             dispatchMatched();
             return;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java Fri May 22 10:46:20 2009
@@ -136,6 +136,16 @@
         }
     }
 
+    @Override
+    public void setEnabled(boolean enabled) {
+        super.setEnabled(enabled);
+        messageCount.setEnabled(enabled);
+        messageRateTime.setEnabled(enabled);
+        pendingMessageCount.setEnabled(enabled);
+        expiredMessageCount.setEnabled(enabled);
+        messageWaitTime.setEnabled(enabled);
+    }
+
     public void dump(IndentPrinter out) {
         out.printIndent();
         out.println(messageCount);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri May 22 10:46:20 2009
@@ -19,6 +19,7 @@
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -27,6 +28,7 @@
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -35,8 +37,13 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
 
 import junit.framework.Test;
+
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
@@ -849,5 +856,70 @@
         assertNull(redispatchConsumer.receive(500));
         redispatchSession.close();
     }
+    
+    
+    public void initCombosForTestAckOfExpired() {
+        addCombinationValues("destinationType", 
+                new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+        
+    public void testAckOfExpired() throws Exception {
+        ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
+        connection = fact.createActiveMQConnection();
+        
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
+        destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ?
+                session.createQueue("test") : session.createTopic("test"));
+                    
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.setStatsEnabled(true);
+                
+        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
+            MessageProducer producer = sendSession.createProducer(destination);
+        producer.setTimeToLive(1000);
+        final int count = 4;
+        for (int i = 0; i < count; i++) {
+            TextMessage message = sendSession.createTextMessage("" + i);
+            producer.send(message);
+        }
+        
+        // let first bunch in queue expire
+        Thread.sleep(1000);
+        
+        producer.setTimeToLive(0);
+        for (int i = 0; i < count; i++) {
+            TextMessage message = sendSession.createTextMessage("no expiry" + i);
+            producer.send(message);
+        }
+        
+        ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+        
+        for(int i=0; i<count; i++) {
+            TextMessage msg = (TextMessage) amqConsumer.receive();
+            assertNotNull(msg);
+            assertTrue(msg.getText().contains("no expiry"));
+            
+            // force an ack when there are expired messages
+            amqConsumer.acknowledge();         
+        }
+        assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
+    
+        DestinationViewMBean view = createView(destination);
+        
+        assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
+    }
+    
+    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+         MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
+         String domain = "org.apache.activemq";
+         ObjectName name;
+        if (destination.isQueue()) {
+            name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+        } else {
+            name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+        }
+        return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
+    }
 
 }