You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [29/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Fri Oct 21 14:42:12 2011
@@ -21,9 +21,8 @@
 package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Destination;
 import javax.naming.NamingException;
@@ -34,8 +33,6 @@ import javax.naming.StringRefAddr;
 import org.apache.qpid.client.messaging.address.AddressHelper;
 import org.apache.qpid.client.messaging.address.Link;
 import org.apache.qpid.client.messaging.address.Node;
-import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
-import org.apache.qpid.client.messaging.address.QpidQueueOptions;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -63,7 +60,7 @@ public abstract class AMQDestination imp
 
     private boolean _browseOnly;
     
-    private boolean _isAddressResolved;
+    private AtomicLong _addressResolved = new AtomicLong(0);
 
     private AMQShortString _queueName;
 
@@ -78,15 +75,10 @@ public abstract class AMQDestination imp
 
     private boolean _exchangeExistsChecked;
 
-    private byte[] _byteEncoding;
-    private static final int IS_DURABLE_MASK = 0x1;
-    private static final int IS_EXCLUSIVE_MASK = 0x2;
-    private static final int IS_AUTODELETE_MASK = 0x4;
-
     public static final int QUEUE_TYPE = 1;
     public static final int TOPIC_TYPE = 2;
     public static final int UNKNOWN_TYPE = 3;
-    
+
     // ----- Fields required to support new address syntax -------
     
     public enum DestSyntax {        
@@ -323,7 +315,11 @@ public abstract class AMQDestination imp
     {
         if(_urlAsShortString == null)
         {
-            toURL();
+            if (_url == null)
+            {
+                toURL();
+            }
+            _urlAsShortString = new AMQShortString(_url);
         }
         return _urlAsShortString;
     }
@@ -370,7 +366,6 @@ public abstract class AMQDestination imp
         // calculated URL now out of date
         _url = null;
         _urlAsShortString = null;
-        _byteEncoding = null;
     }
 
     public AMQShortString getRoutingKey()
@@ -508,59 +503,10 @@ public abstract class AMQDestination imp
             sb.deleteCharAt(sb.length() - 1);
             url = sb.toString();
             _url = url;
-            _urlAsShortString = new AMQShortString(url);
         }
         return url;
     }
 
-    public byte[] toByteEncoding()
-    {
-        byte[] encoding = _byteEncoding;
-        if(encoding == null)
-        {
-            int size = _exchangeClass.length() + 1 +
-                       _exchangeName.length() + 1 +
-                       0 +  // in place of the destination name
-                       (_queueName == null ? 0 : _queueName.length()) + 1 +
-                       1;
-            encoding = new byte[size];
-            int pos = 0;
-
-            pos = _exchangeClass.writeToByteArray(encoding, pos);
-            pos = _exchangeName.writeToByteArray(encoding, pos);
-
-            encoding[pos++] = (byte)0;
-
-            if(_queueName == null)
-            {
-                encoding[pos++] = (byte)0;
-            }
-            else
-            {
-                pos = _queueName.writeToByteArray(encoding,pos);
-            }
-            byte options = 0;
-            if(_isDurable)
-            {
-                options |= IS_DURABLE_MASK;
-            }
-            if(_isExclusive)
-            {
-                options |= IS_EXCLUSIVE_MASK;
-            }
-            if(_isAutoDelete)
-            {
-                options |= IS_AUTODELETE_MASK;
-            }
-            encoding[pos] = options;
-
-
-            _byteEncoding = encoding;
-
-        }
-        return encoding;
-    }
-
     public boolean equals(Object o)
     {
         if (this == o)
@@ -614,53 +560,6 @@ public abstract class AMQDestination imp
                 null);          // factory location
     }
 
-
-    public static Destination createDestination(byte[] byteEncodedDestination)
-    {
-        AMQShortString exchangeClass;
-        AMQShortString exchangeName;
-        AMQShortString routingKey;
-        AMQShortString queueName;
-        boolean isDurable;
-        boolean isExclusive;
-        boolean isAutoDelete;
-
-        int pos = 0;
-        exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
-        pos+= exchangeClass.length() + 1;
-        exchangeName =  AMQShortString.readFromByteArray(byteEncodedDestination, pos);
-        pos+= exchangeName.length() + 1;
-        routingKey =  AMQShortString.readFromByteArray(byteEncodedDestination, pos);
-        pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
-        queueName =  AMQShortString.readFromByteArray(byteEncodedDestination, pos);
-        pos+= (queueName == null ? 0 : queueName.length()) + 1;
-        int options = byteEncodedDestination[pos];
-        isDurable = (options & IS_DURABLE_MASK) != 0;
-        isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
-        isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
-
-        if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
-        {
-            return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
-        }
-        else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
-        {
-            return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
-        }
-        else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
-        {
-            return new AMQHeadersExchange(routingKey);
-        }
-        else
-        {
-            return new AMQAnyDestination(exchangeName,exchangeClass,
-                                         routingKey,isExclusive, 
-                                         isAutoDelete,queueName, 
-                                         isDurable, new AMQShortString[0]);
-        }
-
-    }
-
     public static Destination createDestination(BindingURL binding)
     {
         AMQShortString type = binding.getExchangeClass();
@@ -842,12 +741,12 @@ public abstract class AMQDestination imp
     
     public boolean isAddressResolved()
     {
-        return _isAddressResolved;
+        return _addressResolved.get() > 0;
     }
 
-    public void setAddressResolved(boolean addressResolved)
+    public void setAddressResolved(long addressResolved)
     {
-        _isAddressResolved = addressResolved;
+        _addressResolved.set(addressResolved);
     }
     
     private static Address createAddressFromString(String str)
@@ -895,7 +794,7 @@ public abstract class AMQDestination imp
         return _browseOnly;
     }
     
-    public void setBrowseOnly(boolean b)
+    private void setBrowseOnly(boolean b)
     {
         _browseOnly = b;
     }
