You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/24 18:17:12 UTC

svn commit: r1620147 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/q...

Author: rgodfrey
Date: Sun Aug 24 16:17:11 2014
New Revision: 1620147

URL: http://svn.apache.org/r1620147
Log:
QPID-6037 : [Java Client] Enhance experimental support for ADDR addressing to the 0-8/9/9-1 client

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
    qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
    qpid/trunk/qpid/java/systests/etc/config-systests.json
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
    qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java Sun Aug 24 16:17:11 2014
@@ -85,7 +85,6 @@ public class BrokerOptions
         Map<String,Object> attributes = new HashMap<String, Object>();
 
         attributes.put("storePath", getConfigurationStoreLocation());
-        attributes.put("storeTye", getConfigurationStoreType());
         attributes.put(ConfiguredObject.CONTEXT, getConfigProperties());
         return attributes;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sun Aug 24 16:17:11 2014
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -111,6 +112,12 @@ public abstract class AbstractExchange<T
     public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost)
     {
         super(parentsMap(vhost), attributes);
+        Set<String> providedAttributeNames = new HashSet<>(attributes.keySet());
+        providedAttributeNames.removeAll(getAttributeNames());
+        if(!providedAttributeNames.isEmpty())
+        {
+            throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames);
+        }
         _virtualHost = vhost;
         // check ACL
         try

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Sun Aug 24 16:17:11 2014
@@ -47,8 +47,6 @@ public interface Broker<X extends Broker
     String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost";
     String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod";
     String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled";
-    String STORE_TYPE = "storeType";
-    String STORE_VERSION = "storeVersion";
     String STORE_PATH = "storePath";
     String MODEL_VERSION = "modelVersion";
     String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider";

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Sun Aug 24 16:17:11 2014
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 
@@ -87,6 +89,8 @@ public class QueueArgumentsConverter
         ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES);
 
         ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
+        ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE);
+
 
         ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
         ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
@@ -147,6 +151,10 @@ public class QueueArgumentsConverter
                 {
                     value = ((Enum) value).name();
                 }
+                else if(value instanceof ConfiguredObject)
+                {
+                    value = ((ConfiguredObject)value).getName();
+                }
                 wireArguments.put(entry.getKey(), value);
             }
         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/resources/initial-config.json Sun Aug 24 16:17:11 2014
@@ -20,7 +20,6 @@
  */
 {
   "name": "${broker.name}",
-  "storeVersion": 1,
   "modelVersion": "2.0",
   "defaultVirtualHost" : "default",
   "authenticationproviders" : [ {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Sun Aug 24 16:17:11 2014
@@ -32,6 +32,7 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -115,15 +116,22 @@ public class ExchangeDeclareHandler impl
                 {
                     String name = exchangeName == null ? null : exchangeName.intern().toString();
                     String type = body.getType() == null ? null : body.getType().intern().toString();
-                    Map<String,Object> attributes = new HashMap<String, Object>();
 
+                    Map<String,Object> attributes = new HashMap<String, Object>();
+                    if(body.getArguments() != null)
+                    {
+                        attributes.putAll(FieldTable.convertToMap(body.getArguments()));
+                    }
                     attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
                     attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
                     attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
                     attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
                     attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
                                    body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
-                    attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+                    if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+                    {
+                        attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+                    }
                     exchange = virtualHost.createExchange(attributes);
 
                 }
@@ -160,6 +168,10 @@ public class ExchangeDeclareHandler impl
                     // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
                     throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
                 }
+                catch (IllegalArgumentException e)
+                {
+                    throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e);
+                }
             }
         }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Aug 24 16:17:11 2014
@@ -87,6 +87,8 @@ public class AMQConnection extends Close
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
     private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
 
