You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [29/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 21 01:19:00 2011
@@ -70,6 +70,7 @@ import org.apache.qpid.AMQDisconnectedEx
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -87,6 +88,8 @@ import org.apache.qpid.client.message.JM
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
@@ -94,10 +97,7 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -213,6 +213,8 @@ public abstract class AMQSession<C exten
      */
     protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
 
+    protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
     /**
      * The period to wait while flow controlled before sending a log message confirming that the session is still
      * waiting on flow control being revoked
@@ -308,7 +310,7 @@ public abstract class AMQSession<C exten
     protected final FlowControllingBlockingQueue _queue;
 
     /** Holds the highest received delivery tag. */
-    protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
     
     /** All the not yet acknowledged message tags */
@@ -362,13 +364,7 @@ public abstract class AMQSession<C exten
      * Set when recover is called. This is to handle the case where recover() is called by application code during
      * onMessage() processing to ensure that an auto ack is not sent.
      */
-    private volatile boolean _sessionInRecovery;
-
-    /**
-     * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
-     * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
-     */
-    private volatile boolean _usingDispatcherForCleanup;
+    private boolean _inRecovery;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
@@ -571,8 +567,6 @@ public abstract class AMQSession<C exten
         close(-1);
     }
 
-    public abstract AMQException getLastException();
-    
     public void checkNotClosed() throws JMSException
     {
         try
@@ -581,20 +575,16 @@ public abstract class AMQSession<C exten
         }
         catch (IllegalStateException ise)
         {
-            AMQException ex = getLastException();
-            if (ex != null)
-            {
-                IllegalStateException ssnClosed = new IllegalStateException(
-                        "Session has been closed", ex.getErrorCode().toString());
+            // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
+            AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
 
-                ssnClosed.setLinkedException(ex);
-                ssnClosed.initCause(ex);
-                throw ssnClosed;
-            } 
-            else
+            if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
             {
-                throw ise;
+                ise.setLinkedException(manager.getLastException());
+                ise.initCause(ise.getLinkedException());
             }
+
+            throw ise;
         }
     }
 
@@ -610,36 +600,29 @@ public abstract class AMQSession<C exten
      * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
      *
      * @throws IllegalStateException If the session is closed.
-     * @throws JMSException if there is a problem during acknowledge process.
      */
-    public void acknowledge() throws IllegalStateException, JMSException
+    public void acknowledge() throws IllegalStateException
     {
         if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
         }
-        else if (hasFailedOverDirty())
+        else if (hasFailedOver())
         {
-            //perform an implicit recover in this scenario
-            recover();
-
-            //notify the consumer
             throw new IllegalStateException("has failed over");
         }
 
-        try
-        {
-            acknowledgeImpl();
-            markClean();
-        }
-        catch (TransportException e)
+        while (true)
         {
-            throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
+            Long tag = _unacknowledgedMessageTags.poll();
+            if (tag == null)
+            {
+                break;
+            }
+            acknowledgeMessage(tag, false);
         }
     }
 