@@ -925,7 +824,7 @@ public abstract class AMQDestination imp
         dest.setTargetNode(_targetNode);
         dest.setSourceNode(_sourceNode);
         dest.setLink(_link);
-        dest.setAddressResolved(_isAddressResolved);
+        dest.setAddressResolved(_addressResolved.get());
         return dest;        
     }
     
@@ -938,4 +837,9 @@ public abstract class AMQDestination imp
     {
         _isDurable = b;
     }
+
+    public boolean isResolvedAfter(long time)
+    {
+        return _addressResolved.get() > time;
+    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 21 14:42:12 2011
@@ -70,7 +70,6 @@ import org.apache.qpid.AMQDisconnectedEx
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -88,8 +87,6 @@ import org.apache.qpid.client.message.JM
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
@@ -97,7 +94,10 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -213,8 +213,6 @@ public abstract class AMQSession<C exten
      */
     protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
 
-    protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
-
     /**
      * The period to wait while flow controlled before sending a log message confirming that the session is still
      * waiting on flow control being revoked
@@ -310,7 +308,7 @@ public abstract class AMQSession<C exten
     protected final FlowControllingBlockingQueue _queue;
 
     /** Holds the highest received delivery tag. */
-    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
     
     /** All the not yet acknowledged message tags */
@@ -364,7 +362,13 @@ public abstract class AMQSession<C exten
      * Set when recover is called. This is to handle the case where recover() is called by application code during
      * onMessage() processing to ensure that an auto ack is not sent.
      */
-    private boolean _inRecovery;
+    private volatile boolean _sessionInRecovery;
+
+    /**
+     * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+     * to the synchronous receive queue or MessageListener. Used during cleanup, e.g. in Session.recover().
+     */
+    private volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
@@ -567,6 +571,8 @@ public abstract class AMQSession<C exten
         close(-1);
     }
 
+    public abstract AMQException getLastException();
+    
     public void checkNotClosed() throws JMSException
     {
         try
@@ -575,16 +581,20 @@ public abstract class AMQSession<C exten
         }
         catch (IllegalStateException ise)
         {
-            // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
-            AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+            AMQException ex = getLastException();
+            if (ex != null)
+            {
+                IllegalStateException ssnClosed = new IllegalStateException(
+                        "Session has been closed", ex.getErrorCode().toString());
 
-            if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+                ssnClosed.setLinkedException(ex);
+                ssnClosed.initCause(ex);
+                throw ssnClosed;
+            } 
+            else
             {
-                ise.setLinkedException(manager.getLastException());
-                ise.initCause(ise.getLinkedException());
+                throw ise;
             }
-
-            throw ise;
         }
     }
 
@@ -600,29 +610,36 @@ public abstract class AMQSession<C exten
      * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
      *
      * @throws IllegalStateException If the session is closed.
+     * @throws JMSException if there is a problem during the acknowledge process.
      */
-    public void acknowledge() throws IllegalStateException
+    public void acknowledge() throws IllegalStateException, JMSException
     {
         if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
         }
-        else if (hasFailedOver())
+        else if (hasFailedOverDirty())
         {
+            //perform an implicit recover in this scenario
+            recover();
+
+            //notify the consumer
             throw new IllegalStateException("has failed over");
         }
 
-        while (true)
+        try
         {
-            Long tag = _unacknowledgedMessageTags.poll();
-            if (tag == null)
-            {
-                break;
-            }
-            acknowledgeMessage(tag, false);
+            acknowledgeImpl();
+            markClean();
+        }
+        catch (TransportException e)
+        {
+            throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
         }
     }
 