+    private static final long DEFAULT_CLOSE_TIMEOUT = 2000l;
+
     private final long _connectionNumber;
 
     /**
@@ -160,7 +162,6 @@ public class AMQConnection extends Close
 
     /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
     private final ExecutorService _taskPool = Executors.newCachedThreadPool();
-    private static final long DEFAULT_TIMEOUT = 1000 * 30;
 
     private AMQConnectionDelegate _delegate;
 
@@ -873,7 +874,7 @@ public class AMQConnection extends Close
 
     public void close() throws JMSException
     {
-        close(DEFAULT_TIMEOUT);
+        close(DEFAULT_CLOSE_TIMEOUT);
     }
 
     public void close(long timeout) throws JMSException

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Aug 24 16:17:11 2014
@@ -313,6 +313,10 @@ public abstract class AMQSession<C exten
         return _immediatePrefetch;
     }
 
+    abstract void handleNodeDelete(final AMQDestination dest) throws AMQException;
+
+    abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
+
     public static final class IdToConsumerMap<C extends BasicMessageConsumer>
     {
         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Aug 24 16:17:11 2014
@@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
+    @Override
     void handleNodeDelete(AMQDestination dest) throws AMQException
     {
         if (AMQDestination.TOPIC_TYPE == dest.getAddressType())

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sun Aug 24 16:17:11 2014
@@ -29,6 +29,7 @@ import static org.apache.qpid.configurat
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -64,6 +65,7 @@ import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
 
 public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
@@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQS
     }
 
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-                              final AMQShortString exchangeName, final AMQDestination dest,
+                              final AMQShortString exchangeName, final AMQDestination destination,
                               final boolean nowait) throws AMQException, FailoverException
     {
-        getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
-                                        (getTicket(),queueName,exchangeName,routingKey,false,arguments).
-                                        generateFrame(getChannelId()), QueueBindOkBody.class);
+        if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL)
+        {
+
+            getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+                    (getTicket(), queueName, exchangeName, routingKey, false, arguments).
+                    generateFrame(getChannelId()), QueueBindOkBody.class);
+
+        }
+        else
+        {
+            // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
+            List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>();
+            bindings.addAll(destination.getNode().getBindings());
+
+            String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
+                    destination.getAddressName(): "amq.topic";
+
+            for (AMQDestination.Binding binding: bindings)
+            {
+                // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+                // The null check below is a way to side step that issue while fixing QPID-4146
+                // Note this issue only affects producers.
+                if (binding.getQueue() == null && queueName == null)
+                {
+                    continue;
+                }
+                String queue = binding.getQueue() == null?
+                        queueName.asString(): binding.getQueue();
+
+                String exchange = binding.getExchange() == null ?
+                        defaultExchange :
+                        binding.getExchange();
+
+                _logger.debug("Binding queue : " + queue +
+                              " exchange: " + exchange +
+                              " using binding key " + binding.getBindingKey() +
+                              " with args " + Strings.printMap(binding.getArgs()));
+                doBind(destination, binding, queue, exchange);
+            }
+        }
     }
 
     public void sendClose(long timeout) throws AMQException, FailoverException
@@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQS
         Map<String,Object> bindingArguments = new HashMap<String, Object>();
         bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
 
-        bindQueue(AMQShortString.valueOf(queueName),
-                  AMQShortString.valueOf(dest.getSubject()),
-                  FieldTable.convertToFieldTable(bindingArguments),
-                  AMQShortString.valueOf(dest.getAddressName()),dest,false);
+        final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments);
+        doBind(dest, binding, queueName, dest.getAddressName());
 
     }
 
@@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQS
         getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
+    public void sendExchangeDelete(final String name) throws AMQException, FailoverException
+    {
+        ExchangeDeleteBody body =
+                getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false);
+        AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+        getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+    }
+
     private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
     {
         AMQShortString queueName = amqd.getAMQQueueName();
@@ -821,18 +867,25 @@ public class AMQSession_0_8 extends AMQS
 
     protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
     {
-        AMQFrame queueDeclare =
-            getMethodRegistry().createQueueDeclareBody(getTicket(),
-                                                       amqd.getAMQQueueName(),
-                                                       true,
-                                                       amqd.isDurable(),
-                                                       amqd.isExclusive(),
-                                                       amqd.isAutoDelete(),
-                                                       false,
-                                                       null).generateFrame(getChannelId());
-        QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
-        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
-        return okHandler.getMessageCount();
+        if(isBound(null, amqd.getAMQQueueName(), null))
+        {
+            AMQFrame queueDeclare =
+                    getMethodRegistry().createQueueDeclareBody(getTicket(),
+                                                               amqd.getAMQQueueName(),
+                                                               true,
+                                                               amqd.isDurable(),
+                                                               amqd.isExclusive(),
+                                                               amqd.isAutoDelete(),
+                                                               false,
+                                                               null).generateFrame(getChannelId());
+            QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+            getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+            return okHandler.getMessageCount();
+        }
+        else
+        {
+            return 0l;
+        }
     }
 
     protected boolean tagLE(long tag1, long tag2)
@@ -916,6 +969,11 @@ public class AMQSession_0_8 extends AMQS
         {
             arguments.put(AddressHelper.NO_LOCAL, noLocal);
         }
+        String altExchange = node.getAlternateExchange();
+        if(altExchange != null && !"".equals(altExchange))
+        {
+            arguments.put("alternateExchange", altExchange);
+        }
 
         (new FailoverNoopSupport<Void, AMQException>(
                 new FailoverProtectedOperation<Void, AMQException>()
@@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQS
     void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
     {
         Node node = dest.getNode();
+        String altExchange = dest.getNode().getAlternateExchange();
+        Map<String, Object> arguments = node.getDeclareArgs();
+
+        if(altExchange != null && !"".equals(altExchange))
+        {
+            arguments.put("alternateExchange", altExchange);
+        }
+
         // can't set alt. exchange
         declareExchange(AMQShortString.valueOf(dest.getAddressName()),
                         AMQShortString.valueOf(node.getExchangeType()),
                         false,
                         node.isDurable(),
                         node.isAutoDelete(),
-                        FieldTable.convertToFieldTable(node.getDeclareArgs()), false);
+                        FieldTable.convertToFieldTable(arguments), false);
 
         // If bindings are specified without a queue name and is called by the producer,
         // the broker will send an exception as expected.
@@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQS
                         final String queue,
                         final String exchange) throws AMQException
     {
-        bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()),
-                  FieldTable.convertToFieldTable(binding.getArgs()),
-                  AMQShortString.valueOf(exchange),dest);
+        final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+        new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
+            {
+
+
+                MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+                QueueBindBody queueBindBody =
+                        methodRegistry.createQueueBindBody(getTicket(),
+                                                           AMQShortString.valueOf(queue),
+                                                           AMQShortString.valueOf(exchange),
+                                                           AMQShortString.valueOf(bindingKey),
+                                                           false,
+                                                           FieldTable.convertToFieldTable(binding.getArgs()));
+
+                getProtocolHandler().syncWrite(queueBindBody.
+                generateFrame(getChannelId()), QueueBindOkBody.class);
+                return null;
+            }
+        }, getAMQConnection()).execute();
+
+    }
+
+
+    protected void doUnbind(final AMQDestination.Binding binding,
+                            final String queue,
+                            final String exchange) throws AMQException
+    {
+        new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
+            {
+
+                if (isBound(null, AMQShortString.valueOf(queue), null))
+                {
+                    MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+                    AMQMethodBody body;
+                    if (methodRegistry instanceof MethodRegistry_0_9)
+                    {
+                        String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+                        MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
+                        body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+                                                                        AMQShortString.valueOf(queue),
+                                                                        AMQShortString.valueOf(exchange),
+                                                                        AMQShortString.valueOf(bindingKey),
+                                                                        null);
+                    }
+                    else if (methodRegistry instanceof MethodRegistry_0_91)
+                    {
+                        MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
+                        body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
+                                                                         AMQShortString.valueOf(queue),
+                                                                         AMQShortString.valueOf(exchange),
+                                                                         AMQShortString.valueOf(binding.getBindingKey()),
+                                                                         null);
+
+                    }
+                    else
+                    {
+                        throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+                    }
+                    getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
+                    return null;
+                }
+                else
+                {
+                    return null;
+                }
+            }
+        }, getAMQConnection()).execute();
     }
 
     public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
@@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQS
         return match;
     }
 
+    @Override
+    void handleNodeDelete(final AMQDestination dest) throws AMQException
+    {
+        if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+        {
+            if (isExchangeExist(dest,false))
+            {
+
+                new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+                {
+                    public Object execute() throws AMQException, FailoverException
+                    {
+                        sendExchangeDelete(dest.getAddressName());
+                        return null;
+                    }
+                }, getAMQConnection()).execute();
+                dest.setAddressResolved(0);
+            }
+        }
+        else
+        {
+            if (isQueueExist(dest,false))
+            {
+
+                new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+                {
+                    public Object execute() throws AMQException, FailoverException
+                    {
+                        sendQueueDelete(AMQShortString.valueOf(dest.getAddressName()));
+                        return null;
+                    }
+                }, getAMQConnection()).execute();
+                dest.setAddressResolved(0);
+            }
+        }
+    }
+
+    @Override
+    void handleLinkDelete(AMQDestination dest) throws AMQException
+    {
+        // We need to destroy link bindings
+        String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+                .getAddressName() : "amq.topic";
+
+        String defaultQueueName = null;
+        if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+        {
+            defaultQueueName = dest.getQueueName();
+        }
+        else
+        {
+            defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+        }
+
+        for (AMQDestination.Binding binding: dest.getLink().getBindings())
+        {
+            String queue = binding.getQueue() == null?
+                    defaultQueueName: binding.getQueue();
+
+            String exchange = binding.getExchange() == null ?
+                    defaultExchangeForBinding :
+                    binding.getExchange();
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Unbinding queue : " + queue +
+                              " exchange: " + exchange +
+                              " using binding key " + binding.getBindingKey() +
+                              " with args " + Strings.printMap(binding.getArgs()));
+            }
+            doUnbind(binding, queue, exchange);
+        }
+    }
+
+
+    void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException
+    {
+        // We need to delete the subscription queue.
+        if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+            dest.getLink().getSubscriptionQueue().isExclusive() &&
+            isQueueExist(dest.getQueueName(), false, false, false, false, null))
+        {
+            (new FailoverNoopSupport<Void, AMQException>(
+                    new FailoverProtectedOperation<Void, AMQException>()
+                    {
+                        public Void execute() throws AMQException, FailoverException
+                        {
+
+                            sendQueueDelete(AMQShortString.valueOf(dest.getQueueName()));
+                            return null;
+                        }
+                    }, getAMQConnection())).execute();
+
+        }
+    }
+
     protected void flushAcknowledgments()
     {
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sun Aug 24 16:17:11 2014
@@ -20,19 +20,35 @@
  */
 package org.apache.qpid.client;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
 import org.apache.qpid.client.filter.MessageFilter;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.CloseConsumerMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.client.filter.JMSSelectorFilter;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