-    protected abstract void acknowledgeImpl() throws JMSException;
-
     /**
      * Acknowledge one or many messages.
      *
@@ -774,10 +757,6 @@ public abstract class AMQSession<C exten
                         _logger.debug(
                                 "Got FailoverException during channel close, ignored as channel already marked as closed.");
                     }
-                    catch (TransportException e)
-                    {
-                        throw toJMSException("Error closing session:" + e.getMessage(), e);
-                    }
                     finally
                     {
                         _connection.deregisterSession(_channelId);
@@ -848,44 +827,51 @@ public abstract class AMQSession<C exten
      * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, merely that it is not known whether it
      *                      failed or not.
+     * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void commit() throws JMSException
     {
         checkTransacted();
 
-        //Check that we are clean to commit.
-        if (_failedOverDirty)
+        try
         {
-            if (_logger.isDebugEnabled())
+            //Check that we are clean to commit.
+            if (_failedOverDirty)
             {
-                _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+                rollback();
+
+                throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+                                                         "Forced rollback");
             }
-            rollback();
 
-            throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
-                                                     "The session transaction was rolled back.");
-        }
 
-        try
-        {
-            commitImpl();
+            // Acknowledge all delivered messages
+            while (true)
+            {
+                Long tag = _deliveredMessageTags.poll();
+                if (tag == null)
+                {
+                    break;
+                }
+
+                acknowledgeMessage(tag, false);
+            }
+            // Commits outstanding messages and acknowledgments
+            sendCommit();
             markClean();
         }
         catch (AMQException e)
         {
-            throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
+            throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
         }
         catch (FailoverException e)
         {
             throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
         }
-        catch(TransportException e)
-        {
-            throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
-        }
     }
 
-    protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
+    public abstract void sendCommit() throws AMQException, FailoverException;
+
 
     public void confirmConsumerCancelled(int consumerTag)
     {
@@ -963,7 +949,7 @@ public abstract class AMQSession<C exten
         return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
     }
 
-    protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+    public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
         checkValidDestination(destination);
@@ -977,7 +963,15 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
-                                  isBrowseOnlyDestination(destination), false);
+                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+    }
+
+    public C createExclusiveConsumer(Destination destination) throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
+                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -985,7 +979,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
-                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
+                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -994,7 +988,16 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
-                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
+                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+    }
+
+    public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
+                                  messageSelector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1002,15 +1005,23 @@ public abstract class AMQSession<C exten
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-                                              boolean exclusive, String selector) throws JMSException
+                                          boolean exclusive, String selector) throws JMSException
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+    }
+
+    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+                                          String selector, FieldTable rawSelector) throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1018,7 +1029,7 @@ public abstract class AMQSession<C exten
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()),
                                   false);
     }
 
@@ -1032,33 +1043,8 @@ public abstract class AMQSession<C exten
             throws JMSException
     {
         checkNotClosed();
-        Topic origTopic = checkValidTopic(topic, true);
-        
+        AMQTopic origTopic = checkValidTopic(topic, true);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
-        if (dest.getDestSyntax() == DestSyntax.ADDR &&
-            !dest.isAddressResolved())
-        {
-            try
-            {
-                handleAddressBasedDestination(dest,false,true);
-                if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
-                {
-                    throw new JMSException("Durable subscribers can only be created for Topics");
-                }
-                dest.getSourceNode().setDurable(true);
-            }
-            catch(AMQException e)
-            {
-                JMSException ex = new JMSException("Error when verifying destination");
-                ex.initCause(e);
-                ex.setLinkedException(e);
-                throw ex;
-            }
-            catch(TransportException e)
-            {
-                throw toJMSException("Error when verifying destination", e);
-            }
-        }
         
         String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
         
@@ -1070,9 +1056,15 @@ public abstract class AMQSession<C exten
             // Not subscribed to this name in the current session
             if (subscriber == null)
             {
-                // After the address is resolved routing key will not be null.
-                AMQShortString topicName = dest.getRoutingKey();
-                
+                AMQShortString topicName;
+                if (topic instanceof AMQTopic)
+                {
+                    topicName = ((AMQTopic) topic).getRoutingKey();
+                } else
+                {
+                    topicName = new AMQShortString(topic.getTopicName());
+                }
+
                 if (_strictAMQP)
                 {
                     if (_strictAMQPFATAL)
@@ -1143,10 +1135,6 @@ public abstract class AMQSession<C exten
     
             return subscriber;
         }
-        catch (TransportException e)
-        {
-            throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
-        }
         finally
         {
             _subscriberDetails.unlock();
@@ -1207,6 +1195,12 @@ public abstract class AMQSession<C exten
         return createProducerImpl(destination, mandatory, immediate);
     }
 
+    public P createProducer(Destination destination, boolean mandatory, boolean immediate,
+                                               boolean waitUntilSent) throws JMSException
+    {
+        return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
+    }
+
     public TopicPublisher createPublisher(Topic topic) throws JMSException
     {
         checkNotClosed();
@@ -1231,6 +1225,7 @@ public abstract class AMQSession<C exten
                 else
                 {
                     AMQQueue queue = new AMQQueue(queueName);
+                    queue.setCreate(AddressOption.ALWAYS);
                     return queue;
                     
                 }
@@ -1312,8 +1307,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
-        Queue dest = validateQueue(destination);
-        C consumer = (C) createConsumer(dest);
+        AMQQueue dest = (AMQQueue) destination;
+        C consumer = (C) createConsumer(destination);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1331,8 +1326,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
-        Queue dest = validateQueue(destination);
-        C consumer = (C) createConsumer(dest, messageSelector);
+        AMQQueue dest = (AMQQueue) destination;
+        C consumer = (C) createConsumer(destination, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1349,7 +1344,7 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
         checkNotClosed();
-        Queue dest = validateQueue(queue);
+        AMQQueue dest = (AMQQueue) queue;
         C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
@@ -1368,28 +1363,17 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
     {
         checkNotClosed();
-        Queue dest = validateQueue(queue);
+        AMQQueue dest = (AMQQueue) queue;
         C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
-    
-    private Queue validateQueue(Destination dest) throws InvalidDestinationException
-    {
-        if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
-        {
-            return (Queue)dest;
-        }
-        else
-        {
-            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
-        }
-    }
 
     public QueueSender createSender(Queue queue) throws JMSException
     {
         checkNotClosed();
 
+        // return (QueueSender) createProducer(queue);
         return new QueueSenderAdapter(createProducer(queue), queue);
     }
 
@@ -1424,10 +1408,10 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
         checkNotClosed();
-        checkValidTopic(topic);
+        AMQTopic dest = checkValidTopic(topic);
 
-        return new TopicSubscriberAdaptor<C>(topic,
-                createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
+        // AMQTopic dest = new AMQTopic(topic.getTopicName());
+        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
     }
 
     /**
@@ -1444,11 +1428,10 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
     {
         checkNotClosed();
-        checkValidTopic(topic);
+        AMQTopic dest = checkValidTopic(topic);
 
-        return new TopicSubscriberAdaptor<C>(topic,
-                                             createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
-                                                                true, messageSelector, null, false, false));
+        // AMQTopic dest = new AMQTopic(topic.getTopicName());
+        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1550,8 +1533,10 @@ public abstract class AMQSession<C exten
 
     abstract public void sync() throws AMQException;
 
-    public int getAcknowledgeMode()
+    public int getAcknowledgeMode() throws JMSException
     {
+        checkNotClosed();
+
         return _acknowledgeMode;
     }
 
@@ -1611,8 +1596,10 @@ public abstract class AMQSession<C exten
         return _ticket;
     }
 
-    public boolean getTransacted()
+    public boolean getTransacted() throws JMSException
     {
+        checkNotClosed();
+
         return _transacted;
     }
 
@@ -1708,14 +1695,13 @@ public abstract class AMQSession<C exten
         // Ensure that the session is not transacted.
         checkNotTransacted();
 
-
+        // flush any acks we are holding in the buffer.
+        flushAcknowledgments();
+        
+        // this is set only here, and before the consumer's onMessage is called it is set to false
+        _inRecovery = true;
         try
         {
-            // flush any acks we are holding in the buffer.
-            flushAcknowledgments();
-
-            // this is only set true here, and only set false when the consumers preDeliver method is called
-            _sessionInRecovery = true;
 
             boolean isSuspended = isSuspended();
 
@@ -1723,18 +1709,9 @@ public abstract class AMQSession<C exten
             {
                 suspendChannel(true);
             }
-
-            // Set to true to short circuit delivery of anything currently
-            //in the pre-dispatch queue.
-            _usingDispatcherForCleanup = true;
-
+            
             syncDispatchQueue();
-
-            // Set to false before sending the recover as 0-8/9/9-1 will
-            //send messages back before the recover completes, and we
-            //probably shouldn't clean those! ;-)
-            _usingDispatcherForCleanup = false;
-
+            
             if (_dispatcher != null)
             {
                 _dispatcher.recover();
@@ -1743,7 +1720,10 @@ public abstract class AMQSession<C exten
             sendRecover();
             
             markClean();
-
+            
+            // Set inRecovery to false before you start message flow again.
+            _inRecovery = false; 
+            
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1757,10 +1737,7 @@ public abstract class AMQSession<C exten
         {
             throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
         }
-        catch(TransportException e)
-        {
-            throw toJMSException("Recover failed: " + e.getMessage(), e);
-        }
+       
     }
 
     protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1818,7 +1795,9 @@ public abstract class AMQSession<C exten
                     suspendChannel(true);
                 }
 
-                setRollbackMark();
+                // Let the dispatcher know that all the incoming messages
+                // should be rolled back (reject/release)
+                _rollbackMark.set(_highestDeliveryTag.get());
 
                 syncDispatchQueue();
 
@@ -1843,10 +1822,6 @@ public abstract class AMQSession<C exten
             {
                 throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
             }
-            catch (TransportException e)
-            {
-                throw toJMSException("Failure to rollback:" + e.getMessage(), e);
-            }
         }
     }
 
@@ -1893,14 +1868,7 @@ public abstract class AMQSession<C exten
      */
     public void unsubscribe(String name) throws JMSException
     {
-        try
-        {
-            unsubscribe(name, false);
-        }
-        catch (TransportException e)
-        {
-            throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
-        }
+        unsubscribe(name, false);
     }
     
     /**
@@ -1977,12 +1945,6 @@ public abstract class AMQSession<C exten
     {
         checkTemporaryDestination(destination);
 
-        if(!noConsume && isBrowseOnlyDestination(destination))
-        {
-            throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
-                                                  "but a 'browseOnly' Destination has been supplied.");
-        }
-
         final String messageSelector;
 
         if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -2027,16 +1989,8 @@ public abstract class AMQSession<C exten
                         // argument, as specifying null for the arguments when querying means they should not be checked at all
                         ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
 
-                        C consumer;
-                        try
-                        {
-                            consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
-                                                             noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
-                        }
-                        catch(TransportException e)
-                        {
-                            throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
-                        }
+                        C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+                                                                              noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
 
                         if (_messageListener != null)
                         {
@@ -2073,10 +2027,7 @@ public abstract class AMQSession<C exten
                             ex.initCause(e);
                             throw ex;
                         }
-                        catch (TransportException e)
-                        {
-                            throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
-                        }
+
                         return consumer;
                     }
                 }, _connection).execute();
@@ -2141,7 +2092,7 @@ public abstract class AMQSession<C exten
 
     boolean isInRecovery()
     {
-        return _sessionInRecovery;
+        return _inRecovery;
     }
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2263,7 +2214,7 @@ public abstract class AMQSession<C exten
 
     void setInRecovery(boolean inRecovery)
     {
-        _sessionInRecovery = inRecovery;
+        _inRecovery = inRecovery;
     }
 
     boolean isStarted()
@@ -2444,7 +2395,7 @@ public abstract class AMQSession<C exten
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
-    protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
+    protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException
     {
         if (topic == null)
         {
@@ -2463,17 +2414,17 @@ public abstract class AMQSession<C exten
                 ("Cannot create a durable subscription with a temporary topic: " + topic);
         }
 
-        if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+        if (!(topic instanceof AMQTopic))
         {
             throw new javax.jms.InvalidDestinationException(
                     "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
                     + topic.getClass().getName());
         }
 
-        return topic;
+        return (AMQTopic) topic;
     }
 
-    protected Topic checkValidTopic(Topic topic) throws JMSException
+    protected AMQTopic checkValidTopic(Topic topic) throws JMSException
     {
         return checkValidTopic(topic, false);
     }
@@ -2602,9 +2553,15 @@ public abstract class AMQSession<C exten
     public abstract void sendConsume(C consumer, AMQShortString queueName,
                                      AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
 
-    private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+    private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
+        return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
+    }
+
+    private P createProducerImpl(final Destination destination, final boolean mandatory,
+                                                    final boolean immediate, final boolean waitUntilSent) throws JMSException
+    {
         return new FailoverRetrySupport<P, JMSException>(
                 new FailoverProtectedOperation<P, JMSException>()
                 {
@@ -2612,18 +2569,8 @@ public abstract class AMQSession<C exten
                     {
                         checkNotClosed();
                         long producerId = getNextProducerId();
-
-                        P producer;
-                        try
-                        {
-                            producer = createMessageProducer(destination, mandatory,
-                                    immediate, producerId);
-                        }
-                        catch (TransportException e)
-                        {
-                            throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
-                        }
-
+                        P producer = createMessageProducer(destination, mandatory,
+                                                           immediate, waitUntilSent, producerId);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2632,7 +2579,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
-                                                               final boolean immediate, final long producerId) throws JMSException;
+                                                               final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException;
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
@@ -2775,21 +2722,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    /**
-     * Undeclares the specified temporary queue/topic.
-     *
-     * <p/>Note that this operation automatically retries in the event of fail-over.
-     *
-     * @param amqQueue The name of the temporary destination to delete.
-     *
-     * @throws JMSException If the queue could not be deleted for any reason.
-     * @todo Be aware of possible changes to parameter order as versions change.
-     */
-    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
-    {
-        deleteQueue(amqQueue.getAMQQueueName());
-    }
-
     public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
 
     private long getNextProducerId()