+    protected abstract void acknowledgeImpl() throws JMSException;
+
     /**
      * Acknowledge one or many messages.
      *
@@ -757,6 +774,10 @@ public abstract class AMQSession<C exten
                         _logger.debug(
                                 "Got FailoverException during channel close, ignored as channel already marked as closed.");
                     }
+                    catch (TransportException e)
+                    {
+                        throw toJMSException("Error closing session:" + e.getMessage(), e);
+                    }
                     finally
                     {
                         _connection.deregisterSession(_channelId);
@@ -827,51 +848,44 @@ public abstract class AMQSession<C exten
      * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, merely that it is not known whether it
      *                      failed or not.
-     * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void commit() throws JMSException
     {
         checkTransacted();
 
-        try
+        //Check that we are clean to commit.
+        if (_failedOverDirty)
         {
-            //Check that we are clean to commit.
-            if (_failedOverDirty)
+            if (_logger.isDebugEnabled())
             {
-                rollback();
-
-                throw new TransactionRolledBackException("Connection failover has occured since last send. " +
-                                                         "Forced rollback");
+                _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
             }
+            rollback();
 
+            throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+                                                     "The session transaction was rolled back.");
+        }
 
-            // Acknowledge all delivered messages
-            while (true)
-            {
-                Long tag = _deliveredMessageTags.poll();
-                if (tag == null)
-                {
-                    break;
-                }
-
-                acknowledgeMessage(tag, false);
-            }
-            // Commits outstanding messages and acknowledgments
-            sendCommit();
+        try
+        {
+            commitImpl();
             markClean();
         }
         catch (AMQException e)
         {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+            throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
         }
         catch (FailoverException e)
         {
             throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
         }
+        catch(TransportException e)
+        {
+            throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+        }
     }
 
-    public abstract void sendCommit() throws AMQException, FailoverException;
-
+    protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
 
     public void confirmConsumerCancelled(int consumerTag)
     {
@@ -949,7 +963,7 @@ public abstract class AMQSession<C exten
         return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
     }
 
-    public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+    protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
         checkValidDestination(destination);
@@ -963,15 +977,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
-                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
-    }
-
-    public C createExclusiveConsumer(Destination destination) throws JMSException
-    {
-        checkValidDestination(destination);
-
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
-                                  ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+                                  isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -979,7 +985,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
-                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -988,16 +994,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
-                                  messageSelector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
-    }
-
-    public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
-            throws JMSException
-    {
-        checkValidDestination(destination);
-
-        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
-                                  messageSelector, null, false, false);
+                                  messageSelector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1005,23 +1002,15 @@ public abstract class AMQSession<C exten
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-                                          boolean exclusive, String selector) throws JMSException
+                                              boolean exclusive, String selector) throws JMSException
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
-    }
-
-    public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
-                                          String selector, FieldTable rawSelector) throws JMSException
-    {
-        checkValidDestination(destination);
-
-        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()), false);
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1029,7 +1018,7 @@ public abstract class AMQSession<C exten
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly()),
+        return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
                                   false);
     }
 
@@ -1043,8 +1032,33 @@ public abstract class AMQSession<C exten
             throws JMSException
     {
         checkNotClosed();
-        AMQTopic origTopic = checkValidTopic(topic, true);
+        Topic origTopic = checkValidTopic(topic, true);
+        
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+        if (dest.getDestSyntax() == DestSyntax.ADDR &&
+            !dest.isAddressResolved())
+        {
+            try
+            {
+                handleAddressBasedDestination(dest,false,true);
+                if (dest.getAddressType() !=  AMQDestination.TOPIC_TYPE)
+                {
+                    throw new JMSException("Durable subscribers can only be created for Topics");
+                }
+                dest.getSourceNode().setDurable(true);
+            }
+            catch(AMQException e)
+            {
+                JMSException ex = new JMSException("Error when verifying destination");
+                ex.initCause(e);
+                ex.setLinkedException(e);
+                throw ex;
+            }
+            catch(TransportException e)
+            {
+                throw toJMSException("Error when verifying destination", e);
+            }
+        }
         
         String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
         
@@ -1056,15 +1070,9 @@ public abstract class AMQSession<C exten
             // Not subscribed to this name in the current session
             if (subscriber == null)
             {
-                AMQShortString topicName;
-                if (topic instanceof AMQTopic)
-                {
-                    topicName = ((AMQTopic) topic).getRoutingKey();
-                } else
-                {
-                    topicName = new AMQShortString(topic.getTopicName());
-                }
-
+                // After the address is resolved routing key will not be null.
+                AMQShortString topicName = dest.getRoutingKey();
+                
                 if (_strictAMQP)
                 {
                     if (_strictAMQPFATAL)
@@ -1135,6 +1143,10 @@ public abstract class AMQSession<C exten
     
             return subscriber;
         }
+        catch (TransportException e)
+        {
+            throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
+        }
         finally
         {
             _subscriberDetails.unlock();
@@ -1195,12 +1207,6 @@ public abstract class AMQSession<C exten
         return createProducerImpl(destination, mandatory, immediate);
     }
 
-    public P createProducer(Destination destination, boolean mandatory, boolean immediate,
-                                               boolean waitUntilSent) throws JMSException
-    {
-        return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
-    }
-
     public TopicPublisher createPublisher(Topic topic) throws JMSException
     {
         checkNotClosed();
@@ -1225,7 +1231,6 @@ public abstract class AMQSession<C exten
                 else
                 {
                     AMQQueue queue = new AMQQueue(queueName);
-                    queue.setCreate(AddressOption.ALWAYS);
                     return queue;
                     
                 }
@@ -1307,8 +1312,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        C consumer = (C) createConsumer(destination);
+        Queue dest = validateQueue(destination);
+        C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1326,8 +1331,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        C consumer = (C) createConsumer(destination, messageSelector);
+        Queue dest = validateQueue(destination);
+        C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1344,7 +1349,7 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
         checkNotClosed();
-        AMQQueue dest = (AMQQueue) queue;
+        Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
@@ -1363,17 +1368,28 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
     {
         checkNotClosed();
-        AMQQueue dest = (AMQQueue) queue;
+        Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
+    
+    private Queue validateQueue(Destination dest) throws InvalidDestinationException
+    {
+        if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
+        {
+            return (Queue)dest;
+        }
+        else
+        {
+            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
+        }
+    }
 
     public QueueSender createSender(Queue queue) throws JMSException
     {
         checkNotClosed();
 
-        // return (QueueSender) createProducer(queue);
         return new QueueSenderAdapter(createProducer(queue), queue);
     }
 
@@ -1408,10 +1424,10 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
         checkNotClosed();
-        AMQTopic dest = checkValidTopic(topic);
+        checkValidTopic(topic);
 
-        // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
+        return new TopicSubscriberAdaptor<C>(topic,
+                createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
     }
 
     /**
@@ -1428,10 +1444,11 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
     {
         checkNotClosed();
-        AMQTopic dest = checkValidTopic(topic);
+        checkValidTopic(topic);
 
-        // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
+        return new TopicSubscriberAdaptor<C>(topic,
+                                             createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
+                                                                true, messageSelector, null, false, false));
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1533,10 +1550,8 @@ public abstract class AMQSession<C exten
 
     abstract public void sync() throws AMQException;
 
-    public int getAcknowledgeMode() throws JMSException
+    public int getAcknowledgeMode()
     {
-        checkNotClosed();
-
         return _acknowledgeMode;
     }
 
@@ -1596,10 +1611,8 @@ public abstract class AMQSession<C exten
         return _ticket;
     }
 
-    public boolean getTransacted() throws JMSException
+    public boolean getTransacted()
     {
-        checkNotClosed();
-
         return _transacted;
     }
 
@@ -1695,13 +1708,14 @@ public abstract class AMQSession<C exten
         // Ensure that the session is not transacted.
         checkNotTransacted();
 
-        // flush any acks we are holding in the buffer.
-        flushAcknowledgments();
-        
-        // this is set only here, and the before the consumer's onMessage is called it is set to false
-        _inRecovery = true;
+
         try
         {
+            // flush any acks we are holding in the buffer.
+            flushAcknowledgments();
+
+            // this is only set true here, and only set false when the consumer's preDeliver method is called
+            _sessionInRecovery = true;
 
             boolean isSuspended = isSuspended();
 
@@ -1709,9 +1723,18 @@ public abstract class AMQSession<C exten
             {
                 suspendChannel(true);
             }
-            
+
+            // Set to true to short circuit delivery of anything currently
+            //in the pre-dispatch queue.
+            _usingDispatcherForCleanup = true;
+
             syncDispatchQueue();
-            
+
+            // Set to false before sending the recover as 0-8/9/9-1 will
+            //send messages back before the recover completes, and we
+            //probably shouldn't clean those! ;-)
+            _usingDispatcherForCleanup = false;
+
             if (_dispatcher != null)
             {
                 _dispatcher.recover();
@@ -1720,10 +1743,7 @@ public abstract class AMQSession<C exten
             sendRecover();
             
             markClean();
-            
-            // Set inRecovery to false before you start message flow again again.            
-            _inRecovery = false; 
-            
+
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1737,7 +1757,10 @@ public abstract class AMQSession<C exten
         {
             throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
         }
-       
+        catch(TransportException e)
+        {
+            throw toJMSException("Recover failed: " + e.getMessage(), e);
+        }
     }
 
     protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1795,9 +1818,7 @@ public abstract class AMQSession<C exten
                     suspendChannel(true);
                 }
 
-                // Let the dispatcher know that all the incomming messages
-                // should be rolled back(reject/release)
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 syncDispatchQueue();
 
@@ -1822,6 +1843,10 @@ public abstract class AMQSession<C exten
             {
                 throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
             }
+            catch (TransportException e)
+            {
+                throw toJMSException("Failure to rollback:" + e.getMessage(), e);
+            }
         }
     }
 
@@ -1868,7 +1893,14 @@ public abstract class AMQSession<C exten
      */
     public void unsubscribe(String name) throws JMSException
     {
-        unsubscribe(name, false);
+        try
+        {
+            unsubscribe(name, false);
+        }
+        catch (TransportException e)
+        {
+            throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
+        }
     }
     
     /**
@@ -1945,6 +1977,12 @@ public abstract class AMQSession<C exten
     {
         checkTemporaryDestination(destination);
 
+        if(!noConsume && isBrowseOnlyDestination(destination))
+        {
+            throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
+                                                  "but a 'browseOnly' Destination has been supplied.");
+        }
+
         final String messageSelector;
 
         if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -1989,8 +2027,16 @@ public abstract class AMQSession<C exten
                         // argument, as specifying null for the arguments when querying means they should not be checked at all
                         ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
 
-                        C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
-                                                                              noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+                        C consumer;
+                        try
+                        {
+                            consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+                                                             noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+                        }
+                        catch(TransportException e)
+                        {
+                            throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
+                        }
 
                         if (_messageListener != null)
                         {
@@ -2027,7 +2073,10 @@ public abstract class AMQSession<C exten
                             ex.initCause(e);
                             throw ex;
                         }
-
+                        catch (TransportException e)
+                        {
+                            throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
+                        }
                         return consumer;
                     }
                 }, _connection).execute();
@@ -2092,7 +2141,7 @@ public abstract class AMQSession<C exten
 
     boolean isInRecovery()
     {
-        return _inRecovery;
+        return _sessionInRecovery;
     }
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2214,7 +2263,7 @@ public abstract class AMQSession<C exten
 
     void setInRecovery(boolean inRecovery)
     {
-        _inRecovery = inRecovery;
+        _sessionInRecovery = inRecovery;
     }
 
     boolean isStarted()
@@ -2395,7 +2444,7 @@ public abstract class AMQSession<C exten
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
-    protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException
+    protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
     {
         if (topic == null)
         {
@@ -2414,17 +2463,17 @@ public abstract class AMQSession<C exten
                 ("Cannot create a durable subscription with a temporary topic: " + topic);
         }
 
-        if (!(topic instanceof AMQTopic))
+        if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
         {
             throw new javax.jms.InvalidDestinationException(
                     "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
                     + topic.getClass().getName());
         }
 
-        return (AMQTopic) topic;
+        return topic;
     }
 
-    protected AMQTopic checkValidTopic(Topic topic) throws JMSException
+    protected Topic checkValidTopic(Topic topic) throws JMSException
     {
         return checkValidTopic(topic, false);
     }
@@ -2553,15 +2602,9 @@ public abstract class AMQSession<C exten
     public abstract void sendConsume(C consumer, AMQShortString queueName,
                                      AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
 
-    private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+    private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
             throws JMSException
     {
-        return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
-    }
-
-    private P createProducerImpl(final Destination destination, final boolean mandatory,
-                                                    final boolean immediate, final boolean waitUntilSent) throws JMSException
-    {
         return new FailoverRetrySupport<P, JMSException>(
                 new FailoverProtectedOperation<P, JMSException>()
                 {
@@ -2569,8 +2612,18 @@ public abstract class AMQSession<C exten
                     {
                         checkNotClosed();
                         long producerId = getNextProducerId();
-                        P producer = createMessageProducer(destination, mandatory,
-                                                           immediate, waitUntilSent, producerId);
+
+                        P producer;
+                        try
+                        {
+                            producer = createMessageProducer(destination, mandatory,
+                                    immediate, producerId);
+                        }
+                        catch (TransportException e)
+                        {
+                            throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
+                        }
+
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2579,7 +2632,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
-                                                               final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException;
+                                                               final boolean immediate, final long producerId) throws JMSException;
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
@@ -2722,6 +2775,21 @@ public abstract class AMQSession<C exten
         }
     }
 
+    /**
+     * Undeclares the specified temporary queue/topic.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param amqQueue The temporary destination whose underlying queue is to be deleted.
+     *
+     * @throws JMSException If the queue could not be deleted for any reason.
+     * @todo Be aware of possible changes to parameter order as versions change.
+     */
+    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
+    {
+        deleteQueue(amqQueue.getAMQQueueName());
+    }
+
     public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
 
     private long getNextProducerId()