@@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsum
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.transport.TransportException;
 
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
 public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -376,7 +377,23 @@ public abstract class BasicMessageConsum
      */
     public boolean isExclusive()
     {
-        return _exclusive;
+
+        AMQDestination dest = this.getDestination();
+        if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+        {
+            if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+            {
+                return true;
+            }
+            else
+            {
+                return dest.getLink().getSubscription().isExclusive();
+            }
+        }
+        else
+        {
+            return _exclusive;
+        }
     }
 
     public boolean isReceiving()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sun Aug 24 16:17:11 2014
@@ -17,12 +17,18 @@
  */
 package org.apache.qpid.client;
 
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSe
 import org.apache.qpid.transport.SessionException;
 import org.apache.qpid.transport.TransportException;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * This is a 0.10 message consumer.
  */
@@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 e
             clearReceiveQueue();
         }
     }
-    
-    public boolean isExclusive()
-    {
-        AMQDestination dest = this.getDestination();
-        if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
-        {
-            if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
-            {
-                return true;
-            }
-            else
-            {                
-                return dest.getLink().getSubscription().isExclusive();
-            }
-        }
-        else
-        {
-            return super.isExclusive();
-        }
-    }
+
     
     void postSubscription() throws AMQException
     {
@@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 e
             if (dest.getDelete() == AddressOption.ALWAYS ||
                 dest.getDelete() == AddressOption.RECEIVER )
             {
-                ((AMQSession_0_10) getSession()).handleNodeDelete(dest);                
+                getSession().handleNodeDelete(dest);
             }
             // Subscription queue is handled as part of linkDelete method.
-            ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+            getSession().handleLinkDelete(dest);
             if (!isDurableSubscriber())
             {
                 ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
@@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 e
         return capacity;
     }
 
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Sun Aug 24 16:17:11 2014
@@ -118,13 +118,33 @@ public class BasicMessageConsumer_0_8 ex
         final AMQFrame cancelFrame = body.generateFrame(getChannelId());
 
         getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
-
+        postSubscription();
+        getSession().sync();
         if (_logger.isDebugEnabled())
         {
             _logger.debug("CancelOk'd for consumer:" + debugIdentity());
         }
     }
 