@@ -2887,7 +2819,6 @@ public abstract class AMQSession<C exten
             {
                 declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
             }
-            bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
         }
         
         AMQShortString queueName = amqd.getAMQQueueName();
@@ -2895,6 +2826,8 @@ public abstract class AMQSession<C exten
         // store the consumer queue name
         consumer.setQueuename(queueName);
 
+        bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
+
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)
         {
@@ -3045,10 +2978,6 @@ public abstract class AMQSession<C exten
             {
                 throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
             }
-            catch (TransportException e)
-            {
-                throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
-            }
         }
     }
 
@@ -3087,11 +3016,21 @@ public abstract class AMQSession<C exten
      *
      * @return boolean true if failover has occured.
      */
-    public boolean hasFailedOverDirty()
+    public boolean hasFailedOver()
     {
         return _failedOverDirty;
     }
 
+    /**
+     * Check to see if any message have been sent in this transaction and have not been commited.
+     *
+     * @return boolean true if a message has been sent but not commited
+     */
+    public boolean isDirty()
+    {
+        return _dirty;
+    }
+
     public void setTicket(int ticket)
     {
         _ticket = ticket;
@@ -3204,7 +3143,7 @@ public abstract class AMQSession<C exten
                     setConnectionStopped(true);
                 }
 
