You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/20 16:42:59 UTC

svn commit: r1770576 [7/8] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ broker-codegen/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core...

Added: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java?rev=1770576&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java (added)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java Sun Nov 20 16:42:57 2016
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectCustomSerialization;
+
+class ManagementOutputConverter
+{
+    private static final List<String> ID_AND_TYPE = Arrays.asList(ConfiguredObject.ID, ConfiguredObject.TYPE);
+    private final ManagementNode _managementNode;
+
+    ManagementOutputConverter(final ManagementNode managementNode)
+    {
+        _managementNode = managementNode;
+    }
+
+    private Map<Object, Object> convertMapToOutput(final Map<?, ?> attributes)
+    {
+        Map<Object,Object> result = new LinkedHashMap<>();
+        for(Map.Entry<?,?> entry : attributes.entrySet())
+        {
+            result.put(convertObjectToOutput(entry.getKey()), convertObjectToOutput(entry.getValue()));
+        }
+        return result;
+    }
+
+    private Collection<?> convertCollectionToOutput(final Collection<?> value)
+    {
+
+        List<Object> result = new ArrayList<>();
+        for(Object entry : value)
+        {
+            result.add(convertObjectToOutput(entry));
+        }
+        return result;
+    }
+
+    private Object convertObjectToOutput(final Object value)
+    {
+        if(value == null)
+        {
+            return null;
+        }
+        else if(value instanceof String
+                || value instanceof Integer
+                || value instanceof Long
+                || value instanceof Byte
+                || value instanceof Character
+                || value instanceof Float
+                || value instanceof Double
+                || value instanceof byte[])
+        {
+            return value;
+        }
+        else if(value instanceof Map)
+        {
+            return convertMapToOutput((Map<?,?>)value);
+        }
+        else if(value instanceof Collection)
+        {
+            return convertCollectionToOutput((Collection<?>)value);
+        }
+        else if(value instanceof ConfiguredObject)
+        {
+            return ((ConfiguredObject)value).getName();
+        }
+        else
+        {
+            for(ConfiguredObjectCustomSerialization.Converter converter : ConfiguredObjectCustomSerialization.getConverters())
+            {
+                if(converter.getConversionClass().isAssignableFrom(value.getClass()))
+                {
+                    return convertObjectToOutput(converter.convert(value));
+                }
+            }
+
+            return value.toString();
+        }
+    }
+
+    protected Map<?, ?> convertToOutput(final ConfiguredObject<?> object,
+                                        final boolean actuals)
+    {
+        Map<String, Object> attributes = new LinkedHashMap<>();
+        attributes.put(ManagementNode.IDENTITY_ATTRIBUTE, object.getId());
+        attributes.put(ManagementNode.OBJECT_PATH, _managementNode.generatePath(object));
+        attributes.put(ManagementNode.TYPE_ATTRIBUTE, _managementNode.getAmqpName(object.getTypeClass()));
+        attributes.put(ManagementNode.QPID_TYPE, object.getType());
+
+        if(object != _managementNode.getManagedObject() && !_managementNode.isSyntheticChildClass(object.getCategoryClass()))
+        {
+            for (Class<? extends ConfiguredObject> parentType : object.getModel()
+                                                                      .getParentTypes(object.getCategoryClass()))
+            {
+                if (parentType != _managementNode.getManagedObject().getCategoryClass())
+                {
+                    attributes.put(parentType.getSimpleName().toLowerCase(), object.getParent(parentType));
+                }
+            }
+        }
+
+        for(String name : object.getAttributeNames())
+        {
+            if(!ID_AND_TYPE.contains(name))
+            {
+                Object value = actuals
+                        ? object.getActualAttributes().get(name)
+                        : object.getAttribute(name);
+                if (value != null)
+                {
+                    attributes.put(name, value);
+                }
+            }
+        }
+
+        return convertMapToOutput(attributes);
+    }
+}

Propchange: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java Sun Nov 20 16:42:57 2016
@@ -56,11 +56,11 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
 import org.apache.qpid.server.model.ConfiguredObjectOperation;
 import org.apache.qpid.server.model.Content;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
 import org.apache.qpid.server.model.IntegrityViolationException;
-import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.OperationTimeoutException;
 import org.apache.qpid.server.model.preferences.UserPreferences;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -110,6 +110,7 @@ public class RestServlet extends Abstrac
     private final boolean _hierarchyInitializationRequired;
     private volatile RequestInfoParser _requestInfoParser;
     private RestUserPreferenceHandler _userPreferenceHandler;
+    private ConfiguredObjectFinder _objectFinder;
 
     @SuppressWarnings("unused")
     public RestServlet()
@@ -133,6 +134,7 @@ public class RestServlet extends Abstrac
         {
             doInitialization();
         }
+        _objectFinder = new ConfiguredObjectFinder(getBroker());
         _requestInfoParser = new RequestInfoParser(_hierarchy);
         Handler.register();
         Long preferenceOperationTimeout = getManagementConfiguration().getContextValue(Long.class, PREFERENCE_OPERTAION_TIMEOUT_CONTEXT_NAME);
@@ -181,116 +183,17 @@ public class RestServlet extends Abstrac
         }
     }
 
