You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/05/21 21:19:57 UTC
svn commit: r777225 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ActiveMQMessageConsumer.java
test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Author: dejanb
Date: Thu May 21 19:19:57 2009
New Revision: 777225
URL: http://svn.apache.org/viewvc?rev=777225&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2262 - inflight message count
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=777225&r1=777224&r2=777225&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu May 21 19:19:57 2009
@@ -779,7 +779,8 @@
return;
}
if (messageExpired) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ // do nothing since STANDARD_ACK will be sent
+ return;
} else {
stats.onMessage();
if (session.getTransacted()) {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=777225&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Thu May 21 19:19:57 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+
+
+/**
+ * Use-case test added for AMQ-2262 (inflight message count).
+ * <p>
+ * A producer sends messages with a 100 ms time-to-live while a slow consumer
+ * (one receive, then a 100 ms sleep, for roughly three seconds) reads from the
+ * same destination. Afterwards the destination's JMX counters are compared
+ * with each other. The test is registered for both a queue and a topic named
+ * "test".
+ */
+public class ExpiredMessagesTest extends CombinationTestSupport {
+
+    BrokerService broker;
+    Connection connection;
+    Session session;
+    MessageProducer producer;
+    MessageConsumer consumer;
+    /**
+     * Destination under test; its candidate values are registered in
+     * {@link #initCombosForTestExpiredMessages()}.
+     * NOTE(review): presumably public so the combination framework can assign
+     * it by name - confirm against CombinationTestSupport.
+     */
+    public ActiveMQDestination destination;
+
+    /** Builds the suite through the inherited combination-aware factory. */
+    public static Test suite() {
+        return suite(ExpiredMessagesTest.class);
+    }
+
+    /** Runs the suite with the JUnit text runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * Starts an embedded, JMX-enabled broker named "localhost" with a TCP
+     * connector on port 61616, after deleting previously stored messages.
+     */
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("localhost");
+        broker.setDataDirectory("data/");
+        broker.setUseJmx(true);
+        broker.deleteAllMessages();
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /** Registers the queue and topic variants of the "test" destination. */
+    public void initCombosForTestExpiredMessages() {
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("test"), new ActiveMQTopic("test")});
+    }
+
+    /**
+     * Produces 30000 short-lived messages while a slow consumer is reading,
+     * then checks the destination's dispatch, dequeue and in-flight counters
+     * against each other.
+     */
+    public void testExpiredMessages() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        connection = factory.createConnection();
+        // AUTO_ACKNOWLEDGE is a static constant: reference it through the
+        // Session interface rather than through the still-unassigned field.
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(100);
+        consumer = session.createConsumer(destination);
+        connection.start();
+
+        Thread consumerThread = new Thread("Consumer Thread") {
+            public void run() {
+                long start = System.currentTimeMillis();
+                try {
+                    long end = System.currentTimeMillis();
+                    // Consume for about three seconds, pausing after every receive.
+                    while (end - start < 3000) {
+                        consumer.receive(1000);
+                        Thread.sleep(100);
+                        end = System.currentTimeMillis();
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        consumerThread.start();
+
+        Thread producingThread = new Thread("Producing Thread") {
+            public void run() {
+                try {
+                    for (int i = 0; i < 30000; i++) {
+                        producer.send(session.createTextMessage("test"));
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        producingThread.start();
+
+        consumerThread.join();
+        producingThread.join();
+
+        DestinationViewMBean view = createView(destination);
+
+        // Read each counter once so the failure message reports the value
+        // that was actually compared.
+        long dispatched = view.getDispatchCount();
+        long dequeued = view.getDequeueCount();
+        long inFlight = view.getInFlightCount();
+        // NOTE(review): a negative difference also satisfies this check -
+        // confirm that this is intended.
+        assertTrue("Wrong inFlightCount: " + inFlight, (dispatched - dequeued) - inFlight < 5);
+    }
+
+    /**
+     * Creates a JMX proxy for the "test" queue or topic of the embedded broker.
+     *
+     * @param destination only its queue/topic kind is used to pick the MBean type
+     * @return proxy for the destination's management view
+     * @throws Exception if the object name cannot be built
+     */
+    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+        MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
+        String type = destination.isQueue() ? "Queue" : "Topic";
+        ObjectName name = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=" + type + ",Destination=test");
+        return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
+    }
+
+    /**
+     * Closes the client connection and stops the broker. The broker is stopped
+     * even when the connection was never created or fails to close, so the
+     * TCP port is released for later tests.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        }
+    }
+
+}