You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by tm...@apache.org on 2012/08/10 16:44:37 UTC
svn commit: r1371722 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ActiveMQMessageConsumer.java
test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
Author: tmielke
Date: Fri Aug 10 14:44:36 2012
New Revision: 1371722
URL: http://svn.apache.org/viewvc?rev=1371722&view=rev
Log:
AMQ-3965: Fix for org.apache.activemq.ActiveMQMessageConsumer.java plus new JUnit test
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1371722&r1=1371721&r2=1371722&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Aug 10 14:44:36 2012
@@ -892,7 +892,10 @@ public class ActiveMQMessageConsumer imp
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
- if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
+
+ // AMQ-3956 evaluate both expired and normal msgs as
+ // otherwise consumer may get stalled
+ if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
@@ -900,6 +903,17 @@ public class ActiveMQMessageConsumer imp
session.sendAck(ack);
optimizeAckTimestamp = System.currentTimeMillis();
}
+ // AMQ-3956 - as further optimization send
+ // ack for expired msgs when there are any.
+ // This resets the deliveredCounter to 0 so that
+ // we won't sent standard acks with every msg just
+ // because the deliveredCounter just below
+ // 0.5 * prefetch as used in ackLater()
+ if (pendingAck != null && deliveredCounter > 0) {
+ session.sendAck(pendingAck);
+ pendingAck = null;
+ deliveredCounter = 0;
+ }
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -979,8 +993,9 @@ public class ActiveMQMessageConsumer imp
}
}
}
-
- if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
+ // AMQ-3956 evaluate both expired and normal msgs as
+ // otherwise consumer may get stalled
+ if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
session.sendAck(pendingAck);
pendingAck=null;
deliveredCounter = 0;
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java?rev=1371722&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java Fri Aug 10 14:44:36 2012
@@ -0,0 +1,231 @@
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.Wait;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for AMQ-3965.
+ * A consumer may be stalled in case it uses optimizeAcknowledge and receives
+ * a number of messages that expire before being dispatched to application code.
+ * See AMQ-3965 for more details.
+ */
+public class OptimizeAcknowledgeWithExpiredMsgsTest {
+
+ private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
+
+ private static BrokerService broker = null;
+ protected static final String DATA_DIR = "target/activemq-data/";
+ public final String brokerUrl = "tcp://localhost:61614";
+
+
+ /**
+ * Creates a broker instance and starts it.
+ *
+ * @param brokerUri - transport uri of broker
+ * @param brokerName - name for the broker
+ * @return a BrokerService instance with transport uri and broker name set
+ * @throws Exception
+ */
+ protected BrokerService createBroker(URI brokerUri, String brokerName) throws Exception {
+ BrokerService broker = BrokerFactory.createBroker(brokerUri);
+ broker.setBrokerName(brokerName);
+ broker.setBrokerId(brokerName);
+ broker.setDataDirectory(DATA_DIR);
+ broker.setEnableStatistics(true);
+ broker.setUseJmx(false);
+ return broker;
+ }
+
+
+ @Before
+ public void setUp() throws Exception {
+ final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
+
+ broker = createBroker(new URI("broker:(" + brokerUrl + ")" + options), "localhost");
+ broker.start();
+ broker.waitUntilStarted();
+
+ }
+
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null)
+ broker.stop();
+ }
+
+
+ /**
+ * Tests for AMQ-3965
+ * Creates connection into broker using optimzeAcknowledge and prefetch=100
+ * Creates producer and consumer. Producer sends 45 msgs that will expire
+ * at consumer (but before being dispatched to app code).
+ * Producer then sends 60 msgs without expiry.
+ *
+ * Consumer receives msgs using a MessageListener and increments a counter.
+ * Main thread sleeps for 5 seconds and checks the counter value.
+ * If counter != 60 msgs (the number of msgs that should get dispatched
+ * to consumer) the test fails.
+ */
+ @Test
+ public void testOptimizedAckWithExpiredMsgs() throws Exception
+ {
+
+ ActiveMQConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory(brokerUrl + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100");
+
+ // Create JMS resources
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue("TEST.FOO");
+
+ // ***** Consumer code *****
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ MyMessageListener listener = new MyMessageListener();
+ connection.setExceptionListener((ExceptionListener) listener);
+
+ // ***** Producer Code *****
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
+ TextMessage message;
+
+ consumer.setMessageListener(listener);
+ listener.setDelay(100);
+
+ // Produce msgs that will expire quickly
+ for (int i=0; i<45; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message,1,1,30);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with expiry 30 msec");
+ }
+ // Produce msgs that don't expire
+ for (int i=0; i<60; i++) {
+ message = session.createTextMessage(text);
+ producer.send(message);
+ LOG.trace("Sent message: "+ message.getJMSMessageID() +
+ " with no expiry.");
+ }
+ listener.setDelay(0);
+
+ // set exit condition
+ TestExitCondition cond = new TestExitCondition(listener);
+ Wait.waitFor(cond, 5000);
+
+ Assert.assertTrue("Error: Some non-expired messages were not received.", listener.getCounter() >= 60);
+
+ LOG.info("Received all expected messages with counter at " + listener.getCounter());
+
+ // Cleanup
+ LOG.info("Cleaning up.");
+ producer.close();
+ consumer.close();
+ session.close();
+ connection.close();
+ listener = null;
+ }
+
+
+ private void sleep(int milliSecondTime) {
+ try {
+ Thread.sleep(milliSecondTime);
+ } catch (InterruptedException igonred) {
+ }
+ }
+
+
+ /**
+ * Defines the exit condition for the test.
+ */
+ private class TestExitCondition implements Wait.Condition {
+
+ private MyMessageListener listener;
+
+ public TestExitCondition(MyMessageListener l) {
+ this.listener = l;
+ }
+
+ public boolean isSatisified() throws Exception {
+ return listener.getCounter() == 36;
+ }
+
+ }
+
+
+ /**
+ * Standard JMS MessageListener
+ */
+ private class MyMessageListener implements MessageListener, ExceptionListener {
+
+ private AtomicInteger counter = new AtomicInteger(0);
+ private int delay = 0;
+
+ public void onMessage(final Message message) {
+ try {
+ LOG.trace("Got Message " + message.getJMSMessageID());
+ LOG.debug("counter at " + counter.incrementAndGet());
+ if (delay>0) {
+ sleep(delay);
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public int getCounter() {
+ return counter.get();
+ }
+
+ public int getDelay() {
+ return delay;
+ }
+
+ public void setDelay(int newDelay) {
+ this.delay = newDelay;
+ }
+
+ public synchronized void onException(JMSException ex) {
+ LOG.error("JMS Exception occured. Shutting down client.");
+ }
+ }
+}
+
\ No newline at end of file