+
     private Collection<ConfiguredObject<?>> getTargetObjects(RequestInfo requestInfo,
                                                              List<Predicate<ConfiguredObject<?>>> filterPredicateList)
     {
         List<String> names = requestInfo.getModelParts();
+        ConfiguredObject<?> root = getBroker();
+        Class<? extends ConfiguredObject>[] hierarchy = _hierarchy;
 
-        Collection<ConfiguredObject<?>> parents = new ArrayList<>();
-        parents.add(getBroker());
-        Collection<ConfiguredObject<?>> children = new ArrayList<>();
-
-        Map<Class<? extends ConfiguredObject>, String> filters =
-                new HashMap<>();
-
-        final Model model = getBroker().getModel();
-        boolean wildcard = false;
-        Class<? extends ConfiguredObject> parentType = Broker.class;
-        for (int i = 0; i < _hierarchy.length; i++)
-        {
-            if (model.getChildTypes(parentType).contains(_hierarchy[i]))
-            {
-                parentType = _hierarchy[i];
-                for (ConfiguredObject<?> parent : parents)
-                {
-                    if (names.size() > i
-                        && names.get(i) != null
-                        && !names.get(i).equals("*")
-                        && names.get(i).trim().length() != 0)
-                    {
-                        for (ConfiguredObject<?> child : parent.getChildren(_hierarchy[i]))
-                        {
-                            if (child.getName().equals(names.get(i)))
-                            {
-                                children.add(child);
-                            }
-                        }
-                        if (children.isEmpty())
-                        {
-                            return null;
-                        }
-                    }
-                    else
-                    {
-                        wildcard = true;
-                        children.addAll((Collection<? extends ConfiguredObject<?>>) parent.getChildren(_hierarchy[i]));
-                    }
-                }
-            }
-            else
-            {
-                children = parents;
-                if (names.size() > i
-                    && names.get(i) != null
-                    && !names.get(i).equals("*")
-                    && names.get(i).trim().length() != 0)
-                {
-                    filters.put(_hierarchy[i], names.get(i));
-                }
-                else
-                {
-                    wildcard = true;
-                }
-            }
-
-            parents = children;
-            children = new ArrayList<>();
-        }
-
-        if (!filters.isEmpty() && !parents.isEmpty())
-        {
-            Collection<ConfiguredObject<?>> potentials = parents;
-            parents = new ArrayList<>();
-
-            for (ConfiguredObject o : potentials)
-            {
-
-                boolean match = true;
-
-                for (Map.Entry<Class<? extends ConfiguredObject>, String> entry : filters.entrySet())
-                {
-                    Collection<? extends ConfiguredObject> ancestors =
-                            getAncestors(getConfiguredClass(), entry.getKey(), o);
-                    match = false;
-                    for (ConfiguredObject ancestor : ancestors)
-                    {
-                        if (ancestor.getName().equals(entry.getValue()))
-                        {
-                            match = true;
-                            break;
-                        }
-                    }
-                    if (!match)
-                    {
-                        break;
-                    }
-                }
-                if (match)
-                {
-                    parents.add(o);
-                }
-            }
-        }
+        Collection<ConfiguredObject<?>> parents = _objectFinder.findObjectsFromPath(names, hierarchy, true);
 
-        if (parents.isEmpty() && !wildcard)
-        {
-            return null;
-        }
-        else if (filterPredicateList.isEmpty())
-        {
-            return parents;
-        }
-        else
+        if (!(parents == null || filterPredicateList.isEmpty()))
         {
             Iterator<ConfiguredObject<?>> iter = parents.iterator();
             while (iter.hasNext())
@@ -306,8 +209,8 @@ public class RestServlet extends Abstrac
                 }
             }
 
-            return parents;
         }
+        return parents;
     }
 
     private List<Predicate<ConfiguredObject<?>>> buildFilterPredicates(final HttpServletRequest request)
@@ -334,36 +237,6 @@ public class RestServlet extends Abstrac
         return Collections.unmodifiableList(predicates);
     }
 
-    private Collection<? extends ConfiguredObject> getAncestors(Class<? extends ConfiguredObject> childType,
-                                                                Class<? extends ConfiguredObject> ancestorType,
-                                                                ConfiguredObject child)
-    {
-        Collection<ConfiguredObject> ancestors = new HashSet<>();
-        Collection<Class<? extends ConfiguredObject>> parentTypes = child.getModel().getParentTypes(childType);
-
-        for(Class<? extends ConfiguredObject> parentClazz : parentTypes)
-        {
-            if(parentClazz == ancestorType)
-            {
-                ConfiguredObject parent = child.getParent(parentClazz);
-                if(parent != null)
-                {
-                    ancestors.add(parent);
-                }
-            }
-            else
-            {
-                ConfiguredObject parent = child.getParent(parentClazz);
-                if(parent != null)
-                {
-                    ancestors.addAll(getAncestors(parentClazz, ancestorType, parent));
-                }
-            }
-        }
-
-        return ancestors;
-    }
-
     @Override
     protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse response)
             throws ServletException, IOException
@@ -585,7 +458,9 @@ public class RestServlet extends Abstrac
                 Class<? extends ConfiguredObject> objClass = getConfiguredClass();
                 if (_hierarchy.length > 1)
                 {
-                    List<ConfiguredObject> parents = findAllObjectParents(names);
+
+                    List<ConfiguredObject> parents =
+                            _objectFinder.findObjectParentsFromPath(names, _hierarchy, getConfiguredClass());
                     theParent = parents.remove(0);
                     otherParents = parents.toArray(new ConfiguredObject[parents.size()]);
                 }
@@ -853,7 +728,9 @@ public class RestServlet extends Abstrac
             ConfiguredObject[] otherParents = null;
             if (_hierarchy.length > 1)
             {
-                List<ConfiguredObject> parents = findAllObjectParents(names);
+
+                List<ConfiguredObject> parents =
+                        _objectFinder.findObjectParentsFromPath(names, _hierarchy, getConfiguredClass());
                 theParent = parents.remove(0);
                 otherParents = parents.toArray(new ConfiguredObject[parents.size()]);
             }