@@ -2819,6 +2887,7 @@ public abstract class AMQSession<C exten
             {
                 declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
             }
+            bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
         }
         
         AMQShortString queueName = amqd.getAMQQueueName();
@@ -2826,8 +2895,6 @@ public abstract class AMQSession<C exten
         // store the consumer queue name
         consumer.setQueuename(queueName);
 
-        bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
-
         // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
         if (!_immediatePrefetch)
         {
@@ -2978,6 +3045,10 @@ public abstract class AMQSession<C exten
             {
                 throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
             }
+            catch (TransportException e)
+            {
+                throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+            }
         }
     }
 
@@ -3016,21 +3087,11 @@ public abstract class AMQSession<C exten
      *
      * @return boolean true if failover has occured.
      */
-    public boolean hasFailedOver()
+    public boolean hasFailedOverDirty()
     {
         return _failedOverDirty;
     }
 
-    /**
-     * Check to see if any message have been sent in this transaction and have not been commited.
-     *
-     * @return boolean true if a message has been sent but not commited
-     */
-    public boolean isDirty()
-    {
-        return _dirty;
-    }
-
     public void setTicket(int ticket)
     {
         _ticket = ticket;
@@ -3143,7 +3204,7 @@ public abstract class AMQSession<C exten
                     setConnectionStopped(true);
                 }
 
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
@@ -3292,9 +3353,14 @@ public abstract class AMQSession<C exten
                 if (!(message instanceof CloseConsumerMessage)
                     && tagLE(deliveryTag, _rollbackMark.get()))
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message because delivery tag " + deliveryTag
+                                + " <= rollback mark " + _rollbackMark.get());
+                    }
                     rejectMessage(message, true);
                 }
