You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/10/06 18:40:44 UTC

svn commit: r1179698 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQDestination.java AMQSession.java AMQSession_0_10.java BasicMessageConsumer.java BasicMessageConsumer_0_10.java message/AMQMessageDelegate_0_10.java

Author: robbie
Date: Thu Oct  6 16:40:43 2011
New Revision: 1179698

URL: http://svn.apache.org/viewvc?rev=1179698&view=rev
Log:
QPID-3527: Update handling of auto-ack messages for 0-10 to send acks (asynchronously) on a per-message basis rather than batching them for 1 second, and update the handling of the other ack modes to be clearer with respect to the 0-8/0-10 behavioural differences. Remove some redundant methods from AMQSession, and update the handling of 'no consume'/'isBrowseOnly' so that BasicMessageConsumer is always supplied a single, consistent answer as to whether or not it is non-consuming.

Applied patch from Oleksandr Rudyy <or...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1179698&r1=1179697&r2=1179698&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Oct  6 16:40:43 2011
@@ -793,7 +793,7 @@ public abstract class AMQDestination imp
         return _browseOnly;
     }
     
-    public void setBrowseOnly(boolean b)
+    private void setBrowseOnly(boolean b)
     {
         _browseOnly = b;
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1179698&r1=1179697&r2=1179698&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct  6 16:40:43 2011
@@ -952,7 +952,7 @@ public abstract class AMQSession<C exten
         return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
     }
 
-    public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+    protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
         checkValidDestination(destination);
@@ -966,15 +966,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
-                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
-    }
-
-    public C createExclusiveConsumer(Destination destination) throws JMSException
-    {
-        checkValidDestination(destination);
-
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
-                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+                                  isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -982,7 +974,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
-                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -991,16 +983,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
-                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
-    }
-
-    public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
-            throws JMSException
-    {
-        checkValidDestination(destination);
-
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
-                                  messageSelector, null, false, false);
+                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1008,23 +991,15 @@ public abstract class AMQSession<C exten
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-                                          boolean exclusive, String selector) throws JMSException
+                                              boolean exclusive, String selector) throws JMSException
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
-    }
-
-    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
-                                          String selector, FieldTable rawSelector) throws JMSException
-    {
-        checkValidDestination(destination);
-
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1032,7 +1007,7 @@ public abstract class AMQSession<C exten
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()),
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
                                   false);
     }
 
@@ -1438,9 +1413,10 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
         checkNotClosed();
-        Topic dest = checkValidTopic(topic);
+        checkValidTopic(topic);
 