@@ -936,68 +813,6 @@ public class RestServlet extends Abstrac
         return providedObject;
     }
 
-    private List<ConfiguredObject> findAllObjectParents(List<String> names)
-    {
-        Collection<ConfiguredObject>[] objects = new Collection[_hierarchy.length];
-        for (int i = 0; i < _hierarchy.length - 1; i++)
-        {
-            objects[i] = new HashSet<>();
-            if (i == 0)
-            {
-                for (ConfiguredObject object : getBroker().getChildren(_hierarchy[0]))
-                {
-                    if (object.getName().equals(names.get(0)))
-                    {
-                        objects[0].add(object);
-                        break;
-                    }
-                }
-            }
-            else
-            {
-                for (int j = i - 1; j >= 0; j--)
-                {
-                    if (getBroker().getModel().getChildTypes(_hierarchy[j]).contains(_hierarchy[i]))
-                    {
-                        for (ConfiguredObject<?> parent : objects[j])
-                        {
-                            for (ConfiguredObject<?> object : parent.getChildren(_hierarchy[i]))
-                            {
-                                if (object.getName().equals(names.get(i)))
-                                {
-                                    objects[i].add(object);
-                                }
-                            }
-                        }
-                        break;
-                    }
-                }
-            }
-
-        }
-        List<ConfiguredObject> parents = new ArrayList<>();
-        Class<? extends ConfiguredObject> objClass = getConfiguredClass();
-        Collection<Class<? extends ConfiguredObject>> parentClasses =
-                getBroker().getModel().getParentTypes(objClass);
-        for (int i = _hierarchy.length - 2; i >= 0; i--)
-        {
-            if (parentClasses.contains(_hierarchy[i]))
-            {
-                if (objects[i].size() == 1)
-                {
-                    parents.add(objects[i].iterator().next());
-                }
-                else
-                {
-                    throw new IllegalArgumentException("Cannot deduce parent of class "
-                                                       + _hierarchy[i].getSimpleName());
-                }
-            }
-
-        }
-        return parents;
-    }
-
     private Map<String, Object> getRequestProvidedObject(HttpServletRequest request, final RequestInfo requestInfo)
             throws IOException, ServletException
     {

Modified: qpid/java/trunk/broker/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker/pom.xml?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/broker/pom.xml (original)
+++ qpid/java/trunk/broker/pom.xml Sun Nov 20 16:42:57 2016
@@ -142,6 +142,13 @@
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-memory-store</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Nov 20 16:42:57 2016
@@ -2041,4 +2041,9 @@ public class AMQConnection extends Close
         }
         return false;
     }
+
+    boolean isVirtualHostPropertiesSupported()
+    {
+        return getDelegate().isVirtualHostPropertiesSupported();
+    }
 }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java Sun Nov 20 16:42:57 2016
@@ -33,7 +33,6 @@ import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
 
-import org.apache.qpid.jndi.ObjectFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +42,7 @@ import org.apache.qpid.client.messaging.
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jndi.ObjectFactory;
 import org.apache.qpid.messaging.Address;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
@@ -429,15 +429,22 @@ public abstract class AMQDestination imp
 
     public AMQShortString getEncodedName()
     {
-        if(_urlAsShortString == null)
+        if(getDestSyntax() == DestSyntax.BURL)
         {
-            if (_url == null)
+            if (_urlAsShortString == null)
             {
-                toURL();
+                if (_url == null)
+                {
+                    toURL();
+                }
+                _urlAsShortString = new AMQShortString(_url);
             }
-            _urlAsShortString = new AMQShortString(_url);
+            return _urlAsShortString;
+        }
+        else
+        {
+            return AMQShortString.valueOf(getName());
         }
-        return _urlAsShortString;
     }
 
     public boolean isDurable()

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Nov 20 16:42:57 2016
@@ -23,7 +23,16 @@ package org.apache.qpid.client;
 import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.text.MessageFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -41,17 +50,16 @@ import java.util.concurrent.locks.Reentr
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
-import javax.jms.Queue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -70,6 +78,7 @@ import org.apache.qpid.client.message.JM
 import org.apache.qpid.client.message.MessageEncryptionHelper;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.Link;
 import org.apache.qpid.client.messaging.address.Node;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.client.util.JMSExceptionHelper;
@@ -206,7 +215,7 @@ public abstract class AMQSession<C exten
      */
     private int _nextTag = 1;
 
-    private final Map<Integer,C> _consumers = new ConcurrentHashMap<>();
+    private final Map<String,C> _consumers = new ConcurrentHashMap<>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -307,7 +316,7 @@ public abstract class AMQSession<C exten
      */
     protected Collection<C> getConsumers()
     {
-        return new ArrayList(_consumers.values());
+        return new ArrayList<>(_consumers.values());
     }
 
     protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
@@ -924,7 +933,7 @@ public abstract class AMQSession<C exten
 
 
 