+    void postSubscription() throws AMQException
+    {
+        AMQDestination dest = this.getDestination();
+        if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+        {
+            if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+                dest.getDelete() == AMQDestination.AddressOption.RECEIVER )
+            {
+                getSession().handleNodeDelete(dest);
+            }
+            // Subscription queue is handled as part of linkDelete method.
+            getSession().handleLinkDelete(dest);
+            if (!isDurableSubscriber())
+            {
+                getSession().deleteSubscriptionQueue(dest);
+            }
+        }
+    }
+
     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
     {
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sun Aug 24 16:17:11 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.client;
 
 import java.util.UUID;
+
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -32,13 +33,15 @@ import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+
+import org.slf4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.UUIDGen;
 import org.apache.qpid.util.UUIDs;
-import org.slf4j.Logger;
 
 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
@@ -286,6 +289,31 @@ public abstract class BasicMessageProduc
     {
         setClosed();
         _session.deregisterProducer(_producerId);
+        AMQDestination dest = getAMQDestination();
+        AMQSession ssn = getSession();
+        if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+        {
+            try
+            {
+                if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+                    dest.getDelete() == AMQDestination.AddressOption.SENDER )
+                {
+                    ssn.handleNodeDelete(dest);
+                }
+                ssn.handleLinkDelete(dest);
+            }
+            catch(TransportException e)
+            {
+                throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+            }
+            catch (AMQException e)
+            {
+                JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+                ex.setLinkedException(e);
+                ex.initCause(e);
+                throw ex;
+            }
+        }
     }
 
     public void send(Message message) throws JMSException

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Sun Aug 24 16:17:11 2014
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -48,7 +47,6 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.GZIPUtils;
 import org.apache.qpid.util.Strings;
 