-                else if (isInRecovery())
+                else if (_usingDispatcherForCleanup)
                 {
                     _unacknowledgedMessageTags.add(deliveryTag);            
                 }
@@ -3353,6 +3419,11 @@ public abstract class AMQSession<C exten
                 // Don't reject if we're already closing
                 if (!_closed.get())
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+                    }
                     rejectMessage(message, true);
                 }
             }
@@ -3450,4 +3521,48 @@ public abstract class AMQSession<C exten
     {
         return _closing.get()|| _connection.isClosing();
     }
+    
+    public boolean isDeclareExchanges()
+    {
+    	return DECLARE_EXCHANGES;
+    }
+
+    JMSException toJMSException(String message, TransportException e)
+    {
+        int code = getErrorCode(e);
+        JMSException jmse = new JMSException(message, Integer.toString(code));
+        jmse.setLinkedException(e);
+        jmse.initCause(e);
+        return jmse;
+    }
+
+    private int getErrorCode(TransportException e)
+    {
+        int code = AMQConstant.INTERNAL_ERROR.getCode();
+        if (e instanceof SessionException)
+        {
+            SessionException se = (SessionException) e;
+            if(se.getException() != null && se.getException().getErrorCode() != null)
+            {
+                code = se.getException().getErrorCode().getValue();
+            }
+        }
+        return code;
+    }
+
+    private boolean isBrowseOnlyDestination(Destination destination)
+    {
+        return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
+    }
+
+    private void setRollbackMark()
+    {
+        // Let the dispatcher know that all the incoming messages
+        // should be rolled back (reject/release)
+        _rollbackMark.set(_highestDeliveryTag.get());
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+        }
+    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Oct 21 14:42:12 2011
@@ -47,6 +47,8 @@ import org.apache.qpid.client.message.AM
 import org.apache.qpid.client.message.FieldTableSupport;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -56,6 +58,7 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.ExchangeBoundResult;
 import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
@@ -69,6 +72,7 @@ import org.apache.qpid.transport.RangeSe
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionException;
 import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Serial;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -156,13 +160,20 @@ public class AMQSession_0_10 extends AMQ
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
                     boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
-                    int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+                    int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
     {
 
         super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
               defaultPrefetchLowMark);
         _qpidConnection = qpidConnection;
-        _qpidSession = _qpidConnection.createSession(1);
+        if (name == null)
+        {
+            _qpidSession = _qpidConnection.createSession(1);
+        }
+        else
+        {
+            _qpidSession = _qpidConnection.createSession(name,1);
+        }
         _qpidSession.setSessionListener(this);
         if (_transacted)
         {
@@ -189,11 +200,12 @@ public class AMQSession_0_10 extends AMQ
      * @param qpidConnection      The connection
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
-                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+                    boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
+                    String name)
     {
 
         this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
-             defaultPrefetchHigh, defaultPrefetchLow);
+             defaultPrefetchHigh, defaultPrefetchLow,name);
     }
 
     private void addUnacked(int id)
@@ -258,7 +270,7 @@ public class AMQSession_0_10 extends AMQ
 
         long prefetch = getAMQConnection().getMaxPrefetch();
 
-        if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
+        if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
         {
             flushAcknowledgments();
         }
@@ -282,23 +294,34 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    void messageAcknowledge(RangeSet ranges, boolean accept)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept)
     {
         messageAcknowledge(ranges,accept,false);
     }
     