-    public void confirmConsumerCancelled(int consumerTag)
+    public void confirmConsumerCancelled(String consumerTag)
     {
         C consumer = _consumers.get(consumerTag);
         if (consumer != null)
@@ -2691,11 +2700,20 @@ public abstract class AMQSession<C exten
      */
     private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws QpidException, FailoverException
     {
-        int tagId = _nextTag++;
+        Link link = consumer.getDestination().getLink();
+        String linkName;
+        if(link != null && link.getName() != null && consumer.getDestination().getAddressType() == AMQDestination.QUEUE_TYPE)
+        {
+            linkName = link.getName();
+        }
+        else
+        {
+            linkName = String.valueOf(_nextTag++);
+        }
 
-        consumer.setConsumerTag(tagId);
+        consumer.setConsumerTag(linkName);
         // we must register the consumer in the map before we actually start listening
-        _consumers.put(tagId, consumer);
+        _consumers.put(linkName, consumer);
 
         synchronized (consumer.getDestination())
         {
@@ -2706,12 +2724,12 @@ public abstract class AMQSession<C exten
 
         try
         {
-            sendConsume(consumer, queueName, nowait, tagId);
+            sendConsume(consumer, queueName, nowait);
         }
         catch (QpidException e)
         {
             // clean-up the map in the event of an error
-            _consumers.remove(tagId);
+            _consumers.remove(linkName);
             throw e;
         }
     }
@@ -2765,7 +2783,7 @@ public abstract class AMQSession<C exten
             throws QpidException;
 
     public abstract void sendConsume(C consumer, String queueName,
-                                     boolean nowait, int tag) throws QpidException, FailoverException;
+                                     boolean nowait) throws QpidException, FailoverException;
 
     private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
             throws JMSException
@@ -3152,7 +3170,7 @@ public abstract class AMQSession<C exten
         _producers.put(producerId, producer);
     }
 