@@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 e
             try
             {
                 getSession().resolveAddress(destination,false,false);
-                ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
-                ((AMQSession_0_10)getSession()).sync();
+                getSession().handleLinkCreation(destination);
+                getSession().sync();
             }
             catch(Exception e)
             {
@@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 e
     public void close() throws JMSException
     {
         super.close();
-        AMQDestination dest = getAMQDestination();
-        AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
-        if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
-        {
-            try
-            {
-                if (dest.getDelete() == AddressOption.ALWAYS ||
-                    dest.getDelete() == AddressOption.SENDER )
-                {
-                    ssn.handleNodeDelete(dest);
-                }
-                ssn.handleLinkDelete(dest);
-            }
-            catch(TransportException e)
-            {
-                throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
-            }
-            catch (AMQException e)
-            {
-                JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
-                ex.setLinkedException(e);
-                ex.initCause(e);
-                throw ex;
-            }
-        }
     }
 
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Sun Aug 24 16:17:11 2014
@@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.CompositeAMQDataBlock;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.util.GZIPUtils;
 
@@ -63,6 +66,9 @@ public class BasicMessageProducer_0_8 ex
         if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
         {
             getSession().resolveAddress(destination, false, false);
+
+            getSession().handleLinkCreation(destination);
+            getSession().sync();
         }
         else
         {
@@ -92,18 +98,43 @@ public class BasicMessageProducer_0_8 ex
                      UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
                      boolean immediate) throws JMSException
     {
+
+
+
+        AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+        BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
+
+        AMQShortString routingKey = destination.getRoutingKey();
+
+        FieldTable headers = delegate.getContentHeaderProperties().getHeaders();
+
+        if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+            (destination.getSubject() != null
+             || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
+        {
+
+            if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
+            {
+                // use default subject in address string
+                headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
+            }
+
+            if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+            {
+               routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT));
+            }
+        }
+
         BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
-                                                                                        destination.getExchangeName(),
-                                                                                        destination.getRoutingKey(),
-                                                                                        mandatory,
-                                                                                        immediate);
+                                                                                    destination.getExchangeName(),
+                                                                                    routingKey,
+                                                                                    mandatory,
+                                                                                    immediate);
 
         AMQFrame publishFrame = body.generateFrame(getChannelId());
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
-        AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
-        BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
 
         contentHeaderProperties.setUserId(getUserID());
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Sun Aug 24 16:17:11 2014
@@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T>
             {
                 _waiting.set(true);
 
-                while (!_ready)
+                while (!_ready && _error == null)
                 {
                     try
                     {

Modified: qpid/trunk/qpid/java/systests/etc/config-systests.json
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/etc/config-systests.json?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/etc/config-systests.json (original)
+++ qpid/trunk/qpid/java/systests/etc/config-systests.json Sun Aug 24 16:17:11 2014
@@ -21,7 +21,6 @@
 {
   "name": "Broker",
   "defaultVirtualHost" : "test",
-  "storeVersion": 1,
   "modelVersion": "2.0",
   "authenticationproviders" : [ {
     "name" : "plain",

Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Sun Aug 24 16:17:11 2014
@@ -52,7 +52,6 @@ import org.apache.qpid.client.AMQAnyDest
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.messaging.Address;
@@ -76,7 +75,14 @@ public class AddressBasedDestinationTest
     @Override
     public void tearDown() throws Exception
     {
-        _connection.close();
+        try
+        {
+            _connection.close();
+        }
+        catch(JMSException e)
+        {
+            // ignore
+        }
         super.tearDown();
     }
 
@@ -90,14 +96,15 @@ public class AddressBasedDestinationTest
         // create never --------------------------------------------
         String addr1 = "ADDR:testQueue1";
         AMQDestination  dest = new AMQAnyDestination(addr1);
+        final String queueErrorMessage = "The name 'testQueue1' supplied in the address " +
+                   "doesn't resolve to an exchange or a queue";
         try
         {
             cons = jmsSession.createConsumer(dest);
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getMessage().contains(queueErrorMessage));
         }
 
         try
@@ -106,12 +113,12 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getCause().getMessage().contains(queueErrorMessage)
+                       || e.getCause().getCause().getMessage().contains(queueErrorMessage));
         }
 
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest,false));
+                (AMQSession)jmsSession).isQueueExist(dest,false));
 
 
         // create always -------------------------------------------