-    void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
     {
-        Session ssn = getQpidSession();
-        for (Range range : ranges)
+        final Session ssn = getQpidSession();
+        flushProcessed(ranges,accept);
+        if (accept)
         {
-            ssn.processed(range);
+            ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
         }
-        ssn.flushProcessed(accept ? BATCH : NONE);
-        if (accept)
+    }
+
+    /**
+     * Flush any outstanding commands. This causes session complete to be sent.
+     * @param ranges the range of command ids.
+     * @param batch true if batched.
+     */
+    void flushProcessed(final RangeSet ranges, final boolean batch)
+    {
+        final Session ssn = getQpidSession();
+        for (final Range range : ranges)
         {
-            ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+            ssn.processed(range);
         }
+        ssn.flushProcessed(batch ? BATCH : NONE);
     }
 
     /**
@@ -314,7 +337,7 @@ public class AMQSession_0_10 extends AMQ
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
                               final FieldTable arguments, final AMQShortString exchangeName,
                               final AMQDestination destination, final boolean nowait)
-            throws AMQException, FailoverException
+            throws AMQException
     {
         if (destination.getDestSyntax() == DestSyntax.BURL)
         {
@@ -400,25 +423,6 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-
-    /**
-     * Commit the receipt and the delivery of all messages exchanged by this session resources.
-     */
-    public void sendCommit() throws AMQException, FailoverException
-    {
-        getQpidSession().setAutoSync(true);
-        try
-        {
-            getQpidSession().txCommit();
-        }
-        finally
-        {
-            getQpidSession().setAutoSync(false);
-        }
-        // We need to sync so that we get notify of an error.
-        sync();
-    }
-
     /**
      * Create a queue with a given name.
      *
@@ -451,6 +455,14 @@ public class AMQSession_0_10 extends AMQ
     public void sendRecover() throws AMQException, FailoverException
     {
         // release all unacked messages
+        RangeSet ranges = gatherUnackedRangeSet();
+        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+        // We need to sync so that we get notified of an error.
+        sync();
+    }
+
+    private RangeSet gatherUnackedRangeSet()
+    {
         RangeSet ranges = new RangeSet();
         while (true)
         {
@@ -459,11 +471,11 @@ public class AMQSession_0_10 extends AMQ
             {
                 break;
             }
-            ranges.add((int) (long) tag);
+
+            ranges.add(tag.intValue());
         }
-        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
-        // We need to sync so that we get notify of an error.
-        sync();
+
+        return ranges;
     }
 
 
@@ -537,7 +549,6 @@ public class AMQSession_0_10 extends AMQ
     }
     
     public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
-    throws JMSException
     {
         boolean res;
         ExchangeBoundResult bindingQueryResult =
@@ -600,10 +611,16 @@ public class AMQSession_0_10 extends AMQ
                         (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
             }
             
+            boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+            
+            if (consumer.getDestination().getLink() != null)
+            {
+                acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
+            }
             
             getQpidSession().messageSubscribe
                 (queueName.toString(), String.valueOf(tag),
-                 getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+                 acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
                  preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
                  consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
@@ -659,13 +676,12 @@ public class AMQSession_0_10 extends AMQ
      * Create an 0_10 message producer
      */
     public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
-                                                      final boolean immediate, final boolean waitUntilSent,
-                                                      long producerId) throws JMSException
+                                                      final boolean immediate, final long producerId) throws JMSException
     {
         try
         {
             return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
-                                             getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+                                             getProtocolHandler(), producerId, immediate, mandatory);
         }
         catch (AMQException e)
         {
@@ -675,6 +691,10 @@ public class AMQSession_0_10 extends AMQ
             
             throw ex;
         }
+        catch(TransportException e)
+        {
+            throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
+        }
 
     }
 
@@ -767,7 +787,7 @@ public class AMQSession_0_10 extends AMQ
         else
         {
             QueueNode node = (QueueNode)amqd.getSourceNode();
-            getQpidSession().queueDeclare(queueName.toString(), "" ,
+            getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
                     node.getDeclareArgs(),
                     node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                     node.isDurable() ? Option.DURABLE : Option.NONE,
@@ -904,7 +924,26 @@ public class AMQSession_0_10 extends AMQ
         setCurrentException(exc);
     }
 
-    public void closed(Session ssn) {}
+    public void closed(Session ssn)
+    {
+        try
+        {
+            super.closed(null);
+            if (flushTask != null)
+            {
+                flushTask.cancel();
+                flushTask = null;
+            }
+        } catch (Exception e)
+        {
+            _logger.error("Error closing JMS session", e);
+        }
+    }
+
+    public AMQException getLastException()
+    {
+        return getCurrentException();
+    }
 
     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
                                           final boolean noLocal, final boolean nowait)
@@ -958,27 +997,26 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    @Override public void commit() throws JMSException
+    public void commitImpl() throws AMQException, FailoverException, TransportException
     {
-        checkTransacted();
-        try
+        if( _txSize > 0 )
         {
-            if( _txSize > 0 )
-            {
-                messageAcknowledge(_txRangeSet, true);
-                _txRangeSet.clear();
-                _txSize = 0;
-            }
-            sendCommit();
+            messageAcknowledge(_txRangeSet, true);
+            _txRangeSet.clear();
+            _txSize = 0;
         }
-        catch (AMQException e)
+
+        getQpidSession().setAutoSync(true);
+        try
         {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+            getQpidSession().txCommit();
         }
-        catch (FailoverException e)
+        finally
         {
-            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+            getQpidSession().setAutoSync(false);
         }
+        // We need to sync so that we get notified of an error.
+        sync();
     }
 
     protected final boolean tagLE(long tag1, long tag2)
@@ -1020,11 +1058,9 @@ public class AMQSession_0_10 extends AMQ
                 code = ee.getErrorCode().getValue();
             }
             AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
-
-            _connection.exceptionReceived(amqe);
-
             _currentException = amqe;
         }
+        _connection.exceptionReceived(_currentException);
     }
 
     public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1068,22 +1104,37 @@ public class AMQSession_0_10 extends AMQ
         return match;
     }
     