-                setRollbackMark();
+                _rollbackMark.set(_highestDeliveryTag.get());
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
@@ -3353,14 +3292,9 @@ public abstract class AMQSession<C exten
                 if (!(message instanceof CloseConsumerMessage)
                     && tagLE(deliveryTag, _rollbackMark.get()))
                 {
-                    if (_logger.isDebugEnabled())
-                    {
-                        _logger.debug("Rejecting message because delivery tag " + deliveryTag
-                                + " <= rollback mark " + _rollbackMark.get());
-                    }
                     rejectMessage(message, true);
                 }
-                else if (_usingDispatcherForCleanup)
+                else if (isInRecovery())
                 {
                     _unacknowledgedMessageTags.add(deliveryTag);            
                 }
@@ -3419,11 +3353,6 @@ public abstract class AMQSession<C exten
                 // Don't reject if we're already closing
                 if (!_closed.get())
                 {
-                    if (_logger.isDebugEnabled())
-                    {
-                        _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
-                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
-                    }
                     rejectMessage(message, true);
                 }
             }
@@ -3521,48 +3450,4 @@ public abstract class AMQSession<C exten
     {
         return _closing.get()|| _connection.isClosing();
     }
-    
-    public boolean isDeclareExchanges()
-    {
-    	return DECLARE_EXCHANGES;
-    }
-
-    JMSException toJMSException(String message, TransportException e)
-    {
-        int code = getErrorCode(e);
-        JMSException jmse = new JMSException(message, Integer.toString(code));
-        jmse.setLinkedException(e);
-        jmse.initCause(e);
-        return jmse;
-    }
-
-    private int getErrorCode(TransportException e)
-    {
-        int code = AMQConstant.INTERNAL_ERROR.getCode();
-        if (e instanceof SessionException)
-        {
-            SessionException se = (SessionException) e;
-            if(se.getException() != null && se.getException().getErrorCode() != null)
-            {
-                code = se.getException().getErrorCode().getValue();
-            }
-        }
-        return code;
-    }
-
-    private boolean isBrowseOnlyDestination(Destination destination)
-    {
-        return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
-    }
-
-    private void setRollbackMark()
-    {
-        // Let the dispatcher know that all the incomming messages
-        // should be rolled back(reject/release)
-        _rollbackMark.set(_highestDeliveryTag.get());
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Rollback mark is set to " + _rollbackMark.get());
-        }
-    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Oct 21 01:19:00 2011
@@ -47,8 +47,6 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.FieldTableSupport;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -58,7 +56,6 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.ExchangeBoundResult;
 import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
@@ -72,7 +69,6 @@ import org.apache.qpid.transport.RangeSe
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionException;
 import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Serial;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,20 +156,13 @@ public class AMQSession_0_10 extends AMQ
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
                     boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
-                    int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
+                    int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
         super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
               defaultPrefetchLowMark);
         _qpidConnection = qpidConnection;
-        if (name == null)
-        {
-            _qpidSession = _qpidConnection.createSession(1);
-        }
-        else
-        {
-            _qpidSession = _qpidConnection.createSession(name,1);
-        }
+        _qpidSession = _qpidConnection.createSession(1);
         _qpidSession.setSessionListener(this);
         if (_transacted)
         {
@@ -200,12 +189,11 @@ public class AMQSession_0_10 extends AMQ
      * @param qpidConnection      The connection
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
-                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
-                    String name)
+                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
     {
 
         this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
-             defaultPrefetchHigh, defaultPrefetchLow,name);
+             defaultPrefetchHigh, defaultPrefetchLow);
     }
 
     private void addUnacked(int id)
@@ -270,7 +258,7 @@ public class AMQSession_0_10 extends AMQ
 
         long prefetch = getAMQConnection().getMaxPrefetch();
 
-        if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
+        if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
         {
             flushAcknowledgments();
         }
@@ -294,34 +282,23 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    void messageAcknowledge(final RangeSet ranges, final boolean accept)
+    void messageAcknowledge(RangeSet ranges, boolean accept)
     {
         messageAcknowledge(ranges,accept,false);
     }
     
