You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/28 19:58:06 UTC

svn commit: r1403073 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/TopicSubscription.java test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java

Author: tabish
Date: Sun Oct 28 18:58:06 2012
New Revision: 1403073

URL: http://svn.apache.org/viewvc?rev=1403073&view=rev
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-3746

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1403073&r1=1403072&r2=1403073&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sun Oct 28 18:58:06 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
@@ -38,6 +39,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
@@ -54,6 +56,7 @@ public class TopicSubscription extends A
 
     boolean singleDestination = true;
     Destination destination;
+    private final Scheduler scheduler;
 
     private int maximumPendingMessages = -1;
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
@@ -61,6 +64,7 @@ public class TopicSubscription extends A
     private final Object matchedListMutex = new Object();
     private final AtomicLong enqueueCounter = new AtomicLong(0);
     private final AtomicLong dequeueCounter = new AtomicLong(0);
+    private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
     protected int maxProducersToAudit = 1024;
@@ -78,6 +82,8 @@ public class TopicSubscription extends A
         } else {
             this.matched = new FilePendingMessageCursor(broker,matchedName,false);
         }
+
+        this.scheduler = broker.getScheduler();
     }
 
     public void init() throws Exception {
@@ -95,7 +101,7 @@ public class TopicSubscription extends A
             return;
         }
         enqueueCounter.incrementAndGet();
-        if (!isFull() && matched.isEmpty()  && !isSlave()) {
+        if (!isFull() && matched.isEmpty() && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
             dispatch(node);
@@ -291,10 +297,52 @@ public class TopicSubscription extends A
     }
 
     public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
-        // not supported for topics
+
+        // The slave should not deliver pull messages.
+        if (getPrefetchSize() == 0 && !isSlave()) {
+
+            prefetchWindowOpen.set(true);
+            dispatchMatched();
+
+            // If there was nothing dispatched.. we may need to setup a timeout.
+            if (prefetchWindowOpen.get()) {
+
+                // immediate timeout used by receiveNoWait()
+                if (pull.getTimeout() == -1) {
+                    prefetchWindowOpen.set(false);
+                    // Send a NULL message to signal nothing pending.
+                    dispatch(null);
+                }
+
+                if (pull.getTimeout() > 0) {
+                    scheduler.executeAfterDelay(new Runnable() {
+
+                        public void run() {
+                            pullTimeout();
+                        }
+                    }, pull.getTimeout());
+                }
+            }
+        }
         return null;
     }
 
+    /**
+     * Occurs when a pull times out. If nothing has been dispatched since the
+     * timeout was setup, then send the NULL message.
+     */
+    private final void pullTimeout() {
+        synchronized (matchedListMutex) {
+            if (prefetchWindowOpen.compareAndSet(true, false)) {
+                try {
+                    dispatch(null);
+                } catch (Exception e) {
+                    context.getConnection().serviceException(e);
+                }
+            }
+        }
+    }
+
     public int getPendingQueueSize() {
         return matched();
     }
@@ -395,7 +443,7 @@ public class TopicSubscription extends A
     // Implementation methods
     // -------------------------------------------------------------------------
     public boolean isFull() {
-        return getDispatchedQueueSize() >= info.getPrefetchSize();
+        return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
     }
 
     public int getInFlightSize() {
@@ -482,6 +530,7 @@ public class TopicSubscription extends A
                             continue; // just drop it.
                         }
                         dispatch(message);
