You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/01 21:52:19 UTC

svn commit: r439442 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/region/

Author: chirino
Date: Fri Sep  1 12:52:18 2006
New Revision: 439442

URL: http://svn.apache.org/viewvc?rev=439442&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-855 allow prefetch==0 to work with receive(timeout) and receiveNoWait()

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Sep  1 12:52:18 2006
@@ -381,6 +381,8 @@
                     } else {
                         return null;
                     }
+                } else if ( md.getMessage()==null ) {
+                	return null;
                 } else if (md.getMessage().isExpired()) {
                     if (log.isDebugEnabled()) {
                         log.debug("Received expired message: " + md);
@@ -415,9 +417,10 @@
      *         this message consumer is concurrently closed
      */
     public Message receive() throws JMSException {
-        sendPullCommand();
         checkClosed();
         checkMessageListener();
+        
+        sendPullCommand(-1);
         MessageDispatch md = dequeue(-1);
         if (md == null)
             return null;
@@ -454,22 +457,29 @@
      * expires, and the call blocks indefinitely.
      * 
      * @param timeout
-     *            the timeout value (in milliseconds)
+     *            the timeout value (in milliseconds); a timeout of zero never expires.
      * @return the next message produced for this message consumer, or null if
      *         the timeout expires or this message consumer is concurrently
      *         closed
      */
     public Message receive(long timeout) throws JMSException {
-        sendPullCommand();
         checkClosed();
         checkMessageListener();
         if (timeout == 0) {
             return this.receive();
 
         }
-
+        
+        sendPullCommand(timeout);
         while (timeout > 0) {
-            MessageDispatch md = dequeue(timeout);
+        	
+            MessageDispatch md;
+            if (info.getPrefetchSize() == 0) {
+            	md = dequeue(-1);  // We let the broker let us know when we timeout.
+            } else {
+            	md = dequeue(timeout);
+            }
+
             if (md == null)
                 return null;
 
@@ -492,7 +502,15 @@
     public Message receiveNoWait() throws JMSException {
         checkClosed();
         checkMessageListener();
-        MessageDispatch md = dequeue(0);
+        sendPullCommand(-1);
+        
+        MessageDispatch md;
+        if (info.getPrefetchSize() == 0) {
+        	md = dequeue(-1);  // We let the broker let us know when we timeout.
+        } else {
+        	md = dequeue(0);
+        }
+        
         if (md == null)
             return null;
 
@@ -598,10 +616,11 @@
      * we are about to receive
      *
      */
-    protected void sendPullCommand() throws JMSException {
+    protected void sendPullCommand(long timeout) throws JMSException {
         if (info.getPrefetchSize() == 0) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
+            messagePull.setTimeout(timeout);            
             session.asyncSendPacket(messagePull);
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java Fri Sep  1 12:52:18 2006
@@ -25,7 +25,7 @@
 import org.apache.activemq.command.MessageId;
 
 /**
- * Only used by the {@link QueueMessageReference#END_OF_BROWSE_MARKER} 
+ * Only used by the {@link QueueMessageReference#NULL_MESSAGE} 
  */
 final class EndOfBrowseMarkerQueueMessageReference implements
 		QueueMessageReference {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Sep  1 12:52:18 2006
@@ -37,6 +37,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
@@ -58,7 +59,7 @@
     long enqueueCounter;
     long dispatchCounter;
     long dequeueCounter;
-    
+        
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
                     throws InvalidSelectorException{
         super(broker,context,info);
@@ -68,16 +69,51 @@
     /**
      * Allows a message to be pulled on demand by a client
      */
-    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
-        if (getPrefetchSize() == 0) {
+    synchronized public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+    	// The slave should not deliver pull messages.  TODO: when the slave becomes a master,
+    	// it should send a NULL message to all the consumers to 'wake them up' in case 
+    	// they were waiting for a message.
+        if (getPrefetchSize() == 0 && !isSlaveBroker()) {
             prefetchExtension++;
-            dispatchMatched();
             
-            // TODO it might be nice one day to actually return the message itself
+            final long dispatchCounterBeforePull = dispatchCounter;
+        	dispatchMatched();
+        	
+        	// If nothing was dispatched, we may need to set up a timeout.
+        	if( dispatchCounterBeforePull == dispatchCounter ) {
+        		// immediate timeout used by receiveNoWait()
+        		if( pull.getTimeout() == -1 ) {
+        			// Send a NULL message.
+	            	add(QueueMessageReference.NULL_MESSAGE);
+	            	dispatchMatched();
+        		}
+        		if( pull.getTimeout() > 0 ) {
+	            	Scheduler.executeAfterDelay(new Runnable(){
+							public void run() {
+								pullTimeout(dispatchCounterBeforePull);
+							}
+						}, pull.getTimeout());
+        		}
+        	}
         }
         return null;
     }
     
+    /**
+     * Occurs when a pull times out.  If nothing has been dispatched
+     * since the timeout was set up, then send the NULL message.
+     */
+    synchronized private void pullTimeout(long dispatchCounterBeforePull) {    	
+    	if( dispatchCounterBeforePull == dispatchCounter ) {
+        	try {
+				add(QueueMessageReference.NULL_MESSAGE);
+				dispatchMatched();
+			} catch (Exception e) {
+				context.getConnection().serviceException(e);
+			}
+    	}
+	}
+        
     synchronized public void add(MessageReference node) throws Exception{
         enqueueCounter++;
         if(!isFull()){
@@ -311,9 +347,17 @@
         }
         // Make sure we can dispatch a message.
         if(canDispatch(node)&&!isSlaveBroker()){
-            dispatchCounter++;
+        	
             MessageDispatch md=createMessageDispatch(node,message);
-            dispatched.addLast(node);            
+
+            // NULL messages don't count... they don't get Acked.
+            if( node != QueueMessageReference.NULL_MESSAGE ) {
+        		dispatchCounter++;
+        		dispatched.addLast(node);            
+            } else {
+            	prefetchExtension=Math.max(0,prefetchExtension-1);
+            }
+            
             if(info.isDispatchAsync()){
                 md.setConsumer(new Runnable(){
                     public void run(){
@@ -335,8 +379,10 @@
 
     synchronized protected void onDispatch(final MessageReference node,final Message message){
         if(node.getRegionDestination()!=null){
-            node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
-            context.getConnection().getStatistics().onMessageDequeue(message);
+        	if( node != QueueMessageReference.NULL_MESSAGE ) {
+	            node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+	            context.getConnection().getStatistics().onMessageDequeue(message);
+        	}
             try{
                 dispatchMatched();
             }catch(IOException e){
@@ -365,12 +411,20 @@
      * @return
      */
     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
-        MessageDispatch md=new MessageDispatch();
-        md.setConsumerId(info.getConsumerId());
-        md.setDestination(node.getRegionDestination().getActiveMQDestination());
-        md.setMessage(message);
-        md.setRedeliveryCounter(node.getRedeliveryCounter());
-        return md;
+        if( node == QueueMessageReference.NULL_MESSAGE ) {
+            MessageDispatch md = new MessageDispatch();
+            md.setMessage(null);
+            md.setConsumerId( info.getConsumerId() );
+            md.setDestination( null );
+            return md;
+        } else {
+            MessageDispatch md=new MessageDispatch();
+            md.setConsumerId(info.getConsumerId());
+            md.setDestination(node.getRegionDestination().getActiveMQDestination());
+            md.setMessage(message);
+            md.setRedeliveryCounter(node.getRedeliveryCounter());
+            return md;
+        }
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Fri Sep  1 12:52:18 2006
@@ -17,16 +17,14 @@
  */
 package org.apache.activemq.broker.region;
 
-import javax.jms.InvalidSelectorException;
-
 import java.io.IOException;
 
+import javax.jms.InvalidSelectorException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 public class QueueBrowserSubscription extends QueueSubscription {
@@ -53,19 +51,7 @@
 
     public void browseDone() throws Exception {
         browseDone = true;
-        add(QueueMessageReference.END_OF_BROWSE_MARKER);
-    }
-    
-    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
-        if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) {
-            MessageDispatch md = new MessageDispatch();
-            md.setMessage(null);
-            md.setConsumerId( info.getConsumerId() );
-            md.setDestination( null );
-            return md;
-        } else {
-            return super.createMessageDispatch(node, message);
-        }
+        add(QueueMessageReference.NULL_MESSAGE);
     }
     
     public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java Fri Sep  1 12:52:18 2006
@@ -25,7 +25,7 @@
  */
 public interface QueueMessageReference extends MessageReference {
 
-    public static final QueueMessageReference END_OF_BROWSE_MARKER = new EndOfBrowseMarkerQueueMessageReference();
+    public static final QueueMessageReference NULL_MESSAGE = new EndOfBrowseMarkerQueueMessageReference();
 
     public boolean isAcked();