-    void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
+    void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
     {
-        final Session ssn = getQpidSession();
-        flushProcessed(ranges,accept);
-        if (accept)
+        Session ssn = getQpidSession();
+        for (Range range : ranges)
         {
-            ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
+            ssn.processed(range);
         }
-    }
-
-    /**
-     * Flush any outstanding commands. This causes session complete to be sent.
-     * @param ranges the range of command ids.
-     * @param batch true if batched.
-     */
-    void flushProcessed(final RangeSet ranges, final boolean batch)
-    {
-        final Session ssn = getQpidSession();
-        for (final Range range : ranges)
+        ssn.flushProcessed(accept ? BATCH : NONE);
+        if (accept)
         {
-            ssn.processed(range);
+            ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
         }
-        ssn.flushProcessed(batch ? BATCH : NONE);
     }
 
     /**
@@ -337,7 +314,7 @@ public class AMQSession_0_10 extends AMQ
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
                               final FieldTable arguments, final AMQShortString exchangeName,
                               final AMQDestination destination, final boolean nowait)
-            throws AMQException
+            throws AMQException, FailoverException
     {
         if (destination.getDestSyntax() == DestSyntax.BURL)
         {
@@ -423,6 +400,25 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
+
+    /**
+     * Commit the receipt and the delivery of all messages exchanged by this session resources.
+     */
+    public void sendCommit() throws AMQException, FailoverException
+    {
+        getQpidSession().setAutoSync(true);
+        try
+        {
+            getQpidSession().txCommit();
+        }
+        finally
+        {
+            getQpidSession().setAutoSync(false);
+        }
+        // We need to sync so that we get notify of an error.
+        sync();
+    }
+
     /**
      * Create a queue with a given name.
      *
@@ -455,14 +451,6 @@ public class AMQSession_0_10 extends AMQ
     public void sendRecover() throws AMQException, FailoverException
     {
         // release all unacked messages
-        RangeSet ranges = gatherUnackedRangeSet();
-        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
-        // We need to sync so that we get notify of an error.
-        sync();
-    }
-
-    private RangeSet gatherUnackedRangeSet()
-    {
         RangeSet ranges = new RangeSet();
         while (true)
         {
@@ -471,11 +459,11 @@ public class AMQSession_0_10 extends AMQ
             {
                 break;
             }
-
-            ranges.add(tag.intValue());
+            ranges.add((int) (long) tag);
         }
-
-        return ranges;
+        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+        // We need to sync so that we get notify of an error.
+        sync();
     }
 
 
@@ -549,6 +537,7 @@ public class AMQSession_0_10 extends AMQ
     }
     
     public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
+    throws JMSException
     {
         boolean res;
         ExchangeBoundResult bindingQueryResult =
@@ -611,16 +600,10 @@ public class AMQSession_0_10 extends AMQ
                         (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
             }
             
-            boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-            
-            if (consumer.getDestination().getLink() != null)
-            {
-                acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
-            }
             
             getQpidSession().messageSubscribe
                 (queueName.toString(), String.valueOf(tag),
-                 acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+                 getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
                  preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
                  consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
@@ -676,12 +659,13 @@ public class AMQSession_0_10 extends AMQ
      * Create an 0_10 message producer
      */
     public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
-                                                      final boolean immediate, final long producerId) throws JMSException
+                                                      final boolean immediate, final boolean waitUntilSent,
+                                                      long producerId) throws JMSException
     {
         try
         {
             return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
-                                             getProtocolHandler(), producerId, immediate, mandatory);
+                                             getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
         }
         catch (AMQException e)
         {
@@ -691,10 +675,6 @@ public class AMQSession_0_10 extends AMQ
             
             throw ex;
         }
-        catch(TransportException e)
-        {
-            throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
-        }
 
     }
 
@@ -787,7 +767,7 @@ public class AMQSession_0_10 extends AMQ
         else
         {
             QueueNode node = (QueueNode)amqd.getSourceNode();
-            getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
+            getQpidSession().queueDeclare(queueName.toString(), "" ,
                     node.getDeclareArgs(),
                     node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                     node.isDurable() ? Option.DURABLE : Option.NONE,
@@ -924,26 +904,7 @@ public class AMQSession_0_10 extends AMQ
         setCurrentException(exc);
     }
 
-    public void closed(Session ssn)
-    {
-        try
-        {
-            super.closed(null);
-            if (flushTask != null)
-            {
-                flushTask.cancel();
-                flushTask = null;
-            }
-        } catch (Exception e)
-        {
-            _logger.error("Error closing JMS session", e);
-        }
-    }
-
-    public AMQException getLastException()
-    {
-        return getCurrentException();
-    }
+    public void closed(Session ssn) {}
 
     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
                                           final boolean noLocal, final boolean nowait)
@@ -997,26 +958,27 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    public void commitImpl() throws AMQException, FailoverException, TransportException
+    @Override public void commit() throws JMSException
     {
-        if( _txSize > 0 )
+        checkTransacted();
+        try
         {
-            messageAcknowledge(_txRangeSet, true);
-            _txRangeSet.clear();
-            _txSize = 0;
+            if( _txSize > 0 )
+            {
+                messageAcknowledge(_txRangeSet, true);
+                _txRangeSet.clear();
+                _txSize = 0;
+            }
+            sendCommit();
         }
-
-        getQpidSession().setAutoSync(true);
-        try
+        catch (AMQException e)
         {
-            getQpidSession().txCommit();
+            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
         }
-        finally
+        catch (FailoverException e)
         {
-            getQpidSession().setAutoSync(false);
+            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
         }
-        // We need to sync so that we get notify of an error.
-        sync();
     }
 
     protected final boolean tagLE(long tag1, long tag2)
@@ -1058,9 +1020,11 @@ public class AMQSession_0_10 extends AMQ
                 code = ee.getErrorCode().getValue();
             }
             AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+            _connection.exceptionReceived(amqe);
+
             _currentException = amqe;
         }
-        _connection.exceptionReceived(_currentException);
     }
 
     public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1104,37 +1068,22 @@ public class AMQSession_0_10 extends AMQ
         return match;
     }
     
-    public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
+    public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
     {
         boolean match = true;
-        try
+        QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+        match = dest.getAddressName().equals(result.getQueue());
+        
+        if (match && assertNode)
         {
-            QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
-            match = dest.getAddressName().equals(result.getQueue());
-            
-            if (match && assertNode)
-            {
-                match = (result.getDurable() == node.isDurable()) && 
-                         (result.getAutoDelete() == node.isAutoDelete()) &&
-                         (result.getExclusive() == node.isExclusive()) &&
-                         (matchProps(result.getArguments(),node.getDeclareArgs()));
-            }
-            else if (match)
-            {
-                // should I use the queried details to update the local data structure.
-            }
+            match = (result.getDurable() == node.isDurable()) && 
+                     (result.getAutoDelete() == node.isAutoDelete()) &&
+                     (result.getExclusive() == node.isExclusive()) &&
+                     (matchProps(result.getArguments(),node.getDeclareArgs()));
         }
-        catch(SessionException e)
+        else if (match)
         {
-            if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
-            {
-                match = false;
-            }
-            else
-            {
-                throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
-                        "Error querying queue",e);
-            }
+            // should I use the queried details to update the local data structure.
         }
         
         return match;