+                        prefetchWindowOpen.set(false);
                     }
                 } finally {
                     matched.release();
@@ -492,38 +541,46 @@ public class TopicSubscription extends A
 
     private void dispatch(final MessageReference node) throws IOException {
         Message message = (Message)node;
-        node.incrementReferenceCount();
+        if (node != null) {
+            node.incrementReferenceCount();
+        }
         // Make sure we can dispatch a message.
         MessageDispatch md = new MessageDispatch();
         md.setMessage(message);
         md.setConsumerId(info.getConsumerId());
-        md.setDestination(node.getRegionDestination().getActiveMQDestination());
-        dispatchedCounter.incrementAndGet();
-        // Keep track if this subscription is receiving messages from a single destination.
-        if (singleDestination) {
-            if (destination == null) {
-                destination = node.getRegionDestination();
-            } else {
-                if (destination != node.getRegionDestination()) {
-                    singleDestination = false;
+        if (node != null) {
+            md.setDestination(node.getRegionDestination().getActiveMQDestination());
+            dispatchedCounter.incrementAndGet();
+            // Keep track if this subscription is receiving messages from a single destination.
+            if (singleDestination) {
+                if (destination == null) {
+                    destination = node.getRegionDestination();
+                } else {
+                    if (destination != node.getRegionDestination()) {
+                        singleDestination = false;
+                    }
                 }
             }
         }
         if (info.isDispatchAsync()) {
-            md.setTransmitCallback(new Runnable() {
-
-                public void run() {
-                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-                    node.getRegionDestination().getDestinationStatistics().getInflight().increment();
-                    node.decrementReferenceCount();
-                }
-            });
+            if (node != null) {
+                md.setTransmitCallback(new Runnable() {
+                    @Override
+                    public void run() {
+                        node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
+                        node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+                        node.decrementReferenceCount();
+                    }
+                });
+            }
             context.getConnection().dispatchAsync(md);
         } else {
             context.getConnection().dispatchSync(md);
-            node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-            node.getRegionDestination().getDestinationStatistics().getInflight().increment();
-            node.decrementReferenceCount();
+            if (node != null) {
+                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
+                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+                node.decrementReferenceCount();
+            }
         }
     }
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java?rev=1403073&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java Sun Oct 28 18:58:06 2012
@@ -0,0 +1,100 @@
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TopicSubscriptionZeroPrefetchTest {
+
+    private static final String TOPIC_NAME = "slow.consumer";
+    private Connection connection;
+    private Session session;
+    private ActiveMQTopic destination;
+    private MessageProducer producer;
+    private MessageConsumer consumer;
+    private BrokerService brokerService;
+
+    @Before
+    public void setUp() throws Exception {
+
+        brokerService = createBroker();
+
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+
+        activeMQConnectionFactory.setWatchTopicAdvisories(true);
+        connection = activeMQConnectionFactory.createConnection();
+        connection.setClientID("ClientID-1");
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = new ActiveMQTopic(TOPIC_NAME);
+        producer = session.createProducer(destination);
+
+        connection.start();
+    }
+
+    /*
+     * test non durable topic subscription with prefetch set to zero
+     */
+    @Test(timeout=60000)
+    public void testTopicConsumerPrefetchZero() throws Exception {
+
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        consumer = session.createConsumer(consumerDestination);
+
+        // publish messages
+        Message txtMessage = session.createTextMessage("M");
+        producer.send(txtMessage);
+
+        Message consumedMessage = consumer.receiveNoWait();
+
+        Assert.assertNotNull("should have received a message the published message", consumedMessage);
+    }
+
+    /*
+     * test durable topic subscription with prefetch zero
+     */
+    @Test(timeout=60000)
+    public void testDurableTopicConsumerPrefetchZero() throws Exception {
+
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
+        consumer = session.createDurableSubscriber(consumerDestination, "mysub1");
+
+        // publish messages
+        Message txtMessage = session.createTextMessage("M");
+        producer.send(txtMessage);
+
+        Message consumedMessage = consumer.receive(100);
+
+        Assert.assertNotNull("should have received a message the published message", consumedMessage);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        consumer.close();
+        producer.close();
+        session.close();
+        connection.close();
+        brokerService.stop();
+    }
+
+    // helper method to create a broker with slow consumer advisory turned on
+    private BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("localhost");
+        broker.setUseJmx(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("vm://localhost");
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
------------------------------------------------------------------------------
    svn:eol-style = native