You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/28 19:58:06 UTC
svn commit: r1403073 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/TopicSubscription.java
test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
Author: tabish
Date: Sun Oct 28 18:58:06 2012
New Revision: 1403073
URL: http://svn.apache.org/viewvc?rev=1403073&view=rev
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-3746
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1403073&r1=1403072&r2=1403073&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sun Oct 28 18:58:06 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@@ -38,6 +39,7 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
@@ -54,6 +56,7 @@ public class TopicSubscription extends A
boolean singleDestination = true;
Destination destination;
+ private final Scheduler scheduler;
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
@@ -61,6 +64,7 @@ public class TopicSubscription extends A
private final Object matchedListMutex = new Object();
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
+ private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
@@ -78,6 +82,8 @@ public class TopicSubscription extends A
} else {
this.matched = new FilePendingMessageCursor(broker,matchedName,false);
}
+
+ this.scheduler = broker.getScheduler();
}
public void init() throws Exception {
@@ -95,7 +101,7 @@ public class TopicSubscription extends A
return;
}
enqueueCounter.incrementAndGet();
- if (!isFull() && matched.isEmpty() && !isSlave()) {
+ if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
dispatch(node);
@@ -291,10 +297,52 @@ public class TopicSubscription extends A
}
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
- // not supported for topics
+
+ // The slave should not deliver pull messages.
+ if (getPrefetchSize() == 0 && !isSlave()) {
+
+ prefetchWindowOpen.set(true);
+ dispatchMatched();
+
+ // If nothing was dispatched, we may need to set up a timeout.
+ if (prefetchWindowOpen.get()) {
+
+ // immediate timeout used by receiveNoWait()
+ if (pull.getTimeout() == -1) {
+ prefetchWindowOpen.set(false);
+ // Send a NULL message to signal nothing pending.
+ dispatch(null);
+ }
+
+ if (pull.getTimeout() > 0) {
+ scheduler.executeAfterDelay(new Runnable() {
+
+ public void run() {
+ pullTimeout();
+ }
+ }, pull.getTimeout());
+ }
+ }
+ }
return null;
}
+ /**
+ * Occurs when a pull times out. If nothing has been dispatched since the
+ * timeout was set up, then send the NULL message.
+ */
+ private final void pullTimeout() {
+ synchronized (matchedListMutex) {
+ if (prefetchWindowOpen.compareAndSet(true, false)) {
+ try {
+ dispatch(null);
+ } catch (Exception e) {
+ context.getConnection().serviceException(e);
+ }
+ }
+ }
+ }
+
public int getPendingQueueSize() {
return matched();
}
@@ -395,7 +443,7 @@ public class TopicSubscription extends A
// Implementation methods
// -------------------------------------------------------------------------
public boolean isFull() {
- return getDispatchedQueueSize() >= info.getPrefetchSize();
+ return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
}
public int getInFlightSize() {
@@ -482,6 +530,7 @@ public class TopicSubscription extends A
continue; // just drop it.
}
dispatch(message);
+ prefetchWindowOpen.set(false);
}
} finally {
matched.release();
@@ -492,38 +541,46 @@ public class TopicSubscription extends A
private void dispatch(final MessageReference node) throws IOException {
Message message = (Message)node;
- node.incrementReferenceCount();
+ if (node != null) {
+ node.incrementReferenceCount();
+ }
// Make sure we can dispatch a message.
MessageDispatch md = new MessageDispatch();
md.setMessage(message);
md.setConsumerId(info.getConsumerId());
- md.setDestination(node.getRegionDestination().getActiveMQDestination());
- dispatchedCounter.incrementAndGet();
- // Keep track if this subscription is receiving messages from a single destination.
- if (singleDestination) {
- if (destination == null) {
- destination = node.getRegionDestination();
- } else {
- if (destination != node.getRegionDestination()) {
- singleDestination = false;
+ if (node != null) {
+ md.setDestination(node.getRegionDestination().getActiveMQDestination());
+ dispatchedCounter.incrementAndGet();
+ // Keep track if this subscription is receiving messages from a single destination.
+ if (singleDestination) {
+ if (destination == null) {
+ destination = node.getRegionDestination();
+ } else {
+ if (destination != node.getRegionDestination()) {
+ singleDestination = false;
+ }
}
}
}
if (info.isDispatchAsync()) {
- md.setTransmitCallback(new Runnable() {
-
- public void run() {
- node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
- node.getRegionDestination().getDestinationStatistics().getInflight().increment();
- node.decrementReferenceCount();
- }
- });
+ if (node != null) {
+ md.setTransmitCallback(new Runnable() {
+ @Override
+ public void run() {
+ node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
+ node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+ node.decrementReferenceCount();
+ }
+ });
+ }
context.getConnection().dispatchAsync(md);
} else {
context.getConnection().dispatchSync(md);
- node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
- node.getRegionDestination().getDestinationStatistics().getInflight().increment();
- node.decrementReferenceCount();
+ if (node != null) {
+ node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
+ node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+ node.decrementReferenceCount();
+ }
}
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java?rev=1403073&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java Sun Oct 28 18:58:06 2012
@@ -0,0 +1,100 @@
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TopicSubscriptionZeroPrefetchTest {
+
+ private static final String TOPIC_NAME = "slow.consumer";
+ private Connection connection;
+ private Session session;
+ private ActiveMQTopic destination;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+ private BrokerService brokerService;
+
+ @Before
+ public void setUp() throws Exception {
+
+ brokerService = createBroker();
+
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+
+ activeMQConnectionFactory.setWatchTopicAdvisories(true);
+ connection = activeMQConnectionFactory.createConnection();
+ connection.setClientID("ClientID-1");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = new ActiveMQTopic(TOPIC_NAME);
+ producer = session.createProducer(destination);
+
+ connection.start();
+ }
+
+ /*
+ * test non durable topic subscription with prefetch set to zero
+ */
+ @Test(timeout=60000)
+ public void testTopicConsumerPrefetchZero() throws Exception {
+
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+ consumer = session.createConsumer(consumerDestination);
+
+ // publish messages
+ Message txtMessage = session.createTextMessage("M");
+ producer.send(txtMessage);
+
+ Message consumedMessage = consumer.receiveNoWait();
+
+ Assert.assertNotNull("should have received a message the published message", consumedMessage);
+ }
+
+ /*
+ * test durable topic subscription with prefetch zero
+ */
+ @Test(timeout=60000)
+ public void testDurableTopicConsumerPrefetchZero() throws Exception {
+
+ ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
+ consumer = session.createDurableSubscriber(consumerDestination, "mysub1");
+
+ // publish messages
+ Message txtMessage = session.createTextMessage("M");
+ producer.send(txtMessage);
+
+ Message consumedMessage = consumer.receive(100);
+
+ Assert.assertNotNull("should have received a message the published message", consumedMessage);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ consumer.close();
+ producer.close();
+ session.close();
+ connection.close();
+ brokerService.stop();
+ }
+
+ // helper method to create and start an embedded broker (JMX disabled, messages deleted on startup) listening on vm://localhost
+ private BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName("localhost");
+ broker.setUseJmx(false);
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.addConnector("vm://localhost");
+ broker.start();
+ broker.waitUntilStarted();
+ return broker;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
------------------------------------------------------------------------------
svn:eol-style = native