@@ -1200,22 +1149,6 @@ public class AMQSession_0_10 extends AMQ
             
             int type = resolveAddressType(dest);
             
-            if (type == AMQDestination.QUEUE_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.UNRELIABLE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
-            {
-                throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");                      
-            }
-            
             switch (type)
             {
                 case AMQDestination.QUEUE_TYPE: 
@@ -1229,8 +1162,6 @@ public class AMQSession_0_10 extends AMQ
                     {
                         setLegacyFiledsForQueueType(dest);
                         send0_10QueueDeclare(dest,null,false,noWait);
-                        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
-                                      null,dest.getExchangeName(),dest, false);
                         break;
                     }                
                 }
@@ -1339,8 +1270,6 @@ public class AMQSession_0_10 extends AMQ
                                     dest.getQueueName(),// should have one by now
                                     dest.getSubject(),
                                     Collections.<String,Object>emptyMap()));
-        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
-                null,dest.getExchangeName(),dest, false);
     }
     
     public void setLegacyFiledsForQueueType(AMQDestination dest)
@@ -1378,26 +1307,5 @@ public class AMQSession_0_10 extends AMQ
         sb.append(">");
         return sb.toString();
     }
-
-    protected void acknowledgeImpl()
-    {
-        RangeSet range = gatherUnackedRangeSet();
-
-        if(range.size() > 0 )
-        {
-            messageAcknowledge(range, true);
-            getQpidSession().sync();
-        }
-    }
-
-    @Override
-    void resubscribe() throws AMQException
-    {
-        // Also reset the delivery tag tracker, to insure we dont
-        // return the first <total number of msgs received on session>
-        // messages sent by the brokers following the first rollback
-        // after failover
-        _highestDeliveryTag.set(-1);
-        super.resubscribe();
-    }
+    
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 21 01:19:00 2011
@@ -38,7 +38,6 @@ import org.apache.qpid.client.message.Re
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQFrame;
@@ -76,12 +75,12 @@ import org.apache.qpid.framing.amqp_0_91
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
+
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -91,7 +90,7 @@ public final class AMQSession_0_8 extend
      * @param con                     The connection on which to create the session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is transactional.
-     * @param acknowledgeMode         The acknowledgement mode for the session.
+     * @param acknowledgeMode         The acknoledgement mode for the session.
      * @param messageFactoryRegistry  The message factory factory for the session.
      * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
@@ -109,7 +108,7 @@ public final class AMQSession_0_8 extend
      * @param con                     The connection on which to create the session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is transactional.
-     * @param acknowledgeMode         The acknowledgement mode for the session.
+     * @param acknowledgeMode         The acknoledgement mode for the session.
      * @param defaultPrefetchHigh     The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
      */
@@ -125,20 +124,6 @@ public final class AMQSession_0_8 extend
         return getProtocolHandler().getProtocolVersion();
     }
 
-    protected void acknowledgeImpl()
-    {
-        while (true)
-        {
-            Long tag = _unacknowledgedMessageTags.poll();
-            if (tag == null)
-            {
-                break;
-            }
-
-            acknowledgeMessage(tag, false);
-        }
-    }
-
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -168,7 +153,7 @@ public final class AMQSession_0_8 extend
         // we also need to check the state manager for 08/09 as the
         // _connection variable may not be updated in time by the error receiving
         // thread.
-        // We can't close the session if we are already in the process of
+        // We can't close the session if we are alreadying in the process of
         // closing/closed the connection.
                 
         if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -184,20 +169,8 @@ public final class AMQSession_0_8 extend
         }
     }
 
-    public void commitImpl() throws AMQException, FailoverException, TransportException
+    public void sendCommit() throws AMQException, FailoverException
     {
-        // Acknowledge all delivered messages
-        while (true)
-        {
-            Long tag = _deliveredMessageTags.poll();
-            if (tag == null)
-            {
-                break;
-            }
-
-            acknowledgeMessage(tag, false);
-        }
-
         final AMQProtocolHandler handler = getProtocolHandler();
 
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
@@ -427,12 +400,12 @@ public final class AMQSession_0_8 extend
 
 
     public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
-            final boolean immediate, long producerId) throws JMSException
+            final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
     {
        try
        {
            return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
-                                 this, getProtocolHandler(), producerId, immediate, mandatory);
+                                 this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
        }
        catch (AMQException e)
        {
@@ -604,18 +577,6 @@ public final class AMQSession_0_8 extend
         
     }
 
-    @Override
-    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
-            throws JMSException
-    {
-        // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
-        // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
-        // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the
-        // client explicitly deletes it.
-
-        /* intentional no-op */
-    }
-
     public boolean isQueueBound(String exchangeName, String queueName,
             String bindingKey, Map<String, Object> args) throws JMSException
     {
@@ -623,34 +584,4 @@ public final class AMQSession_0_8 extend
                             queueName == null ? null : new AMQShortString(queueName),
                             bindingKey == null ? null : new AMQShortString(bindingKey));
     }