@@ -120,9 +127,9 @@ public class AddressBasedDestinationTest
         cons = jmsSession.createConsumer(dest);
 
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+                (AMQSession)jmsSession).isQueueExist(dest, true));
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
+                (AMQSession)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), null));
 
         // create receiver -----------------------------------------
@@ -134,33 +141,36 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            String expectedMessage = "The name 'testQueue2' supplied in the address " +
+                       "doesn't resolve to an exchange or a queue";
+            assertTrue(e.getCause().getMessage().contains(expectedMessage)
+                       || e.getCause().getCause().getMessage().contains(expectedMessage));
         }
 
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+                (AMQSession)jmsSession).isQueueExist(dest, false));
 
 
         cons = jmsSession.createConsumer(dest);
 
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+                (AMQSession)jmsSession).isQueueExist(dest, true));
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
+                (AMQSession)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), null));
 
         // create never --------------------------------------------
         addr1 = "ADDR:testQueue3; { create: never }";
         dest = new AMQAnyDestination(addr1);
+        String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " +
+                   "doesn't resolve to an exchange or a queue";
         try
         {
             cons = jmsSession.createConsumer(dest);
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
         }
 
         try
@@ -169,12 +179,12 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage)
+                       || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage));
         }
 
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+                (AMQSession)jmsSession).isQueueExist(dest, false));
 
         // create sender ------------------------------------------
         addr1 = "ADDR:testQueue3; { create: sender }";
@@ -186,17 +196,16 @@ public class AddressBasedDestinationTest
         }
         catch(JMSException e)
         {
-            assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
-                    "doesn't resolve to an exchange or a queue"));
+            assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
         }
         assertFalse("Queue should not be created",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+                (AMQSession)jmsSession).isQueueExist(dest, false));
 
         prod = jmsSession.createProducer(dest);
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+                (AMQSession)jmsSession).isQueueExist(dest, true));
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
+                (AMQSession)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), null));
 
     }
@@ -234,22 +243,22 @@ public class AddressBasedDestinationTest
         // Even if the consumer is closed the queue and the bindings should be intact.
 
         assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+                (AMQSession)jmsSession).isQueueExist(dest, true));
 
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
+                (AMQSession)jmsSession).isQueueBound("",
                     dest.getAddressName(),dest.getAddressName(), null));
 
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+                (AMQSession)jmsSession).isQueueBound("amq.direct",
                     dest.getAddressName(),"test", null));
 
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
+                (AMQSession)jmsSession).isQueueBound("amq.fanout",
                     dest.getAddressName(),null, null));
 
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+                (AMQSession)jmsSession).isQueueBound("amq.topic",
                     dest.getAddressName(),"a.#", null));
 
         Map<String,Object> args = new HashMap<String,Object>();
@@ -257,7 +266,7 @@ public class AddressBasedDestinationTest
         args.put("dep","sales");
         args.put("loc","CA");
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+                (AMQSession)jmsSession).isQueueBound("amq.match",
                     dest.getAddressName(),null, args));
 
         MessageProducer prod = jmsSession.createProducer(dest);
@@ -339,6 +348,11 @@ public class AddressBasedDestinationTest
             {
                 return;
             }
+            else if((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010()
+                    && String.valueOf(AMQConstant.COMMAND_INVALID.getCode()).equals(e.getErrorCode()))
+            {
+                return;
+            }
             else
             {
                 fail("Unexpected exception whilst creating consumer: " + e);
@@ -346,11 +360,11 @@ public class AddressBasedDestinationTest
         }
 
         assertTrue("Exchange not created as expected",(
-                (AMQSession_0_10)jmsSession).isExchangeExist(dest,true));
+                (AMQSession)jmsSession).isExchangeExist(dest,true));
 
         // The existence of the queue is implicitly tested here
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
+                (AMQSession)jmsSession).isQueueBound("my-exchange",
                     dest.getQueueName(),"hello", null));
 
         // The client should be able to query and verify the existence of my-exchange (QPID-2774)