-    public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
+    public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
     {
         boolean match = true;
-        QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
-        match = dest.getAddressName().equals(result.getQueue());
-        
-        if (match && assertNode)
+        try
         {
-            match = (result.getDurable() == node.isDurable()) && 
-                     (result.getAutoDelete() == node.isAutoDelete()) &&
-                     (result.getExclusive() == node.isExclusive()) &&
-                     (matchProps(result.getArguments(),node.getDeclareArgs()));
+            QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+            match = dest.getAddressName().equals(result.getQueue());
+            
+            if (match && assertNode)
+            {
+                match = (result.getDurable() == node.isDurable()) && 
+                         (result.getAutoDelete() == node.isAutoDelete()) &&
+                         (result.getExclusive() == node.isExclusive()) &&
+                         (matchProps(result.getArguments(),node.getDeclareArgs()));
+            }
+            else if (match)
+            {
+                // TODO: should the queried details be used to update the local data structure?
+            }
         }
-        else if (match)
+        catch(SessionException e)
         {
-            // should I use the queried details to update the local data structure.
+            if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+            {
+                match = false;
+            }
+            else
+            {
+                throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+                        "Error querying queue",e);
+            }
         }
         
         return match;
@@ -1128,8 +1179,8 @@ public class AMQSession_0_10 extends AMQ
                                               boolean isConsumer,
                                               boolean noWait) throws AMQException
     {
-        if (dest.isAddressResolved())
-        {           
+        if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+        {
             if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) 
             {
                 createSubscriptionQueue(dest);
@@ -1149,6 +1200,22 @@ public class AMQSession_0_10 extends AMQ
             
             int type = resolveAddressType(dest);
             
+            if (type == AMQDestination.QUEUE_TYPE && 
+                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
+            {
+                dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
+            }
+            else if (type == AMQDestination.TOPIC_TYPE && 
+                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
+            {
+                dest.getLink().setReliability(Reliability.UNRELIABLE);
+            }
+            else if (type == AMQDestination.TOPIC_TYPE && 
+                    dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
+            {
+                throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");                      
+            }
+            
             switch (type)
             {
                 case AMQDestination.QUEUE_TYPE: 
@@ -1162,6 +1229,8 @@ public class AMQSession_0_10 extends AMQ
                     {
                         setLegacyFiledsForQueueType(dest);
                         send0_10QueueDeclare(dest,null,false,noWait);
+                        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
+                                      null,dest.getExchangeName(),dest, false);
                         break;
                     }                
                 }
@@ -1200,7 +1269,7 @@ public class AMQSession_0_10 extends AMQ
                             "The name '" + dest.getAddressName() +
                             "' supplied in the address doesn't resolve to an exchange or a queue");
             }
-            dest.setAddressResolved(true);
+            dest.setAddressResolved(System.currentTimeMillis());
         }
     }
     
@@ -1270,6 +1339,8 @@ public class AMQSession_0_10 extends AMQ
                                     dest.getQueueName(),// should have one by now
                                     dest.getSubject(),
                                     Collections.<String,Object>emptyMap()));
+        sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
+                null,dest.getExchangeName(),dest, false);
     }
     
     public void setLegacyFiledsForQueueType(AMQDestination dest)
@@ -1307,5 +1378,26 @@ public class AMQSession_0_10 extends AMQ
         sb.append(">");
         return sb.toString();
     }
-    
+
+    protected void acknowledgeImpl()
+    {
+        RangeSet range = gatherUnackedRangeSet();
+
+        if(range.size() > 0 )
+        {
+            messageAcknowledge(range, true);
+            getQpidSession().sync();
+        }
+    }
+
+    @Override
+    void resubscribe() throws AMQException
+    {
+        // Also reset the delivery tag tracker, to ensure we don't
+        // return the first <total number of msgs received on session>
+        // messages sent by the brokers following the first rollback
+        // after failover
+        _highestDeliveryTag.set(-1);
+        super.resubscribe();
+    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 21 14:42:12 2011
@@ -38,6 +38,7 @@ import org.apache.qpid.client.message.Re
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQFrame;
@@ -75,12 +76,12 @@ import org.apache.qpid.framing.amqp_0_91
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
-
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -90,7 +91,7 @@ public final class AMQSession_0_8 extend
      * @param con                     The connection on which to create the session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is transactional.
-     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
      * @param messageFactoryRegistry  The message factory factory for the session.
      * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
@@ -108,7 +109,7 @@ public final class AMQSession_0_8 extend
      * @param con                     The connection on which to create the session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is transactional.
-     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
      * @param defaultPrefetchHigh     The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
      */
@@ -124,6 +125,20 @@ public final class AMQSession_0_8 extend
         return getProtocolHandler().getProtocolVersion();
     }
 
+    protected void acknowledgeImpl()
+    {
+        while (true)
+        {
+            Long tag = _unacknowledgedMessageTags.poll();
+            if (tag == null)
+            {
+                break;
+            }
+
+            acknowledgeMessage(tag, false);
+        }
+    }
+
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -153,7 +168,7 @@ public final class AMQSession_0_8 extend
         // we also need to check the state manager for 08/09 as the
         // _connection variable may not be updated in time by the error receiving
         // thread.
-        // We can't close the session if we are alreadying in the process of
+        // We can't close the session if we are already in the process of
         // closing/closed the connection.
                 
         if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -169,8 +184,20 @@ public final class AMQSession_0_8 extend
         }
     }
 
-    public void sendCommit() throws AMQException, FailoverException
+    public void commitImpl() throws AMQException, FailoverException, TransportException
     {
+        // Acknowledge all delivered messages
+        while (true)
+        {
+            Long tag = _deliveredMessageTags.poll();
+            if (tag == null)
+            {
+                break;
+            }
+
+            acknowledgeMessage(tag, false);
+        }
+
         final AMQProtocolHandler handler = getProtocolHandler();
 
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
@@ -400,12 +427,12 @@ public final class AMQSession_0_8 extend
 
 
     public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
-            final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
+            final boolean immediate, long producerId) throws JMSException
     {
        try
        {
            return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
-                                 this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+                                 this, getProtocolHandler(), producerId, immediate, mandatory);
        }
        catch (AMQException e)
        {
@@ -577,6 +604,18 @@ public final class AMQSession_0_8 extend
         
     }
 
