You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/05/22 12:46:20 UTC
svn commit: r777463 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/management/ test/java/org/apache/activemq/
Author: gtully
Date: Fri May 22 10:46:20 2009
New Revision: 777463
URL: http://svn.apache.org/viewvc?rev=777463&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2265, revert https://issues.apache.org/activemq/browse/AMQ-2262, inflight count still needs work
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 22 10:46:20 2009
@@ -779,8 +779,11 @@
return;
}
if (messageExpired) {
- // do nothing since STANDARD_ACK will be sent
- return;
+ synchronized (deliveredMessages) {
+ deliveredMessages.remove(md);
+ }
+ stats.getExpiredMessageCount().increment();
+ ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
stats.onMessage();
if (session.getTransacted()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri May 22 10:46:20 2009
@@ -217,9 +217,13 @@
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
- dequeueCounter.addAndGet(ack.getMessageCount());
- if (destination != null) {
- destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+ if (ack.isInTransaction()) {
+ if (destination != null) {
+ destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+ }
+ } else {
+ // expired message - expired message in a transaction
+ dequeueCounter.addAndGet(ack.getMessageCount());
}
dispatchMatched();
return;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java Fri May 22 10:46:20 2009
@@ -136,6 +136,16 @@
}
}
+ @Override
+ public void setEnabled(boolean enabled) {
+ super.setEnabled(enabled);
+ messageCount.setEnabled(enabled);
+ messageRateTime.setEnabled(enabled);
+ pendingMessageCount.setEnabled(enabled);
+ expiredMessageCount.setEnabled(enabled);
+ messageWaitTime.setEnabled(enabled);
+ }
+
public void dump(IndentPrinter out) {
out.printIndent();
out.println(messageCount);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=777463&r1=777462&r2=777463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri May 22 10:46:20 2009
@@ -19,6 +19,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashMap;
import java.util.Map;
+import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -27,6 +28,7 @@
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -35,8 +37,13 @@
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
import junit.framework.Test;
+
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
@@ -849,5 +856,70 @@
assertNull(redispatchConsumer.receive(500));
redispatchSession.close();
}
+
+
+ public void initCombosForTestAckOfExpired() {
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+ }
+
+ public void testAckOfExpired() throws Exception {
+ ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
+ connection = fact.createActiveMQConnection();
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ?
+ session.createQueue("test") : session.createTopic("test"));
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.setStatsEnabled(true);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(destination);
+ producer.setTimeToLive(1000);
+ final int count = 4;
+ for (int i = 0; i < count; i++) {
+ TextMessage message = sendSession.createTextMessage("" + i);
+ producer.send(message);
+ }
+
+ // let first bunch in queue expire
+ Thread.sleep(1000);
+
+ producer.setTimeToLive(0);
+ for (int i = 0; i < count; i++) {
+ TextMessage message = sendSession.createTextMessage("no expiry" + i);
+ producer.send(message);
+ }
+
+ ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+
+ for(int i=0; i<count; i++) {
+ TextMessage msg = (TextMessage) amqConsumer.receive();
+ assertNotNull(msg);
+ assertTrue(msg.getText().contains("no expiry"));
+
+ // force an ack when there are expired messages
+ amqConsumer.acknowledge();
+ }
+ assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
+
+ DestinationViewMBean view = createView(destination);
+
+ assertTrue("Wrong inFlightCount: " + view.getInFlightCount(), (view.getDispatchCount() - view.getDequeueCount()) - view.getInFlightCount() < 5);
+ }
+
+ protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+ MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
+ String domain = "org.apache.activemq";
+ ObjectName name;
+ if (destination.isQueue()) {
+ name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+ } else {
+ name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+ }
+ return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
+ }
}