@@ -387,23 +401,23 @@ public class AddressBasedDestinationTest
     public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
     {
     	assertTrue("Queue not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+                (AMQSession)jmsSession).isQueueExist(dest, true));
 
-        assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
-                    dest.getAddressName(),dest.getAddressName(), null));
+        assertTrue("Queue not bound as expected", (
+                (AMQSession) jmsSession).isQueueBound("",
+                                                      dest.getAddressName(), dest.getAddressName(), null));
 
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+                (AMQSession)jmsSession).isQueueBound("amq.direct",
                     dest.getAddressName(),"test", null));
 
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+                (AMQSession)jmsSession).isQueueBound("amq.topic",
                     dest.getAddressName(),"a.#", null));
 
         Address a = Address.parse(headersBinding);
         assertTrue("Queue not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+                (AMQSession)jmsSession).isQueueBound("amq.match",
                     dest.getAddressName(),null, a.getOptions()));
     }
 
@@ -526,17 +540,17 @@ public class AddressBasedDestinationTest
         MessageConsumer cons3 = jmsSession.createConsumer(dest3);
 
         assertTrue("Destination1 was not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest1, true));
+                (AMQSession)jmsSession).isQueueExist(dest1, true));
 
         assertTrue("Destination1 was not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
+                (AMQSession)jmsSession).isQueueBound("",
                     dest1.getAddressName(),dest1.getAddressName(), null));
 
         assertTrue("Destination2 was not created as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest2,true));
+                (AMQSession)jmsSession).isQueueExist(dest2,true));
 
         assertTrue("Destination2 was not bound as expected",(
-                (AMQSession_0_10)jmsSession).isQueueBound("",
+                (AMQSession)jmsSession).isQueueBound("",
                     dest2.getAddressName(),dest2.getAddressName(), null));
 
         MessageProducer producer = jmsSession.createProducer(dest3);
@@ -587,7 +601,7 @@ public class AddressBasedDestinationTest
         MessageProducer prod = ssn.createProducer(queue);
         MessageConsumer cons = ssn.createConsumer(queue);
         assertTrue("my-queue was not created as expected",(
-                (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+                (AMQSession)ssn).isQueueBound("amq.direct",
                     "my-queue","my-queue", null));
 
         prod.send(ssn.createTextMessage("test"));
@@ -606,7 +620,7 @@ public class AddressBasedDestinationTest
         {
         	String s = "The name 'my-queue2' supplied in the address " +
         			"doesn't resolve to an exchange or a queue";
-        	assertEquals(s,e.getCause().getCause().getMessage());
+        	assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage()));
         }
 
         // explicit create case
@@ -614,7 +628,7 @@ public class AddressBasedDestinationTest
         prod = ssn.createProducer(queue);
         cons = ssn.createConsumer(queue);
         assertTrue("my-queue2 was not created as expected",(
-                (AMQSession_0_10)ssn).isQueueBound("",
+                (AMQSession)ssn).isQueueBound("",
                     "my-queue2","my-queue2", null));
 
         prod.send(ssn.createTextMessage("test"));
@@ -631,7 +645,7 @@ public class AddressBasedDestinationTest
         cons = ssn.createConsumer(queue);
         prod = ssn.createProducer(queue);
         assertTrue("MY.RESP.QUEUE was not created as expected",(
-                (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+                (AMQSession)ssn).isQueueBound("amq.direct",
                     "MY.RESP.QUEUE","x512", null));
         cons.close();
     }
@@ -701,15 +715,15 @@ public class AddressBasedDestinationTest
         prod = ssn.createProducer(topic);
 
         assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("vehicles",
+                (AMQSession)ssn).isQueueBound("vehicles",
                     "my-topic","bus", null));
 
         assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("vehicles",
+                (AMQSession)ssn).isQueueBound("vehicles",
                     "my-topic","car", null));
 
         assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("vehicles",
+                (AMQSession)ssn).isQueueBound("vehicles",
                     "my-topic","van", null));
 
         Message msg = ssn.createTextMessage("test");
@@ -822,15 +836,18 @@ public class AddressBasedDestinationTest
         catch(Exception e)
         {
         }