-  
-
-    public AMQException getLastException()
-    {
-        // if the Connection has closed then we should throw any exception that
-        // has occurred that we were not waiting for
-        AMQStateManager manager = _connection.getProtocolHandler()
-                .getStateManager();
-        
-        Exception e = manager.getLastException();
-        if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
-                && e != null)
-        {
-            if (e instanceof AMQException)
-            {
-                return (AMQException) e;
-            } 
-            else
-            {
-                AMQException amqe = new AMQException(AMQConstant
-                        .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), 
-                        e.getMessage(), e.getCause());
-                return amqe;
-            }
-        } 
-        else
-        {
-            return null;
-        }
-    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Fri Oct 21 01:19:00 2011
@@ -20,13 +20,14 @@
  */
 package org.apache.qpid.client;
 
-import java.util.UUID;
-
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
 import org.apache.qpid.framing.AMQShortString;
 
+import java.util.Random;
+import java.util.UUID;
+
 /** AMQ implementation of a TemporaryQueue. */
 final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
 {
@@ -49,15 +50,11 @@ final class AMQTemporaryQueue extends AM
         {
             throw new JMSException("Temporary Queue has consumers so cannot be deleted");
         }
+        _deleted = true;
 
-        try
-        {
-            _session.deleteTemporaryDestination(this);
-        }
-        finally
-        {
-            _deleted = true;
-        }
+        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+        // by the server when there are no more subscriptions to that queue.  This is probably not
+        // quite right for JMSCompliance.
     }
 
     public AMQSession getSession()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Oct 21 01:19:00 2011
@@ -53,14 +53,10 @@ class AMQTemporaryTopic extends AMQTopic
             throw new JMSException("Temporary Topic has consumers so cannot be deleted");
         }
 
-        try
-        {
-            _session.deleteTemporaryDestination(this);
-        }
-        finally
-        {
-            _deleted = true;
-        }
+        _deleted = true;
+        // Currently TemporaryTopic is set to be auto-delete which means that the topic will be deleted
+        // by the server when there are no more subscriptions to that topic.  This is probably not
+        // quite right for JMS compliance.
     }
 
     public AMQSession getSession()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Fri Oct 21 01:19:00 2011
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
 
-import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Topic;
 
@@ -96,47 +95,39 @@ public class AMQTopic extends AMQDestina
         super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
     }
 
-    public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection)
+    public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
             throws JMSException
     {
-        if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
+        if (topic.getDestSyntax() == DestSyntax.ADDR)
         {
-            AMQDestination qpidTopic = (AMQDestination)topic;
-            if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
+            try
             {
-                try
-                {
-                    AMQTopic t = new AMQTopic(qpidTopic.getAddress());
-                    AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
-                    // link is never null if dest was created using an address string.
-                    t.getLink().setName(queueName.asString());               
-                    t.getSourceNode().setAutoDelete(false);
-                    t.getSourceNode().setDurable(true);
-                    
-                    // The legacy fields are also populated just in case.
-                    t.setQueueName(queueName);
-                    t.setAutoDelete(false);
-                    t.setDurable(true);
-                    return t;
-                }
-                catch(Exception e)
-                {
-                    JMSException ex = new JMSException("Error creating durable topic");
-                    ex.initCause(e);
-                    ex.setLinkedException(e);
-                    throw ex;
-                }
+                AMQTopic t = new AMQTopic(topic.getAddress());
+                AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
+                // link is never null if dest was created using an address string.
+                t.getLink().setName(queueName.asString());               
+                t.getSourceNode().setAutoDelete(false);
+                t.getSourceNode().setDurable(true);
+                
+                // The legacy fields are also populated just in case.
+                t.setQueueName(queueName);
+                t.setAutoDelete(false);
+                t.setDurable(true);
+                return t;
             }
-            else
+            catch(Exception e)
             {
-                return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
-                                getDurableTopicQueueName(subscriptionName, connection),
-                                true);
+                JMSException ex = new JMSException("Error creating durable topic");
+                ex.initCause(e);
+                ex.setLinkedException(e);
+                throw ex;
             }
         }
         else
         {
-            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic");
+            return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
+                            getDurableTopicQueueName(subscriptionName, connection),
+                            true);
         }
     }
 
@@ -147,17 +138,13 @@ public class AMQTopic extends AMQDestina
 
     public String getTopicName() throws JMSException
     {
-        if (getRoutingKey() != null)
+        if (super.getRoutingKey() == null && super.getSubject() != null)
         {
-            return getRoutingKey().asString();
-        }
-        else if (getSubject() != null)
-        {
-            return getSubject();
+            return super.getSubject();
         }
         else
         {
-            return null;
+            return super.getRoutingKey().toString();
         }
     }
     
@@ -176,18 +163,12 @@ public class AMQTopic extends AMQDestina
 
     public AMQShortString getRoutingKey()
     {
-        if (super.getRoutingKey() != null)            
-        {
-            return super.getRoutingKey();            
-        }
-        else if (getSubject() != null)
+        if (super.getRoutingKey() == null && super.getSubject() != null)
         {
-            return new AMQShortString(getSubject());
+            return new AMQShortString(super.getSubject());
         }
         else
         {
-            setRoutingKey(new AMQShortString(""));
-            setSubject("");
             return super.getRoutingKey();
         }
     }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Oct 21 01:19:00 2011
@@ -27,7 +27,6 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +36,10 @@ import javax.jms.MessageListener;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -115,10 +117,29 @@ public abstract class BasicMessageConsum
     protected final int _acknowledgeMode;
 
     /**
+     * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+     */
+    private int _outstanding;
+
+    /**
+     * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstanding
+     * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+     */
+    private boolean _dups_ok_acknowledge_send;
+
+    /**
      * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
      */
     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
