You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/01 21:52:19 UTC
svn commit: r439442 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
./ broker/region/
Author: chirino
Date: Fri Sep 1 12:52:18 2006
New Revision: 439442
URL: http://svn.apache.org/viewvc?rev=439442&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-855 allow prefetch==0 to work with receive(timeout) and receiveNoWait()
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri Sep 1 12:52:18 2006
@@ -381,6 +381,8 @@
} else {
return null;
}
+ } else if ( md.getMessage()==null ) {
+ return null;
} else if (md.getMessage().isExpired()) {
if (log.isDebugEnabled()) {
log.debug("Received expired message: " + md);
@@ -415,9 +417,10 @@
* this message consumer is concurrently closed
*/
public Message receive() throws JMSException {
- sendPullCommand();
checkClosed();
checkMessageListener();
+
+ sendPullCommand(-1);
MessageDispatch md = dequeue(-1);
if (md == null)
return null;
@@ -454,22 +457,29 @@
* expires, and the call blocks indefinitely.
*
* @param timeout
- * the timeout value (in milliseconds)
+ * the timeout value (in milliseconds); a timeout of zero never expires.
* @return the next message produced for this message consumer, or null if
* the timeout expires or this message consumer is concurrently
* closed
*/
public Message receive(long timeout) throws JMSException {
- sendPullCommand();
checkClosed();
checkMessageListener();
if (timeout == 0) {
return this.receive();
}
-
+
+ sendPullCommand(timeout);
while (timeout > 0) {
- MessageDispatch md = dequeue(timeout);
+
+ MessageDispatch md;
+ if (info.getPrefetchSize() == 0) {
+ md = dequeue(-1); // We let the broker let us know when we timeout.
+ } else {
+ md = dequeue(timeout);
+ }
+
if (md == null)
return null;
@@ -492,7 +502,15 @@
public Message receiveNoWait() throws JMSException {
checkClosed();
checkMessageListener();
- MessageDispatch md = dequeue(0);
+ sendPullCommand(-1);
+
+ MessageDispatch md;
+ if (info.getPrefetchSize() == 0) {
+ md = dequeue(-1); // We let the broker let us know when we timeout.
+ } else {
+ md = dequeue(0);
+ }
+
if (md == null)
return null;
@@ -598,10 +616,11 @@
* we are about to receive
*
*/
- protected void sendPullCommand() throws JMSException {
+ protected void sendPullCommand(long timeout) throws JMSException {
if (info.getPrefetchSize() == 0) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
+ messagePull.setTimeout(timeout);
session.asyncSendPacket(messagePull);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java Fri Sep 1 12:52:18 2006
@@ -25,7 +25,7 @@
import org.apache.activemq.command.MessageId;
/**
- * Only used by the {@link QueueMessageReference#END_OF_BROWSE_MARKER}
+ * Only used by the {@link QueueMessageReference#NULL_MESSAGE}
*/
final class EndOfBrowseMarkerQueueMessageReference implements
QueueMessageReference {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Sep 1 12:52:18 2006
@@ -37,6 +37,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
@@ -58,7 +59,7 @@
long enqueueCounter;
long dispatchCounter;
long dequeueCounter;
-
+
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{
super(broker,context,info);
@@ -68,16 +69,51 @@
/**
* Allows a message to be pulled on demand by a client
*/
- public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
- if (getPrefetchSize() == 0) {
+ synchronized public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+ // The slave should not deliver pull messages. TODO: when the slave becomes a master,
+ // it should send a NULL message to all the consumers to 'wake them up' in case
+ // they were waiting for a message.
+ if (getPrefetchSize() == 0 && !isSlaveBroker()) {
prefetchExtension++;
- dispatchMatched();
- // TODO it might be nice one day to actually return the message itself
+ final long dispatchCounterBeforePull = dispatchCounter;
+ dispatchMatched();
+
+ // If nothing was dispatched, we may need to set up a timeout.
+ if( dispatchCounterBeforePull == dispatchCounter ) {
+ // immediate timeout used by receiveNoWait()
+ if( pull.getTimeout() == -1 ) {
+ // Send a NULL message.
+ add(QueueMessageReference.NULL_MESSAGE);
+ dispatchMatched();
+ }
+ if( pull.getTimeout() > 0 ) {
+ Scheduler.executeAfterDelay(new Runnable(){
+ public void run() {
+ pullTimeout(dispatchCounterBeforePull);
+ }
+ }, pull.getTimeout());
+ }
+ }
}
return null;
}
+ /**
+ * Occurs when a pull times out. If nothing has been dispatched
+ * since the timeout was set up, then send the NULL message.
+ */
+ synchronized private void pullTimeout(long dispatchCounterBeforePull) {
+ if( dispatchCounterBeforePull == dispatchCounter ) {
+ try {
+ add(QueueMessageReference.NULL_MESSAGE);
+ dispatchMatched();
+ } catch (Exception e) {
+ context.getConnection().serviceException(e);
+ }
+ }
+ }
+
synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++;
if(!isFull()){
@@ -311,9 +347,17 @@
}
// Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
- dispatchCounter++;
+
MessageDispatch md=createMessageDispatch(node,message);
- dispatched.addLast(node);
+
+ // NULL messages don't count... they don't get Acked.
+ if( node != QueueMessageReference.NULL_MESSAGE ) {
+ dispatchCounter++;
+ dispatched.addLast(node);
+ } else {
+ prefetchExtension=Math.max(0,prefetchExtension-1);
+ }
+
if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
public void run(){
@@ -335,8 +379,10 @@
synchronized protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){
- node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
- context.getConnection().getStatistics().onMessageDequeue(message);
+ if( node != QueueMessageReference.NULL_MESSAGE ) {
+ node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+ context.getConnection().getStatistics().onMessageDequeue(message);
+ }
try{
dispatchMatched();
}catch(IOException e){
@@ -365,12 +411,20 @@
* @return
*/
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
- MessageDispatch md=new MessageDispatch();
- md.setConsumerId(info.getConsumerId());
- md.setDestination(node.getRegionDestination().getActiveMQDestination());
- md.setMessage(message);
- md.setRedeliveryCounter(node.getRedeliveryCounter());
- return md;
+ if( node == QueueMessageReference.NULL_MESSAGE ) {
+ MessageDispatch md = new MessageDispatch();
+ md.setMessage(null);
+ md.setConsumerId( info.getConsumerId() );
+ md.setDestination( null );
+ return md;
+ } else {
+ MessageDispatch md=new MessageDispatch();
+ md.setConsumerId(info.getConsumerId());
+ md.setDestination(node.getRegionDestination().getActiveMQDestination());
+ md.setMessage(message);
+ md.setRedeliveryCounter(node.getRedeliveryCounter());
+ return md;
+ }
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Fri Sep 1 12:52:18 2006
@@ -17,16 +17,14 @@
*/
package org.apache.activemq.broker.region;
-import javax.jms.InvalidSelectorException;
-
import java.io.IOException;
+import javax.jms.InvalidSelectorException;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext;
public class QueueBrowserSubscription extends QueueSubscription {
@@ -53,19 +51,7 @@
public void browseDone() throws Exception {
browseDone = true;
- add(QueueMessageReference.END_OF_BROWSE_MARKER);
- }
-
- protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
- if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) {
- MessageDispatch md = new MessageDispatch();
- md.setMessage(null);
- md.setConsumerId( info.getConsumerId() );
- md.setDestination( null );
- return md;
- } else {
- return super.createMessageDispatch(node, message);
- }
+ add(QueueMessageReference.NULL_MESSAGE);
}
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java Fri Sep 1 12:52:18 2006
@@ -25,7 +25,7 @@
*/
public interface QueueMessageReference extends MessageReference {
- public static final QueueMessageReference END_OF_BROWSE_MARKER = new EndOfBrowseMarkerQueueMessageReference();
+ public static final QueueMessageReference NULL_MESSAGE = new EndOfBrowseMarkerQueueMessageReference();
public boolean isAcked();