+    @Override
+    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
+            throws JMSException
+    {
+        // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
+        // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
+        // This is not quite right for JMS compliance as the queue/topic should remain until the connection closes, or the
+        // client explicitly deletes it.
+
+        /* intentional no-op */
+    }
+
     public boolean isQueueBound(String exchangeName, String queueName,
             String bindingKey, Map<String, Object> args) throws JMSException
     {
@@ -584,4 +623,34 @@ public final class AMQSession_0_8 extend
                             queueName == null ? null : new AMQShortString(queueName),
                             bindingKey == null ? null : new AMQShortString(bindingKey));
     }
+  
+
+    public AMQException getLastException()
+    {
+        // if the Connection has closed then we should throw any exception that
+        // has occurred that we were not waiting for
+        AMQStateManager manager = _connection.getProtocolHandler()
+                .getStateManager();
+        
+        Exception e = manager.getLastException();
+        if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+                && e != null)
+        {
+            if (e instanceof AMQException)
+            {
+                return (AMQException) e;
+            } 
+            else
+            {
+                AMQException amqe = new AMQException(AMQConstant
+                        .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), 
+                        e.getMessage(), e.getCause());
+                return amqe;
+            }
+        } 
+        else
+        {
+            return null;
+        }
+    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Fri Oct 21 14:42:12 2011
@@ -20,14 +20,13 @@
  */
 package org.apache.qpid.client;
 
+import java.util.UUID;
+
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
 import org.apache.qpid.framing.AMQShortString;
 
-import java.util.Random;
-import java.util.UUID;
-
 /** AMQ implementation of a TemporaryQueue. */
 final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
 {
@@ -50,11 +49,15 @@ final class AMQTemporaryQueue extends AM
         {
             throw new JMSException("Temporary Queue has consumers so cannot be deleted");
         }
-        _deleted = true;
 
-        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
-        // by the server when there are no more subscriptions to that queue.  This is probably not
-        // quite right for JMSCompliance.
+        try
+        {
+            _session.deleteTemporaryDestination(this);
+        }
+        finally
+        {
+            _deleted = true;
+        }
     }
 
     public AMQSession getSession()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Oct 21 14:42:12 2011
@@ -53,10 +53,14 @@ class AMQTemporaryTopic extends AMQTopic
             throw new JMSException("Temporary Topic has consumers so cannot be deleted");
         }
 
-        _deleted = true;
-        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
-        // by the server when there are no more subscriptions to that queue.  This is probably not
-        // quite right for JMSCompliance.
+        try
+        {
+            _session.deleteTemporaryDestination(this);
+        }
+        finally
+        {
+            _deleted = true;
+        }
     }
 
     public AMQSession getSession()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
 
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Topic;
 
@@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestina
         super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
     }
 
-    public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+    public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection)
             throws JMSException
     {
-        if (topic.getDestSyntax() == DestSyntax.ADDR)
+        if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
         {
-            try
+            AMQDestination qpidTopic = (AMQDestination)topic;
+            if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
             {
-                AMQTopic t = new AMQTopic(topic.getAddress());
-                AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
-                // link is never null if dest was created using an address string.
-                t.getLink().setName(queueName.asString());               
-                t.getSourceNode().setAutoDelete(false);
-                t.getSourceNode().setDurable(true);
-                
-                // The legacy fields are also populated just in case.
-                t.setQueueName(queueName);
-                t.setAutoDelete(false);
-                t.setDurable(true);
-                return t;
+                try
+                {
+                    AMQTopic t = new AMQTopic(qpidTopic.getAddress());
+                    AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
+                    // link is never null if dest was created using an address string.
+                    t.getLink().setName(queueName.asString());               
+                    t.getSourceNode().setAutoDelete(false);
+                    t.getSourceNode().setDurable(true);
+                    
+                    // The legacy fields are also populated just in case.
+                    t.setQueueName(queueName);
+                    t.setAutoDelete(false);
+                    t.setDurable(true);
+                    return t;
+                }
+                catch(Exception e)
+                {
+                    JMSException ex = new JMSException("Error creating durable topic");
+                    ex.initCause(e);
+                    ex.setLinkedException(e);
+                    throw ex;
+                }
             }
-            catch(Exception e)
+            else
             {
-                JMSException ex = new JMSException("Error creating durable topic");
-                ex.initCause(e);
-                ex.setLinkedException(e);
-                throw ex;
+                return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+                                getDurableTopicQueueName(subscriptionName, connection),
+                                true);
             }
         }
         else
         {
-            return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
-                            getDurableTopicQueueName(subscriptionName, connection),
-                            true);
+            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic");
         }
     }
 
@@ -138,13 +147,17 @@ public class AMQTopic extends AMQDestina
 
     public String getTopicName() throws JMSException
     {
-        if (super.getRoutingKey() == null && super.getSubject() != null)
+        if (getRoutingKey() != null)
         {
-            return super.getSubject();
+            return getRoutingKey().asString();
+        }
+        else if (getSubject() != null)
+        {
+            return getSubject();
         }
         else
         {
-            return super.getRoutingKey().toString();
+            return null;
         }
     }
     
@@ -163,12 +176,18 @@ public class AMQTopic extends AMQDestina
 
     public AMQShortString getRoutingKey()
     {
-        if (super.getRoutingKey() == null && super.getSubject() != null)
+        if (super.getRoutingKey() != null)            
+        {
+            return super.getRoutingKey();            
+        }
+        else if (getSubject() != null)
         {
-            return new AMQShortString(super.getSubject());
+            return new AMQShortString(getSubject());
         }
         else
         {
+            setRoutingKey(new AMQShortString(""));
+            setSubject("");
             return super.getRoutingKey();
         }
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org