-        _connection.close();
 
+    }
+
+    public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception
+    {
         _connection = getConnection() ;
         _connection.start();
-        ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        dest = ssn.createTopic("ADDR:my_queue; {create: always}");
-        consumer1 = ssn.createConsumer(dest);
-        consumer2 = ssn.createConsumer(dest);
-        prod = ssn.createProducer(dest);
+        final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}");
+        final MessageConsumer consumer1 = ssn.createConsumer(dest);
+        final MessageConsumer consumer2 = ssn.createConsumer(dest);
+        final MessageProducer prod = ssn.createProducer(dest);
 
         prod.send(ssn.createTextMessage("A"));
         Message m1 = consumer1.receive(1000);
@@ -864,15 +881,15 @@ public class AddressBasedDestinationTest
         MessageConsumer  cons = ssn.createConsumer(topic);
 
         assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("MRKT",
+                (AMQSession)ssn).isQueueBound("MRKT",
                     "my-topic","NYSE.#", null));
 
         assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("MRKT",
+                (AMQSession)ssn).isQueueBound("MRKT",
                     "my-topic","NASDAQ.#", null));
 
         assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
-                (AMQSession_0_10)ssn).isQueueBound("MRKT",
+                (AMQSession)ssn).isQueueBound("MRKT",
                     "my-topic","CNTL.#", null));
 
         MessageProducer prod = ssn.createProducer(topic);
@@ -886,7 +903,7 @@ public class AddressBasedDestinationTest
     public void testXSubscribeOverrides() throws Exception
     {
         Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+        String str = "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
         Destination dest = ssn.createTopic(str);
         MessageConsumer consumer1 = ssn.createConsumer(dest);
         try
@@ -937,7 +954,7 @@ public class AddressBasedDestinationTest
         props.setProperty("destination.address1", "ADDR:amq.topic/test");
         props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr);
         props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr);
-        String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+        String addrStr = "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
         props.setProperty("destination.address5", addrStr);
 
         Context ctx = new InitialContext(props);
@@ -1055,7 +1072,7 @@ public class AddressBasedDestinationTest
         }
 
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+                (AMQSession)jmsSession).isQueueExist(dest, false));
 
 
         String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
@@ -1071,7 +1088,7 @@ public class AddressBasedDestinationTest
         }
 
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+                (AMQSession)jmsSession).isQueueExist(dest, false));
 
 
         String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
@@ -1088,7 +1105,7 @@ public class AddressBasedDestinationTest
         }
 
         assertFalse("Queue not deleted as expected",(
-                (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+                (AMQSession)jmsSession).isQueueExist(dest, false));
     }
 
     /**
@@ -1206,11 +1223,11 @@ public class AddressBasedDestinationTest
 		m.setJMSReplyTo(replyToDest);
 		prod.send(m);
 
-		Message msg = cons.receive();
+		Message msg = cons.receive(5000l);
 		MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
 		prodR.send(session.createTextMessage("x"));
 
-		Message m1 = replyToCons.receive();
+		Message m1 = replyToCons.receive(5000l);
 		assertNotNull("The reply to consumer should have received the messsage",m1);
     }
 
@@ -1422,7 +1439,7 @@ public class AddressBasedDestinationTest
 
         AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr);
         MessageConsumer cons = jmsSession.createConsumer(dest);
-        AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession;
+        AMQSession ssn = (AMQSession)jmsSession;
 
         assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true));
         assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
@@ -1454,11 +1471,11 @@ public class AddressBasedDestinationTest
 
         String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}";
         AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr);
-        ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+        ((AMQSession)ssn).isQueueExist(verifyDest, true);
 
         // Verify that the producer does not delete the subscription queue.
         MessageProducer prod = ssn.createProducer(dest);
         prod.close();
-        ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+        ((AMQSession)ssn).isQueueExist(verifyDest, true);
     }
 }

Modified: qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes?rev=1620147&r1=1620146&r2=1620147&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes Sun Aug 24 16:17:11 2014
@@ -25,9 +25,17 @@
 org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
 org.apache.qpid.server.message.MessageProtocolConversionTest#*
 
+//QPID-3422: test fails because ring queue is not implemented on java broker
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode
+//QPID-3392: the Java broker does not yet implement exchange creation arguments
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs
+//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic
+//           you want a topic behaviour.  The 0-10 client thinks you must want a queue.
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10
+
 // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
 org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
 org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
 org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
 org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org