-    private void rejectMessagesForConsumerTag(int consumerTag)
+    private void rejectMessagesForConsumerTag(String consumerTag)
     {
         Iterator<Dispatchable> messages = _queue.iterator();
         if (_logger.isDebugEnabled())
@@ -3172,7 +3190,7 @@ public abstract class AMQSession<C exten
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
 
-            if (message.getConsumerTag() == consumerTag)
+            if (message.getConsumerTag().equals(consumerTag))
             {
 
                 if (_queue.remove(message))

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Nov 20 16:42:57 2016
@@ -611,7 +611,7 @@ public class AMQSession_0_10 extends AMQ
      * Registers the consumer with the broker
      */
     public void sendConsume(BasicMessageConsumer_0_10 consumer, String queueName,
-                            boolean nowait, int tag)
+                            boolean nowait)
             throws QpidException, FailoverException
     {
         queueName = preprocessAddressTopic(consumer, queueName);
@@ -631,13 +631,13 @@ public class AMQSession_0_10 extends AMQ
         boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
 
         String queue = queueName == null ? destination.getAddressName() : queueName;
+        String consumerTag = consumer.getConsumerTag();
         getQpidSession().messageSubscribe
-            (queue, String.valueOf(tag),
+            (queue, consumerTag,
              acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
              preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
 
-        String consumerTag = (consumer).getConsumerTagString();
 
         if (capacity == 0)
         {
@@ -814,7 +814,7 @@ public class AMQSession_0_10 extends AMQ
         {
             for (BasicMessageConsumer consumer : getConsumers())
             {
-                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+                getQpidSession().messageStop(consumer.getConsumerTag(),
                                              Option.UNRELIABLE);
             }
             sync();
@@ -823,7 +823,7 @@ public class AMQSession_0_10 extends AMQ
         {
             for (BasicMessageConsumer_0_10 consumer : getConsumers())
             {
-                String consumerTag = String.valueOf(consumer.getConsumerTag());
+                String consumerTag = consumer.getConsumerTag();
                 //only set if msg list is null
                 try
                 {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sun Nov 20 16:42:57 2016
@@ -502,8 +502,7 @@ public class AMQSession_0_8 extends AMQS
     @Override
     public void sendConsume(BasicMessageConsumer_0_8 consumer,
                             String queueName,
-                            boolean nowait,
-                            int tag) throws QpidException, FailoverException
+                            boolean nowait) throws QpidException, FailoverException
     {
         queueName = preprocessAddressTopic(consumer, queueName);
 
@@ -519,7 +518,7 @@ public class AMQSession_0_8 extends AMQS
 
         BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
                                                                            queueName,
-                                                                           String.valueOf(tag),
+                                                                           consumer.getConsumerTag(),
                                                                            consumer.isNoLocal(),
                                                                            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
                                                                            consumer.isExclusive(),
@@ -997,7 +996,14 @@ public class AMQSession_0_8 extends AMQS
 
     public void sync() throws QpidException
     {
-        declareExchange("amq.direct", "direct", false);
+        if(getAMQConnection().isVirtualHostPropertiesSupported())
+        {
+            isBound(null, "$virtualhostProperties", null);
+        }
+        else
+        {
+            declareExchange("amq.direct", "direct", false);
+        }
     }
 
     @Override

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sun Nov 20 16:42:57 2016
@@ -76,7 +76,7 @@ public abstract class BasicMessageConsum
      */
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
-    private int _consumerTag;
+    private String _consumerTag;
 
     private final int _channelId;
 
@@ -855,12 +855,12 @@ public abstract class BasicMessageConsum
     }
 
     /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
-    public int getConsumerTag()
+    public String getConsumerTag()
     {
         return _consumerTag;
     }
 
-    public void setConsumerTag(int consumerTag)
+    public void setConsumerTag(String consumerTag)
     {
         _consumerTag = consumerTag;
     }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sun Nov 20 16:42:57 2016
@@ -74,8 +74,7 @@ public class BasicMessageConsumer_0_10 e
      * Specify whether this consumer is performing a sync receive
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
-    private String _consumerTagString;
-    
+
     private final long _capacity;
 
     /** Flag indicating if the server supports message selectors */
@@ -110,17 +109,6 @@ public class BasicMessageConsumer_0_10 e
         }
     }
 
-    @Override public void setConsumerTag(int consumerTag)
-    {
-        super.setConsumerTag(consumerTag);
-        _consumerTagString = String.valueOf(consumerTag);
-    }
-
-    public String getConsumerTagString()
-    {
-        return _consumerTagString;
-    }
-
     /**
      *
      * This is invoked by the session thread when emptying the session message queue.
@@ -165,7 +153,7 @@ public class BasicMessageConsumer_0_10 e
      */
     @Override void sendCancel() throws QpidException
     {
-        _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+        _0_10session.getQpidSession().messageCancel(getConsumerTag());
         postSubscription();
         try
         {
@@ -337,7 +325,7 @@ public class BasicMessageConsumer_0_10 e
 
     private void messageFlow()
     {
-        _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+        _0_10session.getQpidSession().messageFlow(getConsumerTag(),
                                                   MessageCreditUnit.MESSAGE, 1,
                                                   Option.UNRELIABLE);
     }
@@ -398,17 +386,17 @@ public class BasicMessageConsumer_0_10 e
         Object o = super.getMessageFromQueue(l);
         if (o == null && _0_10session.isStarted())
         {
-           
+
             _0_10session.getQpidSession().messageFlush
-                (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+                    (getConsumerTag(), Option.UNRELIABLE, Option.SYNC);
             _0_10session.getQpidSession().messageFlow
-                (getConsumerTagString(), MessageCreditUnit.BYTE,
+                (getConsumerTag(), MessageCreditUnit.BYTE,
                  0xFFFFFFFF, Option.UNRELIABLE);
             
             if (_capacity > 0)
             {
                 _0_10session.getQpidSession().messageFlow
-                                               (getConsumerTagString(),
+                                               (getConsumerTag(),
                                                 MessageCreditUnit.MESSAGE,
                                                 _capacity,
                                                 Option.UNRELIABLE);
@@ -559,7 +547,7 @@ public class BasicMessageConsumer_0_10 e
 
             if (capacity == 0 && getMessageListener() == null)
             {
-                session.getQpidSession().messageFlow(getConsumerTagString(),
+                session.getQpidSession().messageFlow(getConsumerTag(),
                                                      MessageCreditUnit.MESSAGE, 1,
                                                      Option.UNRELIABLE);
 
@@ -571,7 +559,7 @@ public class BasicMessageConsumer_0_10 e
 
             if (message == null && capacity == 0 && getMessageListener() == null)
             {
-                session.getQpidSession().messageFlow(getConsumerTagString(),
+                session.getQpidSession().messageFlow(getConsumerTag(),
                                                      MessageCreditUnit.MESSAGE, 0,
                                                      Option.UNRELIABLE);
                 session.sync();
@@ -596,7 +584,7 @@ public class BasicMessageConsumer_0_10 e
 
             if (capacity == 0 && getMessageListener() == null)
             {
-                session.getQpidSession().messageFlow(getConsumerTagString(),
+                session.getQpidSession().messageFlow(getConsumerTag(),
                                                      MessageCreditUnit.MESSAGE, 1,
                                                      Option.UNRELIABLE);
 
@@ -605,7 +593,7 @@ public class BasicMessageConsumer_0_10 e
             Message message = super.receiveNoWait();
             if (message == null && capacity == 0 && getMessageListener() == null)
             {
-                session.getQpidSession().messageFlow(getConsumerTagString(),
+                session.getQpidSession().messageFlow(getConsumerTag(),
                                                      MessageCreditUnit.MESSAGE, 0,
                                                      Option.UNRELIABLE);
                 session.sync();

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Sun Nov 20 16:42:57 2016
@@ -117,7 +117,7 @@ public class BasicMessageConsumer_0_8 ex
 
     void sendCancel() throws QpidException, FailoverException
     {
-        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
+        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(getConsumerTag()), false);
 
         final AMQFrame cancelFrame = body.generateFrame(getChannelId());
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Sun Nov 20 16:42:57 2016
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.client.handler;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,12 +30,15 @@ import org.apache.qpid.QpidException;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicDeliverBody;
 
 public class BasicDeliverMethodHandler implements StateAwareMethodListener<BasicDeliverBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicDeliverMethodHandler.class);
 
+    private static final ConcurrentMap<AMQShortString,String> CONSUMER_TAG_MAP = new ConcurrentHashMap<>();
+
     private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler();
 
     public static BasicDeliverMethodHandler getInstance()
@@ -45,7 +51,7 @@ public class BasicDeliverMethodHandler i
     {
         final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
                 body.getDeliveryTag(),
-                body.getConsumerTag().toIntValue(),
+                getTagAsStringTag(body),
                 body.getExchange(),
                 body.getRoutingKey(),
                 body.getRedelivered());
@@ -55,4 +61,16 @@ public class BasicDeliverMethodHandler i
         }
         session.unprocessedMessageReceived(channelId, msg);
     }
+
+    private static String getTagAsStringTag(final BasicDeliverBody body)
+    {
+        AMQShortString consumerTag = body.getConsumerTag();
+        String tag = CONSUMER_TAG_MAP.get(consumerTag);
+        if(tag == null)
+        {
+            tag = consumerTag.toString();
+            CONSUMER_TAG_MAP.putIfAbsent(consumerTag, tag);
+        }
+        return tag;
+    }
 }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Sun Nov 20 16:42:57 2016
@@ -30,7 +30,7 @@ public class ReturnMessage extends Unpro
 
     public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
     {
-        super(-1,0,exchange,routingKey,false);
+        super(-1,"",exchange,routingKey,false);
         _replyText = replyText;
         _replyCode = replyCode;
     }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Sun Nov 20 16:42:57 2016
@@ -32,10 +32,10 @@ import org.apache.qpid.client.AMQSession
  */
 public abstract class UnprocessedMessage implements AMQSession.Dispatchable
 {
-    private final int _consumerTag;
+    private final String _consumerTag;
 
 
-    public UnprocessedMessage(int consumerTag)
+    public UnprocessedMessage(String consumerTag)
     {
         _consumerTag = consumerTag;
     }
@@ -44,7 +44,7 @@ public abstract class UnprocessedMessage
     abstract public long getDeliveryTag();
 
 
-    public int getConsumerTag()
+    public String getConsumerTag()
     {
         return _consumerTag;
     }
@@ -54,4 +54,4 @@ public abstract class UnprocessedMessage
         ssn.dispatch(this);
     }
 
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Sun Nov 20 16:42:57 2016
@@ -35,7 +35,7 @@ public class UnprocessedMessage_0_10 ext
 
     public UnprocessedMessage_0_10(MessageTransfer xfr)
     {
-        super(Integer.parseInt(xfr.getDestination()));
+        super(xfr.getDestination());
         _transfer = xfr;
     }
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Sun Nov 20 16:42:57 2016
@@ -20,11 +20,6 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,6 +27,11 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
 /**
  * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
  * the content body/ies.
@@ -56,7 +56,7 @@ public class UnprocessedMessage_0_8 exte
     /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ContentBody> _bodies;
 
-    public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+    public UnprocessedMessage_0_8(long deliveryId, String consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
     {
         super(consumerTag);
         _exchange = exchange;

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sun Nov 20 16:42:57 2016
@@ -399,7 +399,7 @@ public class AMQProtocolSession implemen
     {
         final AMQSession session = getSession(channelId);
 
-        session.confirmConsumerCancelled(consumerTag.toIntValue());
+        session.confirmConsumerCancelled(consumerTag.toString());
     }
 
     public void setProtocolVersion(final ProtocolVersion pv)

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Sun Nov 20 16:42:57 2016
@@ -287,7 +287,7 @@ public class AMQSession_0_10Test extends
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                     null, null, false, true);
-            session.sendConsume(consumer, "test", true, 1);
+            session.sendConsume(consumer, "test", true);
         }
         catch (Exception e)
         {
@@ -390,6 +390,7 @@ public class AMQSession_0_10Test extends
         {
             BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
                     null, null, false, true);
+            consumer.setConsumerTag("");
             consumer.close();
         }
         catch (Exception e)
@@ -480,7 +481,7 @@ public class AMQSession_0_10Test extends
         UnprocessedMessage[] messages = new UnprocessedMessage[4];
         for (int i =0; i< messages.length;i++ )
         {
-            int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+            String consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
             int deliveryTag = i + 1;
             messages[i]= createMockMessage(deliveryTag, consumerTag);
             session.messageReceived(messages[i]);
@@ -515,7 +516,7 @@ public class AMQSession_0_10Test extends
         UnprocessedMessage[] messages = new UnprocessedMessage[4];
         for (int i =0; i< messages.length;i++ )
         {
-            int consumerTag = i % 2;
+            String consumerTag = String.valueOf(i % 2);
             int deliveryTag = i + 1;
             messages[i]= createMockMessage(deliveryTag, consumerTag);
             session.messageReceived(messages[i]);
@@ -540,7 +541,7 @@ public class AMQSession_0_10Test extends
         }
     }
 
-    private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+    private UnprocessedMessage createMockMessage(long deliveryTag, String consumerTag)
     {
         UnprocessedMessage message = mock(UnprocessedMessage.class);
         when(message.getConsumerTag()).thenReturn(consumerTag);

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java Sun Nov 20 16:42:57 2016
@@ -116,7 +116,7 @@ public class AMQSession_0_8Test extends
         UnprocessedMessage[] messages = new UnprocessedMessage[4];
         for (int i =0; i< messages.length;i++ )
         {
-            int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+            String consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
             int deliveryTag = i + 1;
             messages[i]= createMockMessage(deliveryTag, consumerTag);
             session.messageReceived(messages[i]);
@@ -153,7 +153,7 @@ public class AMQSession_0_8Test extends
         assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
     }
 
-    private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+    private UnprocessedMessage createMockMessage(long deliveryTag, String consumerTag)
     {
         UnprocessedMessage message = mock(UnprocessedMessage.class);
         when(message.getConsumerTag()).thenReturn(consumerTag);

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Sun Nov 20 16:42:57 2016
@@ -281,6 +281,19 @@ public class QpidBrokerTestCase extends
         return getConnection(curl);
     }
 
+    public Connection getConnectionForVHost(String vhost)
+            throws URLSyntaxException, NamingException, JMSException
+    {
+        ConnectionURL curl = new AMQConnectionURL(getConnectionFactory().getConnectionURLString());
+        curl.setVirtualHost(vhost);
+        curl = new AMQConnectionURL(curl.toString());
+
+        curl.setUsername(GUEST_USERNAME);
+        curl.setPassword(GUEST_PASSWORD);
+        return getConnection(curl);
+    }
+
+
     public Connection getConnection(ConnectionURL url) throws JMSException
     {
         _logger.debug("get connection for " + url.getURL());

Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java?rev=1770576&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java Sun Nov 20 16:42:57 2016
@@ -0,0 +1,639 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.management.amqp;
+
+import static org.apache.qpid.server.model.Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.queue.PriorityQueue;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class AmqpManagementTest extends QpidBrokerTestCase
+{
+    private Connection _connection;
+    private Session _session;
+    private Queue _queue;
+    private Queue _replyAddress;
+    private Queue _replyConsumer;
+    private MessageConsumer _consumer;
+    private MessageProducer _producer;
+
+    private void setupSession() throws Exception
+    {
+        _connection.start();
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _queue = _session.createQueue("ADDR:$management");
+        _replyAddress = _session.createQueue("ADDR:!response");
+        _replyConsumer = _session.createQueue(
+                "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}");
+        _consumer = _session.createConsumer(_replyConsumer);
+        _producer = _session.createProducer(_queue);
+    }
+
+    private void setupBrokerManagementConnection() throws Exception
+    {
+        AMQConnectionFactory management = getConnectionFactory("management");
+        _connection = management.createConnection(GUEST_USERNAME, GUEST_PASSWORD);
+        setupSession();
+    }
+
+    private void setupVirtualHostManagementConnection() throws Exception
+    {
+        _connection = getConnection();
+        setupSession();
+    }
+
+    // test get types on $management
+    public void testGetTypesOnBrokerManagement() throws Exception
+    {
+        setupBrokerManagementConnection();
+
+        Message message = _session.createBytesMessage();
+
+        message.setStringProperty("identity", "self");
+        message.setStringProperty("type", "org.amqp.management");
+        message.setStringProperty("operation", "GET-TYPES");
+
+        message.setJMSReplyTo(_replyAddress);
+
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertEquals("The correlation id does not match the sent message's messageId", message.getJMSMessageID(), responseMessage.getJMSCorrelationID());
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertNotNull("The response did not include the org.amqp.Management type",
+                      ((MapMessage) responseMessage).getObject("org.amqp.management"));
+        assertNotNull("The response did not include the org.apache.qpid.Port type",
+                      ((MapMessage) responseMessage).getObject("org.apache.qpid.Port"));
+    }
+
+    // test get types on a virtual host
+    public void testGetTypesOnVhostManagement() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+
+        Message message = _session.createBytesMessage();
+
+        message.setStringProperty("identity", "self");
+        message.setStringProperty("type", "org.amqp.management");
+        message.setStringProperty("operation", "GET-TYPES");
+        byte[] correlationID = "some correlation id".getBytes();
+        message.setJMSCorrelationIDAsBytes(correlationID);
+
+        message.setJMSReplyTo(_replyAddress);
+
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The correlation id does not match the sent message's correlationId", Arrays.equals(correlationID, responseMessage.getJMSCorrelationIDAsBytes()));
+
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertNotNull("The response did not include the org.amqp.Management type",
+                      ((MapMessage) responseMessage).getObject("org.amqp.management"));
+        assertNull("The response included the org.apache.qpid.Port type",
+                   ((MapMessage) responseMessage).getObject("org.apache.qpid.Port"));
+
+
+
+    }
+
+    // create / update / read / delete a queue via $management
+    public void testCreateQueueOnBrokerManagement() throws Exception
+    {
+        setupBrokerManagementConnection();
+
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 100L);
+        String path = "test/test/" + getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The created queue was not a standard queue", "org.apache.qpid.StandardQueue", ((MapMessage)responseMessage).getString("type"));
+        assertEquals("The created queue was not a standard queue", "standard", ((MapMessage)responseMessage).getString("qpid-type"));
+        assertEquals("the created queue did not have the correct alerting threshold", 100L, ((MapMessage)responseMessage).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+        Object identity = ((MapMessage) responseMessage).getObject("identity");
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "UPDATE");
+        message.setObjectProperty("identity", identity);
+        message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("the created queue did not have the correct alerting threshold", 250L, ((MapMessage)responseMessage).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "DELETE");
+        message.setObjectProperty("index", "object-path");
+        message.setObjectProperty("key", path);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 204, responseMessage.getIntProperty("statusCode"));
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "READ");
+        message.setObjectProperty("identity", identity);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate not found", 404, responseMessage.getIntProperty("statusCode"));
+
+    }
+    // create / update / read / delete a queue via vhost
+
+    public void testCreateQueueOnVhostManagement() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        message.setInt(PriorityQueue.PRIORITIES, 13);
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The created queue was not a priority queue", "org.apache.qpid.PriorityQueue", ((MapMessage)responseMessage).getString("type"));
+        assertEquals("The created queue was not a standard queue", "priority", ((MapMessage)responseMessage).getString("qpid-type"));
+        assertEquals("the created queue did not have the correct number of priorities", 13, ((MapMessage)responseMessage).getInt(PriorityQueue.PRIORITIES));
+        Object identity = ((MapMessage) responseMessage).getObject("identity");
+
+        // Trying to create a second queue with the same name should cause a conflict
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        message.setInt(PriorityQueue.PRIORITIES, 7);
+        message.setString("object-path", getTestName());
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate conflict", 409, responseMessage.getIntProperty("statusCode"));
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "READ");
+        message.setObjectProperty("identity", identity);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        assertEquals("the queue did not have the correct number of priorities", 13, ((MapMessage)responseMessage).getInt(PriorityQueue.PRIORITIES));
+        assertEquals("the queue did not have the expected path", getTestName(), ((MapMessage)responseMessage).getString("object-path"));
+
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "UPDATE");
+        message.setObjectProperty("identity", identity);
+        message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The updated queue did not have the correct alerting threshold", 250L, ((MapMessage)responseMessage).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+
+
+        message = _session.createMapMessage();
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "DELETE");
+        message.setObjectProperty("index", "object-path");
+        message.setObjectProperty("key", path);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 204, responseMessage.getIntProperty("statusCode"));
+
+        message = _session.createMapMessage();
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "DELETE");
+        message.setObjectProperty("index", "object-path");
+        message.setObjectProperty("key", path);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate not found", 404, responseMessage.getIntProperty("statusCode"));
+    }
+
+    // read virtual host from virtual host management
+    public void testReadVirtualHost() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.VirtualHost");
+        message.setStringProperty("operation", "READ");
+        message.setStringProperty("index", "object-path");
+        message.setStringProperty("key", "");
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The name of the virtual host is not as expected", "test", ((MapMessage)responseMessage).getString("name"));
+    }
+
+    // create a virtual host from $management
+    public void testCreateVirtualHost() throws Exception
+    {
+        setupBrokerManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.JsonVirtualHostNode");
+        message.setStringProperty("operation", "CREATE");
+        String virtualHostName = "newMemoryVirtualHost";
+        message.setString("name", virtualHostName);
+        message.setString(VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, "{ \"type\" : \"Memory\" }");
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 201, responseMessage.getIntProperty("statusCode"));
+        _connection.close();
+        _connection = getConnectionForVHost("/"+virtualHostName);
+        setupSession();
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.VirtualHost");
+        message.setStringProperty("operation", "READ");
+        message.setStringProperty("index", "object-path");
+        message.setStringProperty("key", "");
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The name of the virtual host is not as expected", virtualHostName, ((MapMessage)responseMessage).getString("name"));
+        assertEquals("The type of the virtual host is not as expected", "Memory", ((MapMessage)responseMessage).getString("qpid-type"));
+
+
+    }
+    // attempt to delete the virtual host via the virtual host
+    public void testDeleteVirtualHost() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.VirtualHost");
+        message.setStringProperty("operation", "DELETE");
+        message.setStringProperty("index", "object-path");
+        message.setStringProperty("key", "");
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 501, responseMessage.getIntProperty("statusCode"));
+    }
+
+    // create a queue with the qpid type
+    public void testCreateQueueWithQpidType() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        message.setString("qpid-type", "lvq");
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 201, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The created queue did not have the correct type", "org.apache.qpid.LastValueQueue", ((MapMessage)responseMessage).getString("type"));
+    }
+
+    // create a queue using the AMQP type
+    public void testCreateQueueWithAmqpType() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.SortedQueue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setString("sortKey", "foo");
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 201, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The created queue did not have the correct type", "sorted", ((MapMessage)responseMessage).getString("qpid-type"));
+    }
+
+    // attempt to create an exchange without a type
+    public void testCreateExchangeWithoutType() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Exchange");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 400, responseMessage.getIntProperty("statusCode"));
+    }
+
+
+
+    // attempt to create a connection
+    public void testCreateConnectionOnVhostManagement() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Connection");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate not implemented", 501, responseMessage.getIntProperty("statusCode"));
+    }
+
+    public void testCreateConnectionOnBrokerManagement() throws Exception
+    {
+        setupBrokerManagementConnection();
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Connection");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate not implemented", 501, responseMessage.getIntProperty("statusCode"));
+    }
+
+    // create a binding
+    public void testCreateBindingOnVhostManagement() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+        String exchangeName = getTestName() + "_Exchange";
+        String queueName = getTestName() + "_Queue";
+        String exchangePath = exchangeName;
+        String queuePath = queueName;
+
+        doTestCreateBinding(exchangeName, queueName, exchangePath, queuePath);
+
+    }
+
+    public void testCreateBindingOnBrokerManagement() throws Exception
+    {
+        setupBrokerManagementConnection();
+        String exchangeName = getTestName() + "_Exchange";
+        String queueName = getTestName() + "_Queue";
+        String exchangePath = "test/test/"+exchangeName;
+        String queuePath = "test/test/"+exchangeName;
+
+        doTestCreateBinding(exchangeName, queueName, exchangePath, queuePath);
+
+    }
+
+    private void doTestCreateBinding(final String exchangeName,
+                                     final String queueName,
+                                     final String exchangePath,
+                                     final String queuePath) throws JMSException
+    {
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", queueName);
+        message.setString("object-path", queuePath);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, responseMessage.getIntProperty("statusCode"));
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.FanoutExchange");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", exchangeName);
+        message.setString("object-path", exchangePath);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, responseMessage.getIntProperty("statusCode"));
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Binding");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name",  "binding1");
+        message.setString("object-path", exchangePath + "/" + queueName + "/binding1");
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, responseMessage.getIntProperty("statusCode"));
+
+        // use an operation to bind
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Exchange");
+        message.setStringProperty("operation", "bind");
+        message.setStringProperty("index", "object-path");
+        message.setStringProperty("key", exchangePath);
+        message.setStringProperty("bindingKey",  "binding2");
+        message.setStringProperty("queue", queueName);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+
+        // read the new binding
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Binding");
+        message.setStringProperty("operation", "READ");
+        message.setStringProperty("index",  "object-path");
+        message.setStringProperty("key", exchangePath + "/" + queueName + "/binding2");
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+    }
+
+}

Propchange: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java Sun Nov 20 16:42:57 2016
@@ -217,11 +217,10 @@ public class QueueRestTest extends QpidR
                 ConfiguredObject.CONTEXT,
                 ConfiguredObject.DESIRED_STATE);
 
-        assertEquals("Unexpected binding attribute " + Consumer.NAME, "1", consumer.get(Consumer.NAME));
-        assertEquals("Unexpected binding attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE));
-        assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(),
+        assertEquals("Unexpected consumer attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE));
+        assertEquals("Unexpected consumer attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(),
                 consumer.get(Consumer.LIFETIME_POLICY));
-        assertEquals("Unexpected binding attribute " + Consumer.DISTRIBUTION_MODE, "MOVE",
+        assertEquals("Unexpected consumer attribute " + Consumer.DISTRIBUTION_MODE, "MOVE",
                 consumer.get(Consumer.DISTRIBUTION_MODE));
 
         @SuppressWarnings("unchecked")




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org