-        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
+        return new TopicSubscriberAdaptor<C>(topic,
+                createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
     }
 
     /**
@@ -1457,10 +1433,11 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
     {
         checkNotClosed();
-        Topic dest = checkValidTopic(topic);
+        checkValidTopic(topic);
 
-        // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
+        return new TopicSubscriberAdaptor<C>(topic,
+                                             createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
+                                                                true, messageSelector, null, false, false));
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1985,6 +1962,12 @@ public abstract class AMQSession<C exten
     {
         checkTemporaryDestination(destination);
 
+        if(!noConsume && isBrowseOnlyDestination(destination))
+        {
+            throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
+                                                  "but a 'browseOnly' Destination has been supplied.");
+        }
+
         final String messageSelector;
 
         if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -3526,4 +3509,9 @@ public abstract class AMQSession<C exten
         }
         return code;
     }
+
+    private boolean isBrowseOnlyDestination(Destination destination)
+    {
+        return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1179698&r1=1179697&r2=1179698&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Oct  6 16:40:43 2011
@@ -270,7 +270,7 @@ public class AMQSession_0_10 extends AMQ
 
         long prefetch = getAMQConnection().getMaxPrefetch();
 
-        if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
+        if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
         {
             flushAcknowledgments();
         }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1179698&r1=1179697&r2=1179698&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Oct  6 16:40:43 2011
@@ -734,34 +734,27 @@ public abstract class BasicMessageConsum
             case Session.PRE_ACKNOWLEDGE:
                 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 break;
+            case Session.AUTO_ACKNOWLEDGE:
+                //fall through
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+                break;
             case Session.CLIENT_ACKNOWLEDGE:
-                if (isNoConsume())
-                {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-                }
-                else
-                {
-                    // we set the session so that when the user calls acknowledge() it can call the method on session
-                    // to send out the appropriate frame
-                    msg.setAMQSession(_session);
-                    _session.addUnacknowledgedMessage(msg.getDeliveryTag());
-                    _session.markDirty();
-                }
+                // we set the session so that when the user calls acknowledge() it can call the method on session
+                // to send out the appropriate frame
+                msg.setAMQSession(_session);
+                _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+                _session.markDirty();
                 break;
             case Session.SESSION_TRANSACTED:
-                if (isNoConsume())
-                {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-                }
-                else
-                {
-                    _session.addDeliveredMessage(msg.getDeliveryTag());
-                    _session.markDirty();
-                }
-
+                _session.addDeliveredMessage(msg.getDeliveryTag());
+                _session.markDirty();
+                break;
+            case Session.NO_ACKNOWLEDGE:
+                //do nothing.
+                //path used for NO-ACK consumers, and browsers (see constructor).
                 break;
         }
-
     }
 
     void postDeliver(AbstractJMSMessage msg)
@@ -883,7 +876,7 @@ public abstract class BasicMessageConsum
 
     public boolean isNoConsume()
     {
-        return _noConsume || _destination.isBrowseOnly() ;
+        return _noConsume;
     }
 
     public void rollback()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1179698&r1=1179697&r2=1179698&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Oct  6 16:40:43 2011
@@ -66,19 +66,13 @@ public class BasicMessageConsumer_0_10 e
     private boolean _preAcquire = true;
 
     /**
-     * Indicate whether this consumer is started.
-     */
-    private boolean _isStarted = false;
-
-    /**
      * Specify whether this consumer is performing a sync receive
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
     private String _consumerTagString;
     
     private long capacity = 0;
-        
-    //--- constructor
+
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession session, AMQProtocolHandler protocolHandler,
@@ -104,7 +98,6 @@ public class BasicMessageConsumer_0_10 e
                 _preAcquire = false;
             }
         }
-        _isStarted = connection.started();
         
         // Destination setting overrides connection defaults
         if (destination.getDestSyntax() == DestSyntax.ADDR && 
@@ -172,8 +165,6 @@ public class BasicMessageConsumer_0_10 e
         }
     }
 
-    //----- overwritten methods
-
     /**
      * This method is invoked when this consumer is stopped.
      * It tells the broker to stop delivering messages to this consumer.
@@ -208,8 +199,13 @@ public class BasicMessageConsumer_0_10 e
     {
         super.preDeliver(jmsMsg);
 
-        if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
+        if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
         {
+            //For 0-10 we need to ensure that all messages are indicated processed in some way to
+            //ensure their AMQP command-id is marked completed, and so we must send a completion
+            //even for no-ack messages even though there isn't actually an 'acknowledgement' occurring.
+            //Add message to the unacked message list to ensure we don't lose record of it before
+            //sending a completion of some sort.
             _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
         }
     }
@@ -221,7 +217,6 @@ public class BasicMessageConsumer_0_10 e
         return _messageFactory.createMessage(msg.getMessageTransfer());
     }
 
-    // private methods
     /**
      * Check whether a message can be delivered to this consumer.
      *
@@ -459,10 +454,8 @@ public class BasicMessageConsumer_0_10 e
         }
         
         if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE  &&
-             !_session.isInRecovery() &&   
-             _session.getAMQConnection().getSyncAck())
+             !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
         {
-            ((AMQSession_0_10) getSession()).flushAcknowledgments();
             ((AMQSession_0_10) getSession()).getQpidSession().sync();
         }
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1179698&r1=1179697&r2=1179698&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Oct  6 16:40:43 2011
@@ -198,7 +198,6 @@ public class AMQMessageDelegate_0_10 ext
         }
     }
 
-
     public long getJMSTimestamp() throws JMSException
     {
         return _deliveryProps.getTimestamp();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org