+    /** The last tag that was "multiple" acknowledged on this session (if transacted) */
+    private long _lastAcked;
+
+    /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
+    private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+    private final Object _commitLock = new Object();
+
     /**
      * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
      * receive() is in progress.
@@ -268,6 +289,17 @@ public abstract class BasicMessageConsum
         }
     }
 
+    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    {
+        if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        {
+            _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+        }
+        
+        _session.setInRecovery(false);
+        preDeliver(jmsMsg);
+    }
+
     /**
      * @param immediate if true then return immediately if the connection is failing over
      *
@@ -290,14 +322,14 @@ public abstract class BasicMessageConsum
             }
         }
 
-        if (isMessageListenerSet())
+        if (!_receiving.compareAndSet(false, true))
         {
-            throw new javax.jms.IllegalStateException("A listener has already been set.");
+            throw new javax.jms.IllegalStateException("Another thread is already receiving.");
         }
 
-        if (!_receiving.compareAndSet(false, true))
+        if (isMessageListenerSet())
         {
-            throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
         }
 
         _receivingThread = Thread.currentThread();
@@ -376,7 +408,7 @@ public abstract class BasicMessageConsum
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                preDeliver(m);
+                preApplicationProcessing(m);
                 postDeliver(m);
             }
             return m;
@@ -387,10 +419,6 @@ public abstract class BasicMessageConsum
 
             return null;
         }
-        catch(TransportException e)
-        {
-            throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
-        }
         finally
         {
             releaseReceiving();
@@ -449,7 +477,7 @@ public abstract class BasicMessageConsum
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                preDeliver(m);
+                preApplicationProcessing(m);
                 postDeliver(m);
             }
 
@@ -461,10 +489,6 @@ public abstract class BasicMessageConsum
 
             return null;
         }
-        catch(TransportException e)
-        {
-            throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
-        }
         finally
         {
             releaseReceiving();
@@ -547,7 +571,6 @@ public abstract class BasicMessageConsum
                         if (!_session.isClosed() || _session.isClosing())
                         {
                             sendCancel();
-                            cleanupQueue();
                         }
                     }
                     catch (AMQException e)
@@ -558,10 +581,6 @@ public abstract class BasicMessageConsum
                     {
                         throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
                     }
-                    catch (TransportException e)
-                    {
-                        throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
-                    }
                 }
             }
             else
@@ -589,8 +608,6 @@ public abstract class BasicMessageConsum
     }
 
     abstract void sendCancel() throws AMQException, FailoverException;
-    
-    abstract void cleanupQueue() throws AMQException, FailoverException;
 
     /**
      * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
@@ -701,7 +718,7 @@ public abstract class BasicMessageConsum
         {
             if (isMessageListenerSet())
             {
-                preDeliver(jmsMessage);
+                preApplicationProcessing(jmsMessage);
                 getMessageListener().onMessage(jmsMessage);
                 postDeliver(jmsMessage);
             }
@@ -725,42 +742,49 @@ public abstract class BasicMessageConsum
         }
     }
 
-    protected void preDeliver(AbstractJMSMessage msg)
+    void preDeliver(AbstractJMSMessage msg)
     {
-        _session.setInRecovery(false);
-
         switch (_acknowledgeMode)
         {
+
             case Session.PRE_ACKNOWLEDGE:
                 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 break;
-            case Session.AUTO_ACKNOWLEDGE:
-                //fall through
-            case Session.DUPS_OK_ACKNOWLEDGE:
-                _session.addUnacknowledgedMessage(msg.getDeliveryTag());
-                break;
+
             case Session.CLIENT_ACKNOWLEDGE:
                 // we set the session so that when the user calls acknowledge() it can call the method on session
                 // to send out the appropriate frame
                 msg.setAMQSession(_session);
-                _session.addUnacknowledgedMessage(msg.getDeliveryTag());
-                _session.markDirty();
                 break;
             case Session.SESSION_TRANSACTED:
-                _session.addDeliveredMessage(msg.getDeliveryTag());
-                _session.markDirty();
-                break;
-            case Session.NO_ACKNOWLEDGE:
-                //do nothing.
-                //path used for NO-ACK consumers, and browsers (see constructor).
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    _session.addDeliveredMessage(msg.getDeliveryTag());
+                    _session.markDirty();
+                }
+
                 break;
         }
+
     }
 
-    void postDeliver(AbstractJMSMessage msg)
+    void postDeliver(AbstractJMSMessage msg) throws JMSException
     {
         switch (_acknowledgeMode)
         {
+
+            case Session.CLIENT_ACKNOWLEDGE:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                _session.markDirty();
+                break;
+
             case Session.DUPS_OK_ACKNOWLEDGE:
             case Session.AUTO_ACKNOWLEDGE:
                 // we do not auto ack a message if the application code called recover()
@@ -798,6 +822,63 @@ public abstract class BasicMessageConsum
         return null;
     }
 
+    /**
+     * Acknowledge up to last message delivered (if any). Used when committing.
+     */
+    void acknowledgeDelivered()
+    {
+        synchronized(_commitLock)
+        {
+            ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+            while (!_receivedDeliveryTags.isEmpty())
+            {
+                tagsToAck.add(_receivedDeliveryTags.poll());
+            }
+
+            Collections.sort(tagsToAck);
+
+            long prevAcked = _lastAcked;
+            long oldAckPoint = -1;
+
+            while(oldAckPoint != prevAcked)
+            {
+                oldAckPoint = prevAcked;
+
+                Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+                while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
+                {
+                    tagsToAckIterator.remove();
+                    prevAcked++;
+                }
+
+                Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
+                while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
+                {
+                    previousAckIterator.remove();
+                    prevAcked++;
+                }
+
+            }
+            if(prevAcked != _lastAcked)
+            {
+                _session.acknowledgeMessage(prevAcked, true);
+                _lastAcked = prevAcked;
+            }
+
+            Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+            while(tagsToAckIterator.hasNext())
+            {
+                Long tag = tagsToAckIterator.next();
+                _session.acknowledgeMessage(tag, false);
+                _previouslyAcked.add(tag);
+            }
+        }
+    }
+
+
     void notifyError(Throwable cause)
     {
         // synchronized (_closed)
@@ -876,7 +957,7 @@ public abstract class BasicMessageConsum
 
     public boolean isNoConsume()
     {
-        return _noConsume;
+        return _noConsume || _destination.isBrowseOnly() ;
     }
 
     public void rollback()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org