You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/20 16:42:59 UTC
svn commit: r1770576 [7/8] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
broker-codegen/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core...
Added: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java?rev=1770576&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java (added)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java Sun Nov 20 16:42:57 2016
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectCustomSerialization;
+
+/**
+ * Converts {@link ConfiguredObject}s and their attribute values into plain maps, lists and
+ * scalar values, using the owning {@link ManagementNode} to derive paths and type names.
+ */
+class ManagementOutputConverter
+{
+    /** Attribute names that {@link #convertToOutput} does not copy from the object's attributes. */
+    private static final List<String> ID_AND_TYPE = Arrays.asList(ConfiguredObject.ID, ConfiguredObject.TYPE);
+    private final ManagementNode _managementNode;
+
+    ManagementOutputConverter(final ManagementNode managementNode)
+    {
+        _managementNode = managementNode;
+    }
+
+    /** Returns a new insertion-ordered map in which every key and value has been converted. */
+    private Map<Object, Object> convertMapToOutput(final Map<?, ?> attributes)
+    {
+        Map<Object,Object> result = new LinkedHashMap<>();
+        for(Map.Entry<?,?> entry : attributes.entrySet())
+        {
+            result.put(convertObjectToOutput(entry.getKey()), convertObjectToOutput(entry.getValue()));
+        }
+        return result;
+    }
+
+    /** Returns a new list holding the converted elements in the collection's iteration order. */
+    private Collection<?> convertCollectionToOutput(final Collection<?> value)
+    {
+        List<Object> result = new ArrayList<>(value.size());
+        for(Object entry : value)
+        {
+            result.add(convertObjectToOutput(entry));
+        }
+        return result;
+    }
+
+    /**
+     * Converts a single value for output.
+     * <p>
+     * {@code null}, strings, booleans, characters, the boxed numeric primitives and {@code byte[]}
+     * are returned unchanged; maps and collections are converted recursively; a
+     * {@link ConfiguredObject} is replaced by its name. Any other value is passed through the first
+     * matching {@link ConfiguredObjectCustomSerialization} converter, or failing that rendered
+     * with {@code toString()}.
+     */
+    private Object convertObjectToOutput(final Object value)
+    {
+        if(value == null)
+        {
+            return null;
+        }
+        else if(value instanceof String
+                || value instanceof Boolean
+                || value instanceof Integer
+                || value instanceof Long
+                || value instanceof Short
+                || value instanceof Byte
+                || value instanceof Character
+                || value instanceof Float
+                || value instanceof Double
+                || value instanceof byte[])
+        {
+            // Boolean and Short are passed through like the other scalars so that they keep their
+            // type instead of falling through to the toString() fallback below
+            return value;
+        }
+        else if(value instanceof Map)
+        {
+            return convertMapToOutput((Map<?,?>)value);
+        }
+        else if(value instanceof Collection)
+        {
+            return convertCollectionToOutput((Collection<?>)value);
+        }
+        else if(value instanceof ConfiguredObject)
+        {
+            return ((ConfiguredObject)value).getName();
+        }
+        else
+        {
+            for(ConfiguredObjectCustomSerialization.Converter converter : ConfiguredObjectCustomSerialization.getConverters())
+            {
+                if(converter.getConversionClass().isAssignableFrom(value.getClass()))
+                {
+                    return convertObjectToOutput(converter.convert(value));
+                }
+            }
+
+            return value.toString();
+        }
+    }
+
+    /**
+     * Builds the output map for a configured object.
+     *
+     * @param object  the object to describe
+     * @param actuals if {@code true} values are read from {@code getActualAttributes()}, otherwise
+     *                from {@code getAttribute(name)}
+     * @return the identity, path and type entries, then (for objects other than the managed object
+     *         whose category is not a synthetic child class) one entry per parent type other than
+     *         the managed object's category, then every non-null attribute except id and type;
+     *         all keys and values converted for output
+     */
+    protected Map<?, ?> convertToOutput(final ConfiguredObject<?> object,
+                                        final boolean actuals)
+    {
+        Map<String, Object> attributes = new LinkedHashMap<>();
+        attributes.put(ManagementNode.IDENTITY_ATTRIBUTE, object.getId());
+        attributes.put(ManagementNode.OBJECT_PATH, _managementNode.generatePath(object));
+        attributes.put(ManagementNode.TYPE_ATTRIBUTE, _managementNode.getAmqpName(object.getTypeClass()));
+        attributes.put(ManagementNode.QPID_TYPE, object.getType());
+
+        if(object != _managementNode.getManagedObject() && !_managementNode.isSyntheticChildClass(object.getCategoryClass()))
+        {
+            for (Class<? extends ConfiguredObject> parentType : object.getModel()
+                                                                      .getParentTypes(object.getCategoryClass()))
+            {
+                if (parentType != _managementNode.getManagedObject().getCategoryClass())
+                {
+                    // Locale.ROOT keeps the key independent of the JVM's default locale
+                    attributes.put(parentType.getSimpleName().toLowerCase(java.util.Locale.ROOT),
+                                   object.getParent(parentType));
+                }
+            }
+        }
+
+        // look the actual attributes up once rather than once per attribute name
+        final Map<String, ?> actualAttributes = actuals ? object.getActualAttributes() : null;
+        for(String name : object.getAttributeNames())
+        {
+            if(!ID_AND_TYPE.contains(name))
+            {
+                Object value = actuals
+                        ? actualAttributes.get(name)
+                        : object.getAttribute(name);
+                if (value != null)
+                {
+                    attributes.put(name, value);
+                }
+            }
+        }
+
+        return convertMapToOutput(attributes);
+    }
+}
Propchange: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementOutputConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java Sun Nov 20 16:42:57 2016
@@ -56,11 +56,11 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.ConfiguredObjectOperation;
import org.apache.qpid.server.model.Content;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.IntegrityViolationException;
-import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.OperationTimeoutException;
import org.apache.qpid.server.model.preferences.UserPreferences;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -110,6 +110,7 @@ public class RestServlet extends Abstrac
private final boolean _hierarchyInitializationRequired;
private volatile RequestInfoParser _requestInfoParser;
private RestUserPreferenceHandler _userPreferenceHandler;
+ private ConfiguredObjectFinder _objectFinder;
@SuppressWarnings("unused")
public RestServlet()
@@ -133,6 +134,7 @@ public class RestServlet extends Abstrac
{
doInitialization();
}
+ _objectFinder = new ConfiguredObjectFinder(getBroker());
_requestInfoParser = new RequestInfoParser(_hierarchy);
Handler.register();
Long preferenceOperationTimeout = getManagementConfiguration().getContextValue(Long.class, PREFERENCE_OPERTAION_TIMEOUT_CONTEXT_NAME);
@@ -181,116 +183,17 @@ public class RestServlet extends Abstrac
}
}
+
private Collection<ConfiguredObject<?>> getTargetObjects(RequestInfo requestInfo,
List<Predicate<ConfiguredObject<?>>> filterPredicateList)
{
List<String> names = requestInfo.getModelParts();
+ ConfiguredObject<?> root = getBroker();
+ Class<? extends ConfiguredObject>[] hierarchy = _hierarchy;
- Collection<ConfiguredObject<?>> parents = new ArrayList<>();
- parents.add(getBroker());
- Collection<ConfiguredObject<?>> children = new ArrayList<>();
-
- Map<Class<? extends ConfiguredObject>, String> filters =
- new HashMap<>();
-
- final Model model = getBroker().getModel();
- boolean wildcard = false;
- Class<? extends ConfiguredObject> parentType = Broker.class;
- for (int i = 0; i < _hierarchy.length; i++)
- {
- if (model.getChildTypes(parentType).contains(_hierarchy[i]))
- {
- parentType = _hierarchy[i];
- for (ConfiguredObject<?> parent : parents)
- {
- if (names.size() > i
- && names.get(i) != null
- && !names.get(i).equals("*")
- && names.get(i).trim().length() != 0)
- {
- for (ConfiguredObject<?> child : parent.getChildren(_hierarchy[i]))
- {
- if (child.getName().equals(names.get(i)))
- {
- children.add(child);
- }
- }
- if (children.isEmpty())
- {
- return null;
- }
- }
- else
- {
- wildcard = true;
- children.addAll((Collection<? extends ConfiguredObject<?>>) parent.getChildren(_hierarchy[i]));
- }
- }
- }
- else
- {
- children = parents;
- if (names.size() > i
- && names.get(i) != null
- && !names.get(i).equals("*")
- && names.get(i).trim().length() != 0)
- {
- filters.put(_hierarchy[i], names.get(i));
- }
- else
- {
- wildcard = true;
- }
- }
-
- parents = children;
- children = new ArrayList<>();
- }
-
- if (!filters.isEmpty() && !parents.isEmpty())
- {
- Collection<ConfiguredObject<?>> potentials = parents;
- parents = new ArrayList<>();
-
- for (ConfiguredObject o : potentials)
- {
-
- boolean match = true;
-
- for (Map.Entry<Class<? extends ConfiguredObject>, String> entry : filters.entrySet())
- {
- Collection<? extends ConfiguredObject> ancestors =
- getAncestors(getConfiguredClass(), entry.getKey(), o);
- match = false;
- for (ConfiguredObject ancestor : ancestors)
- {
- if (ancestor.getName().equals(entry.getValue()))
- {
- match = true;
- break;
- }
- }
- if (!match)
- {
- break;
- }
- }
- if (match)
- {
- parents.add(o);
- }
- }
- }
+ Collection<ConfiguredObject<?>> parents = _objectFinder.findObjectsFromPath(names, hierarchy, true);
- if (parents.isEmpty() && !wildcard)
- {
- return null;
- }
- else if (filterPredicateList.isEmpty())
- {
- return parents;
- }
- else
+ if (!(parents == null || filterPredicateList.isEmpty()))
{
Iterator<ConfiguredObject<?>> iter = parents.iterator();
while (iter.hasNext())
@@ -306,8 +209,8 @@ public class RestServlet extends Abstrac
}
}
- return parents;
}
+ return parents;
}
private List<Predicate<ConfiguredObject<?>>> buildFilterPredicates(final HttpServletRequest request)
@@ -334,36 +237,6 @@ public class RestServlet extends Abstrac
return Collections.unmodifiableList(predicates);
}
- private Collection<? extends ConfiguredObject> getAncestors(Class<? extends ConfiguredObject> childType,
- Class<? extends ConfiguredObject> ancestorType,
- ConfiguredObject child)
- {
- Collection<ConfiguredObject> ancestors = new HashSet<>();
- Collection<Class<? extends ConfiguredObject>> parentTypes = child.getModel().getParentTypes(childType);
-
- for(Class<? extends ConfiguredObject> parentClazz : parentTypes)
- {
- if(parentClazz == ancestorType)
- {
- ConfiguredObject parent = child.getParent(parentClazz);
- if(parent != null)
- {
- ancestors.add(parent);
- }
- }
- else
- {
- ConfiguredObject parent = child.getParent(parentClazz);
- if(parent != null)
- {
- ancestors.addAll(getAncestors(parentClazz, ancestorType, parent));
- }
- }
- }
-
- return ancestors;
- }
-
@Override
protected void doGetWithSubjectAndActor(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
@@ -585,7 +458,9 @@ public class RestServlet extends Abstrac
Class<? extends ConfiguredObject> objClass = getConfiguredClass();
if (_hierarchy.length > 1)
{
- List<ConfiguredObject> parents = findAllObjectParents(names);
+
+ List<ConfiguredObject> parents =
+ _objectFinder.findObjectParentsFromPath(names, _hierarchy, getConfiguredClass());
theParent = parents.remove(0);
otherParents = parents.toArray(new ConfiguredObject[parents.size()]);
}
@@ -853,7 +728,9 @@ public class RestServlet extends Abstrac
ConfiguredObject[] otherParents = null;
if (_hierarchy.length > 1)
{
- List<ConfiguredObject> parents = findAllObjectParents(names);
+
+ List<ConfiguredObject> parents =
+ _objectFinder.findObjectParentsFromPath(names, _hierarchy, getConfiguredClass());
theParent = parents.remove(0);
otherParents = parents.toArray(new ConfiguredObject[parents.size()]);
}
@@ -936,68 +813,6 @@ public class RestServlet extends Abstrac
return providedObject;
}
- private List<ConfiguredObject> findAllObjectParents(List<String> names)
- {
- Collection<ConfiguredObject>[] objects = new Collection[_hierarchy.length];
- for (int i = 0; i < _hierarchy.length - 1; i++)
- {
- objects[i] = new HashSet<>();
- if (i == 0)
- {
- for (ConfiguredObject object : getBroker().getChildren(_hierarchy[0]))
- {
- if (object.getName().equals(names.get(0)))
- {
- objects[0].add(object);
- break;
- }
- }
- }
- else
- {
- for (int j = i - 1; j >= 0; j--)
- {
- if (getBroker().getModel().getChildTypes(_hierarchy[j]).contains(_hierarchy[i]))
- {
- for (ConfiguredObject<?> parent : objects[j])
- {
- for (ConfiguredObject<?> object : parent.getChildren(_hierarchy[i]))
- {
- if (object.getName().equals(names.get(i)))
- {
- objects[i].add(object);
- }
- }
- }
- break;
- }
- }
- }
-
- }
- List<ConfiguredObject> parents = new ArrayList<>();
- Class<? extends ConfiguredObject> objClass = getConfiguredClass();
- Collection<Class<? extends ConfiguredObject>> parentClasses =
- getBroker().getModel().getParentTypes(objClass);
- for (int i = _hierarchy.length - 2; i >= 0; i--)
- {
- if (parentClasses.contains(_hierarchy[i]))
- {
- if (objects[i].size() == 1)
- {
- parents.add(objects[i].iterator().next());
- }
- else
- {
- throw new IllegalArgumentException("Cannot deduce parent of class "
- + _hierarchy[i].getSimpleName());
- }
- }
-
- }
- return parents;
- }
-
private Map<String, Object> getRequestProvidedObject(HttpServletRequest request, final RequestInfo requestInfo)
throws IOException, ServletException
{
Modified: qpid/java/trunk/broker/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker/pom.xml?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/broker/pom.xml (original)
+++ qpid/java/trunk/broker/pom.xml Sun Nov 20 16:42:57 2016
@@ -142,6 +142,13 @@
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-memory-store</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Nov 20 16:42:57 2016
@@ -2041,4 +2041,9 @@ public class AMQConnection extends Close
}
return false;
}
+
+ boolean isVirtualHostPropertiesSupported()
+ {
+ return getDelegate().isVirtualHostPropertiesSupported();
+ }
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java Sun Nov 20 16:42:57 2016
@@ -33,7 +33,6 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import org.apache.qpid.jndi.ObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +42,7 @@ import org.apache.qpid.client.messaging.
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jndi.ObjectFactory;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -429,15 +429,22 @@ public abstract class AMQDestination imp
public AMQShortString getEncodedName()
{
- if(_urlAsShortString == null)
+ if(getDestSyntax() == DestSyntax.BURL)
{
- if (_url == null)
+ if (_urlAsShortString == null)
{
- toURL();
+ if (_url == null)
+ {
+ toURL();
+ }
+ _urlAsShortString = new AMQShortString(_url);
}
- _urlAsShortString = new AMQShortString(_url);
+ return _urlAsShortString;
+ }
+ else
+ {
+ return AMQShortString.valueOf(getName());
}
- return _urlAsShortString;
}
public boolean isDurable()
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Nov 20 16:42:57 2016
@@ -23,7 +23,16 @@ package org.apache.qpid.client;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.text.MessageFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
@@ -41,17 +50,16 @@ import java.util.concurrent.locks.Reentr
import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -70,6 +78,7 @@ import org.apache.qpid.client.message.JM
import org.apache.qpid.client.message.MessageEncryptionHelper;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.client.util.JMSExceptionHelper;
@@ -206,7 +215,7 @@ public abstract class AMQSession<C exten
*/
private int _nextTag = 1;
- private final Map<Integer,C> _consumers = new ConcurrentHashMap<>();
+ private final Map<String,C> _consumers = new ConcurrentHashMap<>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -307,7 +316,7 @@ public abstract class AMQSession<C exten
*/
protected Collection<C> getConsumers()
{
- return new ArrayList(_consumers.values());
+ return new ArrayList<>(_consumers.values());
}
protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
@@ -924,7 +933,7 @@ public abstract class AMQSession<C exten
- public void confirmConsumerCancelled(int consumerTag)
+ public void confirmConsumerCancelled(String consumerTag)
{
C consumer = _consumers.get(consumerTag);
if (consumer != null)
@@ -2691,11 +2700,20 @@ public abstract class AMQSession<C exten
*/
private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws QpidException, FailoverException
{
- int tagId = _nextTag++;
+ Link link = consumer.getDestination().getLink();
+ String linkName;
+ if(link != null && link.getName() != null && consumer.getDestination().getAddressType() == AMQDestination.QUEUE_TYPE)
+ {
+ linkName = link.getName();
+ }
+ else
+ {
+ linkName = String.valueOf(_nextTag++);
+ }
- consumer.setConsumerTag(tagId);
+ consumer.setConsumerTag(linkName);
// we must register the consumer in the map before we actually start listening
- _consumers.put(tagId, consumer);
+ _consumers.put(linkName, consumer);
synchronized (consumer.getDestination())
{
@@ -2706,12 +2724,12 @@ public abstract class AMQSession<C exten
try
{
- sendConsume(consumer, queueName, nowait, tagId);
+ sendConsume(consumer, queueName, nowait);
}
catch (QpidException e)
{
// clean-up the map in the event of an error
- _consumers.remove(tagId);
+ _consumers.remove(linkName);
throw e;
}
}
@@ -2765,7 +2783,7 @@ public abstract class AMQSession<C exten
throws QpidException;
public abstract void sendConsume(C consumer, String queueName,
- boolean nowait, int tag) throws QpidException, FailoverException;
+ boolean nowait) throws QpidException, FailoverException;
private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
@@ -3152,7 +3170,7 @@ public abstract class AMQSession<C exten
_producers.put(producerId, producer);
}
- private void rejectMessagesForConsumerTag(int consumerTag)
+ private void rejectMessagesForConsumerTag(String consumerTag)
{
Iterator<Dispatchable> messages = _queue.iterator();
if (_logger.isDebugEnabled())
@@ -3172,7 +3190,7 @@ public abstract class AMQSession<C exten
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if (message.getConsumerTag() == consumerTag)
+ if (message.getConsumerTag().equals(consumerTag))
{
if (_queue.remove(message))
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Nov 20 16:42:57 2016
@@ -611,7 +611,7 @@ public class AMQSession_0_10 extends AMQ
* Registers the consumer with the broker
*/
public void sendConsume(BasicMessageConsumer_0_10 consumer, String queueName,
- boolean nowait, int tag)
+ boolean nowait)
throws QpidException, FailoverException
{
queueName = preprocessAddressTopic(consumer, queueName);
@@ -631,13 +631,13 @@ public class AMQSession_0_10 extends AMQ
boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
String queue = queueName == null ? destination.getAddressName() : queueName;
+ String consumerTag = consumer.getConsumerTag();
getQpidSession().messageSubscribe
- (queue, String.valueOf(tag),
+ (queue, consumerTag,
acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- String consumerTag = (consumer).getConsumerTagString();
if (capacity == 0)
{
@@ -814,7 +814,7 @@ public class AMQSession_0_10 extends AMQ
{
for (BasicMessageConsumer consumer : getConsumers())
{
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ getQpidSession().messageStop(consumer.getConsumerTag(),
Option.UNRELIABLE);
}
sync();
@@ -823,7 +823,7 @@ public class AMQSession_0_10 extends AMQ
{
for (BasicMessageConsumer_0_10 consumer : getConsumers())
{
- String consumerTag = String.valueOf(consumer.getConsumerTag());
+ String consumerTag = consumer.getConsumerTag();
//only set if msg list is null
try
{
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sun Nov 20 16:42:57 2016
@@ -502,8 +502,7 @@ public class AMQSession_0_8 extends AMQS
@Override
public void sendConsume(BasicMessageConsumer_0_8 consumer,
String queueName,
- boolean nowait,
- int tag) throws QpidException, FailoverException
+ boolean nowait) throws QpidException, FailoverException
{
queueName = preprocessAddressTopic(consumer, queueName);
@@ -519,7 +518,7 @@ public class AMQSession_0_8 extends AMQS
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
- String.valueOf(tag),
+ consumer.getConsumerTag(),
consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
@@ -997,7 +996,14 @@ public class AMQSession_0_8 extends AMQS
public void sync() throws QpidException
{
- declareExchange("amq.direct", "direct", false);
+ if(getAMQConnection().isVirtualHostPropertiesSupported())
+ {
+ isBound(null, "$virtualhostProperties", null);
+ }
+ else
+ {
+ declareExchange("amq.direct", "direct", false);
+ }
}
@Override
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sun Nov 20 16:42:57 2016
@@ -76,7 +76,7 @@ public abstract class BasicMessageConsum
*/
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
- private int _consumerTag;
+ private String _consumerTag;
private final int _channelId;
@@ -855,12 +855,12 @@ public abstract class BasicMessageConsum
}
/** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- public int getConsumerTag()
+ public String getConsumerTag()
{
return _consumerTag;
}
- public void setConsumerTag(int consumerTag)
+ public void setConsumerTag(String consumerTag)
{
_consumerTag = consumerTag;
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sun Nov 20 16:42:57 2016
@@ -74,8 +74,7 @@ public class BasicMessageConsumer_0_10 e
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
- private String _consumerTagString;
-
+
private final long _capacity;
/** Flag indicating if the server supports message selectors */
@@ -110,17 +109,6 @@ public class BasicMessageConsumer_0_10 e
}
}
- @Override public void setConsumerTag(int consumerTag)
- {
- super.setConsumerTag(consumerTag);
- _consumerTagString = String.valueOf(consumerTag);
- }
-
- public String getConsumerTagString()
- {
- return _consumerTagString;
- }
-
/**
*
* This is invoked by the session thread when emptying the session message queue.
@@ -165,7 +153,7 @@ public class BasicMessageConsumer_0_10 e
*/
@Override void sendCancel() throws QpidException
{
- _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ _0_10session.getQpidSession().messageCancel(getConsumerTag());
postSubscription();
try
{
@@ -337,7 +325,7 @@ public class BasicMessageConsumer_0_10 e
private void messageFlow()
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 1,
Option.UNRELIABLE);
}
@@ -398,17 +386,17 @@ public class BasicMessageConsumer_0_10 e
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
{
-
+
_0_10session.getQpidSession().messageFlush
- (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+ (getConsumerTag(), Option.UNRELIABLE, Option.SYNC);
_0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.BYTE,
+ (getConsumerTag(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
if (_capacity > 0)
{
_0_10session.getQpidSession().messageFlow
- (getConsumerTagString(),
+ (getConsumerTag(),
MessageCreditUnit.MESSAGE,
_capacity,
Option.UNRELIABLE);
@@ -559,7 +547,7 @@ public class BasicMessageConsumer_0_10 e
if (capacity == 0 && getMessageListener() == null)
{
- session.getQpidSession().messageFlow(getConsumerTagString(),
+ session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 1,
Option.UNRELIABLE);
@@ -571,7 +559,7 @@ public class BasicMessageConsumer_0_10 e
if (message == null && capacity == 0 && getMessageListener() == null)
{
- session.getQpidSession().messageFlow(getConsumerTagString(),
+ session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 0,
Option.UNRELIABLE);
session.sync();
@@ -596,7 +584,7 @@ public class BasicMessageConsumer_0_10 e
if (capacity == 0 && getMessageListener() == null)
{
- session.getQpidSession().messageFlow(getConsumerTagString(),
+ session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 1,
Option.UNRELIABLE);
@@ -605,7 +593,7 @@ public class BasicMessageConsumer_0_10 e
Message message = super.receiveNoWait();
if (message == null && capacity == 0 && getMessageListener() == null)
{
- session.getQpidSession().messageFlow(getConsumerTagString(),
+ session.getQpidSession().messageFlow(getConsumerTag(),
MessageCreditUnit.MESSAGE, 0,
Option.UNRELIABLE);
session.sync();
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Sun Nov 20 16:42:57 2016
@@ -117,7 +117,7 @@ public class BasicMessageConsumer_0_8 ex
void sendCancel() throws QpidException, FailoverException
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(getConsumerTag()), false);
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Sun Nov 20 16:42:57 2016
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.handler;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,12 +30,15 @@ import org.apache.qpid.QpidException;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
public class BasicDeliverMethodHandler implements StateAwareMethodListener<BasicDeliverBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicDeliverMethodHandler.class);
+ private static final ConcurrentMap<AMQShortString,String> CONSUMER_TAG_MAP = new ConcurrentHashMap<>();
+
private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler();
public static BasicDeliverMethodHandler getInstance()
@@ -45,7 +51,7 @@ public class BasicDeliverMethodHandler i
{
final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
body.getDeliveryTag(),
- body.getConsumerTag().toIntValue(),
+ getTagAsStringTag(body),
body.getExchange(),
body.getRoutingKey(),
body.getRedelivered());
@@ -55,4 +61,16 @@ public class BasicDeliverMethodHandler i
}
session.unprocessedMessageReceived(channelId, msg);
}
+
+    /**
+     * Returns the String form of the consumer tag carried by a deliver body, caching the
+     * conversion in {@code CONSUMER_TAG_MAP} so that later deliveries with an equal tag reuse
+     * the same String instead of calling {@code toString()} again.
+     *
+     * NOTE(review): the map is static and nothing in this class removes entries, so it grows
+     * with the number of distinct consumer tags seen - confirm this is acceptable for
+     * long-lived clients that create many consumers.
+     *
+     * @param body the deliver body whose consumer tag is required
+     * @return the cached String form of the consumer tag
+     */
+    private static String getTagAsStringTag(final BasicDeliverBody body)
+    {
+        final AMQShortString consumerTag = body.getConsumerTag();
+        String tag = CONSUMER_TAG_MAP.get(consumerTag);
+        if (tag == null)
+        {
+            final String converted = consumerTag.toString();
+            // putIfAbsent returns the value stored by a racing thread, if there was one; prefer
+            // that value so every caller is handed the single instance held in the map.
+            final String existing = CONSUMER_TAG_MAP.putIfAbsent(consumerTag, converted);
+            tag = (existing != null) ? existing : converted;
+        }
+        return tag;
+    }
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Sun Nov 20 16:42:57 2016
@@ -30,7 +30,7 @@ public class ReturnMessage extends Unpro
public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
{
- super(-1,0,exchange,routingKey,false);
+ super(-1,"",exchange,routingKey,false);
_replyText = replyText;
_replyCode = replyCode;
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Sun Nov 20 16:42:57 2016
@@ -32,10 +32,10 @@ import org.apache.qpid.client.AMQSession
*/
public abstract class UnprocessedMessage implements AMQSession.Dispatchable
{
- private final int _consumerTag;
+ private final String _consumerTag;
- public UnprocessedMessage(int consumerTag)
+ public UnprocessedMessage(String consumerTag)
{
_consumerTag = consumerTag;
}
@@ -44,7 +44,7 @@ public abstract class UnprocessedMessage
abstract public long getDeliveryTag();
- public int getConsumerTag()
+ public String getConsumerTag()
{
return _consumerTag;
}
@@ -54,4 +54,4 @@ public abstract class UnprocessedMessage
ssn.dispatch(this);
}
-}
\ No newline at end of file
+}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Sun Nov 20 16:42:57 2016
@@ -35,7 +35,7 @@ public class UnprocessedMessage_0_10 ext
public UnprocessedMessage_0_10(MessageTransfer xfr)
{
- super(Integer.parseInt(xfr.getDestination()));
+ super(xfr.getDestination());
_transfer = xfr;
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Sun Nov 20 16:42:57 2016
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -32,6 +27,11 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
* the content body/ies.
@@ -56,7 +56,7 @@ public class UnprocessedMessage_0_8 exte
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+ public UnprocessedMessage_0_8(long deliveryId, String consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
{
super(consumerTag);
_exchange = exchange;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sun Nov 20 16:42:57 2016
@@ -399,7 +399,7 @@ public class AMQProtocolSession implemen
{
final AMQSession session = getSession(channelId);
- session.confirmConsumerCancelled(consumerTag.toIntValue());
+ session.confirmConsumerCancelled(consumerTag.toString());
}
public void setProtocolVersion(final ProtocolVersion pv)
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Sun Nov 20 16:42:57 2016
@@ -287,7 +287,7 @@ public class AMQSession_0_10Test extends
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
null, null, false, true);
- session.sendConsume(consumer, "test", true, 1);
+ session.sendConsume(consumer, "test", true);
}
catch (Exception e)
{
@@ -390,6 +390,7 @@ public class AMQSession_0_10Test extends
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
null, null, false, true);
+ consumer.setConsumerTag("");
consumer.close();
}
catch (Exception e)
@@ -480,7 +481,7 @@ public class AMQSession_0_10Test extends
UnprocessedMessage[] messages = new UnprocessedMessage[4];
for (int i =0; i< messages.length;i++ )
{
- int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+ String consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
int deliveryTag = i + 1;
messages[i]= createMockMessage(deliveryTag, consumerTag);
session.messageReceived(messages[i]);
@@ -515,7 +516,7 @@ public class AMQSession_0_10Test extends
UnprocessedMessage[] messages = new UnprocessedMessage[4];
for (int i =0; i< messages.length;i++ )
{
- int consumerTag = i % 2;
+ String consumerTag = String.valueOf(i % 2);
int deliveryTag = i + 1;
messages[i]= createMockMessage(deliveryTag, consumerTag);
session.messageReceived(messages[i]);
@@ -540,7 +541,7 @@ public class AMQSession_0_10Test extends
}
}
- private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+ private UnprocessedMessage createMockMessage(long deliveryTag, String consumerTag)
{
UnprocessedMessage message = mock(UnprocessedMessage.class);
when(message.getConsumerTag()).thenReturn(consumerTag);
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java Sun Nov 20 16:42:57 2016
@@ -116,7 +116,7 @@ public class AMQSession_0_8Test extends
UnprocessedMessage[] messages = new UnprocessedMessage[4];
for (int i =0; i< messages.length;i++ )
{
- int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+ String consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
int deliveryTag = i + 1;
messages[i]= createMockMessage(deliveryTag, consumerTag);
session.messageReceived(messages[i]);
@@ -153,7 +153,7 @@ public class AMQSession_0_8Test extends
assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
}
- private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+ private UnprocessedMessage createMockMessage(long deliveryTag, String consumerTag)
{
UnprocessedMessage message = mock(UnprocessedMessage.class);
when(message.getConsumerTag()).thenReturn(consumerTag);
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Sun Nov 20 16:42:57 2016
@@ -281,6 +281,19 @@ public class QpidBrokerTestCase extends
return getConnection(curl);
}
+    /**
+     * Opens a connection to the named virtual host using the guest credentials.
+     *
+     * The URL is taken from the default connection factory, the virtual host is replaced, and
+     * the result is re-parsed from its string form before the credentials are applied.
+     *
+     * @param vhost the virtual host to place in the connection URL
+     * @return the connection obtained from {@code getConnection(ConnectionURL)}
+     */
+    public Connection getConnectionForVHost(String vhost)
+            throws URLSyntaxException, NamingException, JMSException
+    {
+        final ConnectionURL template = new AMQConnectionURL(getConnectionFactory().getConnectionURLString());
+        template.setVirtualHost(vhost);
+
+        // Round-trip through the string representation before setting the credentials.
+        final ConnectionURL vhostUrl = new AMQConnectionURL(template.toString());
+        vhostUrl.setUsername(GUEST_USERNAME);
+        vhostUrl.setPassword(GUEST_PASSWORD);
+
+        return getConnection(vhostUrl);
+    }
+
+
public Connection getConnection(ConnectionURL url) throws JMSException
{
_logger.debug("get connection for " + url.getURL());
Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java?rev=1770576&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java Sun Nov 20 16:42:57 2016
@@ -0,0 +1,639 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.management.amqp;
+
+import static org.apache.qpid.server.model.Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.queue.PriorityQueue;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class AmqpManagementTest extends QpidBrokerTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+ private Queue _replyAddress;
+ private Queue _replyConsumer;
+ private MessageConsumer _consumer;
+ private MessageProducer _producer;
+
+    /**
+     * Starts the current connection and creates the session, the request and reply
+     * destinations, the reply consumer and the request producer used by the tests.
+     */
+    private void setupSession() throws Exception
+    {
+        final String managementAddress = "ADDR:$management";
+        final String replyToAddress = "ADDR:!response";
+        final String replyLinkAddress =
+                "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}";
+
+        _connection.start();
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _queue = _session.createQueue(managementAddress);
+        _replyAddress = _session.createQueue(replyToAddress);
+        _replyConsumer = _session.createQueue(replyLinkAddress);
+
+        _consumer = _session.createConsumer(_replyConsumer);
+        _producer = _session.createProducer(_queue);
+    }
+
+    /**
+     * Connects as the guest user through the connection factory named "management" and then
+     * prepares the session objects via {@code setupSession()}.
+     */
+    private void setupBrokerManagementConnection() throws Exception
+    {
+        _connection = getConnectionFactory("management").createConnection(GUEST_USERNAME, GUEST_PASSWORD);
+        setupSession();
+    }
+
+    /**
+     * Uses the test case's default connection and then prepares the session objects via
+     * {@code setupSession()}.
+     */
+    private void setupVirtualHostManagementConnection() throws Exception
+    {
+        final Connection defaultConnection = getConnection();
+        _connection = defaultConnection;
+        setupSession();
+    }
+
+    /**
+     * Sends a GET-TYPES request over the connection created by
+     * {@code setupBrokerManagementConnection()} and checks that the reply is a successful
+     * MapMessage containing both the org.amqp.management and org.apache.qpid.Port entries.
+     */
+    public void testGetTypesOnBrokerManagement() throws Exception
+    {
+        setupBrokerManagementConnection();
+
+        final Message request = _session.createBytesMessage();
+        request.setStringProperty("identity", "self");
+        request.setStringProperty("type", "org.amqp.management");
+        request.setStringProperty("operation", "GET-TYPES");
+        request.setJMSReplyTo(_replyAddress);
+        _producer.send(request);
+
+        final Message response = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", response);
+        // No correlation id was set on the request, so the reply is matched on the message id.
+        assertEquals("The correlation id does not match the sent message's messageId",
+                     request.getJMSMessageID(),
+                     response.getJMSCorrelationID());
+
+        final boolean hasStatusCode = Collections.list(response.getPropertyNames()).contains("statusCode");
+        assertTrue("The response message does not have a status code", hasStatusCode);
+        assertEquals("The response code did not indicate success", 200, response.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", response instanceof MapMessage);
+
+        final MapMessage types = (MapMessage) response;
+        assertNotNull("The response did not include the org.amqp.Management type",
+                      types.getObject("org.amqp.management"));
+        assertNotNull("The response did not include the org.apache.qpid.Port type",
+                      types.getObject("org.apache.qpid.Port"));
+    }
+
+    /**
+     * Sends a GET-TYPES request with an explicit binary correlation id over the connection
+     * created by {@code setupVirtualHostManagementConnection()} and checks that the reply
+     * echoes the correlation id, reports success, contains the org.amqp.management entry and
+     * does not contain the org.apache.qpid.Port entry.
+     */
+    public void testGetTypesOnVhostManagement() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+
+        final byte[] sentCorrelationId = "some correlation id".getBytes();
+
+        final Message request = _session.createBytesMessage();
+        request.setStringProperty("identity", "self");
+        request.setStringProperty("type", "org.amqp.management");
+        request.setStringProperty("operation", "GET-TYPES");
+        request.setJMSCorrelationIDAsBytes(sentCorrelationId);
+        request.setJMSReplyTo(_replyAddress);
+        _producer.send(request);
+
+        final Message response = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", response);
+
+        final boolean correlationIdEchoed =
+                Arrays.equals(sentCorrelationId, response.getJMSCorrelationIDAsBytes());
+        assertTrue("The correlation id does not match the sent message's correlationId", correlationIdEchoed);
+
+        final boolean hasStatusCode = Collections.list(response.getPropertyNames()).contains("statusCode");
+        assertTrue("The response message does not have a status code", hasStatusCode);
+        assertEquals("The response code did not indicate success", 200, response.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", response instanceof MapMessage);
+
+        final MapMessage types = (MapMessage) response;
+        assertNotNull("The response did not include the org.amqp.Management type",
+                      types.getObject("org.amqp.management"));
+        assertNull("The response included the org.apache.qpid.Port type",
+                   types.getObject("org.apache.qpid.Port"));
+    }
+
+    /**
+     * Exercises CREATE, UPDATE, DELETE and READ of a queue over the connection created by
+     * {@code setupBrokerManagementConnection()}: the queue is created with an alert threshold
+     * of 100, updated to 250, deleted by object-path, and a final READ by identity is expected
+     * to report 404.
+     */
+    public void testCreateQueueOnBrokerManagement() throws Exception
+    {
+        setupBrokerManagementConnection();
+
+        final String queueName = getTestName();
+        final String path = "test/test/" + queueName;
+
+        // CREATE
+        final MapMessage createRequest = _session.createMapMessage();
+        createRequest.setStringProperty("type", "org.apache.qpid.Queue");
+        createRequest.setStringProperty("operation", "CREATE");
+        createRequest.setString("name", queueName);
+        createRequest.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 100L);
+        createRequest.setString("object-path", path);
+        createRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(createRequest);
+
+        final Message createResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", createResponse);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(createResponse.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, createResponse.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", createResponse instanceof MapMessage);
+
+        final MapMessage created = (MapMessage) createResponse;
+        assertEquals("The created queue was not a standard queue", "org.apache.qpid.StandardQueue", created.getString("type"));
+        assertEquals("The created queue was not a standard queue", "standard", created.getString("qpid-type"));
+        assertEquals("the created queue did not have the correct alerting threshold",
+                     100L,
+                     created.getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+        final Object identity = created.getObject("identity");
+
+        // UPDATE, addressed by the identity returned from CREATE
+        final MapMessage updateRequest = _session.createMapMessage();
+        updateRequest.setStringProperty("type", "org.apache.qpid.Queue");
+        updateRequest.setStringProperty("operation", "UPDATE");
+        updateRequest.setObjectProperty("identity", identity);
+        updateRequest.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
+        updateRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(updateRequest);
+
+        final Message updateResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", updateResponse);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(updateResponse.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, updateResponse.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", updateResponse instanceof MapMessage);
+        assertEquals("the created queue did not have the correct alerting threshold",
+                     250L,
+                     ((MapMessage) updateResponse).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+
+        // DELETE, addressed by object-path
+        final MapMessage deleteRequest = _session.createMapMessage();
+        deleteRequest.setStringProperty("type", "org.apache.qpid.Queue");
+        deleteRequest.setStringProperty("operation", "DELETE");
+        deleteRequest.setObjectProperty("index", "object-path");
+        deleteRequest.setObjectProperty("key", path);
+        deleteRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(deleteRequest);
+
+        final Message deleteResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", deleteResponse);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(deleteResponse.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 204, deleteResponse.getIntProperty("statusCode"));
+
+        // READ after the delete is expected to report not found
+        final MapMessage readRequest = _session.createMapMessage();
+        readRequest.setStringProperty("type", "org.apache.qpid.Queue");
+        readRequest.setStringProperty("operation", "READ");
+        readRequest.setObjectProperty("identity", identity);
+        readRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(readRequest);
+
+        final Message readResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", readResponse);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(readResponse.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate not found", 404, readResponse.getIntProperty("statusCode"));
+    }
+    /**
+     * Exercises queue management over the connection created by
+     * {@code setupVirtualHostManagementConnection()}. The steps are: CREATE a queue with 13
+     * priorities, CREATE again with the same name (expecting 409), READ by identity, UPDATE the
+     * alert threshold to 250, DELETE by object-path, and DELETE once more (expecting 404).
+     */
+    public void testCreateQueueOnVhostManagement() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+
+        MapMessage message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        message.setInt(PriorityQueue.PRIORITIES, 13);
+        String path = getTestName();
+        message.setString("object-path", path);
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        Message responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 201, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The created queue was not a priority queue", "org.apache.qpid.PriorityQueue", ((MapMessage)responseMessage).getString("type"));
+        // The failure message previously said "standard queue", although the value checked is "priority".
+        assertEquals("The created queue was not a priority queue", "priority", ((MapMessage)responseMessage).getString("qpid-type"));
+        assertEquals("the created queue did not have the correct number of priorities", 13, ((MapMessage)responseMessage).getInt(PriorityQueue.PRIORITIES));
+        Object identity = ((MapMessage) responseMessage).getObject("identity");
+
+        // Trying to create a second queue with the same name should cause a conflict
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "CREATE");
+        message.setString("name", getTestName());
+        message.setInt(PriorityQueue.PRIORITIES, 7);
+        message.setString("object-path", getTestName());
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate conflict", 409, responseMessage.getIntProperty("statusCode"));
+
+        // Use a fresh request for READ, as every other step does, so that the body entries of
+        // the conflicting CREATE (name, priorities, object-path) are not sent along with it.
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "READ");
+        message.setObjectProperty("identity", identity);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        // Fail with a clear message rather than a ClassCastException if the reply has another type.
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("the queue did not have the correct number of priorities", 13, ((MapMessage)responseMessage).getInt(PriorityQueue.PRIORITIES));
+        assertEquals("the queue did not have the expected path", getTestName(), ((MapMessage)responseMessage).getString("object-path"));
+
+        message = _session.createMapMessage();
+
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "UPDATE");
+        message.setObjectProperty("identity", identity);
+        message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 200, responseMessage.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+        assertEquals("The updated queue did not have the correct alerting threshold", 250L, ((MapMessage)responseMessage).getLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
+
+        message = _session.createMapMessage();
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "DELETE");
+        message.setObjectProperty("index", "object-path");
+        message.setObjectProperty("key", path);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate success", 204, responseMessage.getIntProperty("statusCode"));
+
+        // Deleting the same path a second time is expected to report not found
+        message = _session.createMapMessage();
+        message.setStringProperty("type", "org.apache.qpid.Queue");
+        message.setStringProperty("operation", "DELETE");
+        message.setObjectProperty("index", "object-path");
+        message.setObjectProperty("key", path);
+
+        message.setJMSReplyTo(_replyAddress);
+        _producer.send(message);
+
+        responseMessage = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", responseMessage);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+        assertEquals("The response code did not indicate not found", 404, responseMessage.getIntProperty("statusCode"));
+    }
+
+    /**
+     * Sends a READ for type org.apache.qpid.VirtualHost, addressed by an empty object-path
+     * key, over the connection created by {@code setupVirtualHostManagementConnection()} and
+     * checks that the reply is a successful MapMessage whose name is "test".
+     */
+    public void testReadVirtualHost() throws Exception
+    {
+        setupVirtualHostManagementConnection();
+
+        final MapMessage readRequest = _session.createMapMessage();
+        readRequest.setStringProperty("type", "org.apache.qpid.VirtualHost");
+        readRequest.setStringProperty("operation", "READ");
+        readRequest.setStringProperty("index", "object-path");
+        readRequest.setStringProperty("key", "");
+        readRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(readRequest);
+
+        final Message readResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", readResponse);
+
+        final boolean hasStatusCode = Collections.list(readResponse.getPropertyNames()).contains("statusCode");
+        assertTrue("The response message does not have a status code", hasStatusCode);
+        assertEquals("Incorrect response code", 200, readResponse.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", readResponse instanceof MapMessage);
+
+        final MapMessage virtualHost = (MapMessage) readResponse;
+        assertEquals("The name of the virtual host is not as expected", "test", virtualHost.getString("name"));
+    }
+
+    /**
+     * Creates a JsonVirtualHostNode with a Memory virtual host over the connection created by
+     * {@code setupBrokerManagementConnection()}, then reconnects to the new virtual host and
+     * reads it back, checking its name and qpid-type.
+     */
+    public void testCreateVirtualHost() throws Exception
+    {
+        setupBrokerManagementConnection();
+
+        final String virtualHostName = "newMemoryVirtualHost";
+
+        final MapMessage createRequest = _session.createMapMessage();
+        createRequest.setStringProperty("type", "org.apache.qpid.JsonVirtualHostNode");
+        createRequest.setStringProperty("operation", "CREATE");
+        createRequest.setString("name", virtualHostName);
+        createRequest.setString(VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, "{ \"type\" : \"Memory\" }");
+        createRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(createRequest);
+
+        final Message createResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", createResponse);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(createResponse.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 201, createResponse.getIntProperty("statusCode"));
+
+        // Drop the current connection and continue on one opened against the new virtual host.
+        _connection.close();
+        _connection = getConnectionForVHost("/"+virtualHostName);
+        setupSession();
+
+        final MapMessage readRequest = _session.createMapMessage();
+        readRequest.setStringProperty("type", "org.apache.qpid.VirtualHost");
+        readRequest.setStringProperty("operation", "READ");
+        readRequest.setStringProperty("index", "object-path");
+        readRequest.setStringProperty("key", "");
+        readRequest.setJMSReplyTo(_replyAddress);
+        _producer.send(readRequest);
+
+        final Message readResponse = _consumer.receive(getReceiveTimeout());
+        assertNotNull("A response message was not sent", readResponse);
+        assertTrue("The response message does not have a status code",
+                   Collections.list(readResponse.getPropertyNames()).contains("statusCode"));
+        assertEquals("Incorrect response code", 200, readResponse.getIntProperty("statusCode"));
+        assertTrue("The response was not a MapMessage", readResponse instanceof MapMessage);
+
+        final MapMessage virtualHost = (MapMessage) readResponse;
+        assertEquals("The name of the virtual host is not as expected", virtualHostName, virtualHost.getString("name"));
+        assertEquals("The type of the virtual host is not as expected", "Memory", virtualHost.getString("qpid-type"));
+    }
+ // attempt to delete the virtual host via the virtual host
+ public void testDeleteVirtualHost() throws Exception
+ {
+ setupVirtualHostManagementConnection();
+ MapMessage message = _session.createMapMessage();
+
+ message.setStringProperty("type", "org.apache.qpid.VirtualHost");
+ message.setStringProperty("operation", "DELETE");
+ message.setStringProperty("index", "object-path");
+ message.setStringProperty("key", "");
+ message.setJMSReplyTo(_replyAddress);
+ _producer.send(message);
+
+ Message responseMessage = _consumer.receive(getReceiveTimeout());
+ assertNotNull("A response message was not sent", responseMessage);
+ assertTrue("The response message does not have a status code",
+ Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+ assertEquals("Incorrect response code", 501, responseMessage.getIntProperty("statusCode"));
+ }
+
+ // create a queue with the qpid type
+ public void testCreateQueueWithQpidType() throws Exception
+ {
+ setupVirtualHostManagementConnection();
+ MapMessage message = _session.createMapMessage();
+
+ message.setStringProperty("type", "org.apache.qpid.Queue");
+ message.setStringProperty("operation", "CREATE");
+ message.setString("name", getTestName());
+ message.setString("qpid-type", "lvq");
+ String path = getTestName();
+ message.setString("object-path", path);
+ message.setJMSReplyTo(_replyAddress);
+ _producer.send(message);
+
+ Message responseMessage = _consumer.receive(getReceiveTimeout());
+ assertNotNull("A response message was not sent", responseMessage);
+ assertTrue("The response message does not have a status code",
+ Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+ assertEquals("Incorrect response code", 201, responseMessage.getIntProperty("statusCode"));
+ assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+ assertEquals("The created queue did not have the correct type", "org.apache.qpid.LastValueQueue", ((MapMessage)responseMessage).getString("type"));
+ }
+
+ // create a queue using the AMQP type
+ public void testCreateQueueWithAmqpType() throws Exception
+ {
+ setupVirtualHostManagementConnection();
+ MapMessage message = _session.createMapMessage();
+
+ message.setStringProperty("type", "org.apache.qpid.SortedQueue");
+ message.setStringProperty("operation", "CREATE");
+ message.setString("name", getTestName());
+ String path = getTestName();
+ message.setString("object-path", path);
+ message.setString("sortKey", "foo");
+ message.setJMSReplyTo(_replyAddress);
+ _producer.send(message);
+
+ Message responseMessage = _consumer.receive(getReceiveTimeout());
+ assertNotNull("A response message was not sent", responseMessage);
+ assertTrue("The response message does not have a status code",
+ Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+ assertEquals("Incorrect response code", 201, responseMessage.getIntProperty("statusCode"));
+ assertTrue("The response was not a MapMessage", responseMessage instanceof MapMessage);
+ assertEquals("The created queue did not have the correct type", "sorted", ((MapMessage)responseMessage).getString("qpid-type"));
+ }
+
+ // attempt to create an exchange without a type
+ public void testCreateExchangeWithoutType() throws Exception
+ {
+ setupVirtualHostManagementConnection();
+ MapMessage message = _session.createMapMessage();
+
+ message.setStringProperty("type", "org.apache.qpid.Exchange");
+ message.setStringProperty("operation", "CREATE");
+ message.setString("name", getTestName());
+ String path = getTestName();
+ message.setString("object-path", path);
+ message.setJMSReplyTo(_replyAddress);
+ _producer.send(message);
+
+ Message responseMessage = _consumer.receive(getReceiveTimeout());
+ assertNotNull("A response message was not sent", responseMessage);
+ assertTrue("The response message does not have a status code",
+ Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+ assertEquals("Incorrect response code", 400, responseMessage.getIntProperty("statusCode"));
+ }
+
+
+
+ // attempt to create a connection
+ public void testCreateConnectionOnVhostManagement() throws Exception
+ {
+ setupVirtualHostManagementConnection();
+ MapMessage message = _session.createMapMessage();
+
+ message.setStringProperty("type", "org.apache.qpid.Connection");
+ message.setStringProperty("operation", "CREATE");
+ message.setString("name", getTestName());
+ String path = getTestName();
+ message.setString("object-path", path);
+ message.setJMSReplyTo(_replyAddress);
+ _producer.send(message);
+
+ Message responseMessage = _consumer.receive(getReceiveTimeout());
+ assertNotNull("A response message was not sent", responseMessage);
+ assertTrue("The response message does not have a status code",
+ Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+ assertEquals("The response code did not indicate not implemented", 501, responseMessage.getIntProperty("statusCode"));
+ }
+
+ public void testCreateConnectionOnBrokerManagement() throws Exception
+ {
+ setupBrokerManagementConnection();
+ MapMessage message = _session.createMapMessage();
+
+ message.setStringProperty("type", "org.apache.qpid.Connection");
+ message.setStringProperty("operation", "CREATE");
+ message.setString("name", getTestName());
+ String path = getTestName();
+ message.setString("object-path", path);
+ message.setJMSReplyTo(_replyAddress);
+ _producer.send(message);
+
+ Message responseMessage = _consumer.receive(getReceiveTimeout());
+ assertNotNull("A response message was not sent", responseMessage);
+ assertTrue("The response message does not have a status code",
+ Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+ assertEquals("The response code did not indicate not implemented", 501, responseMessage.getIntProperty("statusCode"));
+ }
+
+ // Creates a binding over the virtual host management connection. Here the
+ // object-paths of the exchange and the queue are just their names, so each
+ // name is passed to the shared scenario as its own path.
+ public void testCreateBindingOnVhostManagement() throws Exception
+ {
+     setupVirtualHostManagementConnection();
+
+     final String exchange = getTestName() + "_Exchange";
+     final String queue = getTestName() + "_Queue";
+
+     doTestCreateBinding(exchange, queue, exchange, queue);
+ }
+
+ // Creates a binding over the broker management connection, where object-paths
+ // are prefixed with "test/test/".
+ // NOTE(review): the prefix is presumably <virtual host node>/<virtual host>,
+ // both named "test" - confirm against the test broker configuration.
+ public void testCreateBindingOnBrokerManagement() throws Exception
+ {
+     setupBrokerManagementConnection();
+     String exchangeName = getTestName() + "_Exchange";
+     String queueName = getTestName() + "_Queue";
+     String exchangePath = "test/test/" + exchangeName;
+     // The queue's object-path must end in the queue's own name; it was
+     // previously built from exchangeName by copy-paste.
+     String queuePath = "test/test/" + queueName;
+
+     doTestCreateBinding(exchangeName, queueName, exchangePath, queuePath);
+ }
+
+ // Shared scenario behind the create-binding tests. Using whichever management
+ // connection the caller has already set up, it:
+ //   1. creates a queue (expects 201),
+ //   2. creates a fanout exchange (expects 201),
+ //   3. creates "binding1" through a CREATE on org.apache.qpid.Binding (expects 201),
+ //   4. creates "binding2" through the exchange's "bind" operation (expects 200),
+ //   5. reads "binding2" back by object-path (expects 200).
+ //
+ // exchangeName / queueName are the object names; exchangePath / queuePath are
+ // the object-paths under which the current connection addresses those objects.
+ private void doTestCreateBinding(final String exchangeName,
+                                  final String queueName,
+                                  final String exchangePath,
+                                  final String queuePath) throws JMSException
+ {
+     // create the queue
+     MapMessage message = _session.createMapMessage();
+     message.setStringProperty("type", "org.apache.qpid.Queue");
+     message.setStringProperty("operation", "CREATE");
+     message.setString("name", queueName);
+     message.setString("object-path", queuePath);
+     sendBindingTestRequestAndAssertStatus(message, 201);
+
+     // create the exchange
+     message = _session.createMapMessage();
+     message.setStringProperty("type", "org.apache.qpid.FanoutExchange");
+     message.setStringProperty("operation", "CREATE");
+     message.setString("name", exchangeName);
+     message.setString("object-path", exchangePath);
+     sendBindingTestRequestAndAssertStatus(message, 201);
+
+     // create a binding as a managed object
+     message = _session.createMapMessage();
+     message.setStringProperty("type", "org.apache.qpid.Binding");
+     message.setStringProperty("operation", "CREATE");
+     message.setString("name", "binding1");
+     message.setString("object-path", exchangePath + "/" + queueName + "/binding1");
+     sendBindingTestRequestAndAssertStatus(message, 201);
+
+     // use an operation to bind
+     message = _session.createMapMessage();
+     message.setStringProperty("type", "org.apache.qpid.Exchange");
+     message.setStringProperty("operation", "bind");
+     message.setStringProperty("index", "object-path");
+     message.setStringProperty("key", exchangePath);
+     message.setStringProperty("bindingKey", "binding2");
+     message.setStringProperty("queue", queueName);
+     sendBindingTestRequestAndAssertStatus(message, 200);
+
+     // read the new binding
+     message = _session.createMapMessage();
+     message.setStringProperty("type", "org.apache.qpid.Binding");
+     message.setStringProperty("operation", "READ");
+     message.setStringProperty("index", "object-path");
+     message.setStringProperty("key", exchangePath + "/" + queueName + "/binding2");
+     sendBindingTestRequestAndAssertStatus(message, 200);
+ }
+
+ // Sets the reply-to address on the request, sends it, waits for a reply and
+ // asserts that the reply exists, carries a "statusCode" property, and that the
+ // property equals expectedStatusCode.
+ private void sendBindingTestRequestAndAssertStatus(final MapMessage request,
+                                                    final int expectedStatusCode) throws JMSException
+ {
+     request.setJMSReplyTo(_replyAddress);
+     _producer.send(request);
+
+     Message responseMessage = _consumer.receive(getReceiveTimeout());
+     assertNotNull("A response message was not sent", responseMessage);
+     assertTrue("The response message does not have a status code",
+                Collections.list(responseMessage.getPropertyNames()).contains("statusCode"));
+     assertEquals("The response code did not indicate success", expectedStatusCode, responseMessage.getIntProperty("statusCode"));
+ }
+
+}
Propchange: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/management/amqp/AmqpManagementTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java Sun Nov 20 16:42:57 2016
@@ -217,11 +217,10 @@ public class QueueRestTest extends QpidR
ConfiguredObject.CONTEXT,
ConfiguredObject.DESIRED_STATE);
- assertEquals("Unexpected binding attribute " + Consumer.NAME, "1", consumer.get(Consumer.NAME));
- assertEquals("Unexpected binding attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE));
- assertEquals("Unexpected binding attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(),
+ assertEquals("Unexpected consumer attribute " + Consumer.DURABLE, Boolean.FALSE, consumer.get(Consumer.DURABLE));
+ assertEquals("Unexpected consumer attribute " + Consumer.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END.name(),
consumer.get(Consumer.LIFETIME_POLICY));
- assertEquals("Unexpected binding attribute " + Consumer.DISTRIBUTION_MODE, "MOVE",
+ assertEquals("Unexpected consumer attribute " + Consumer.DISTRIBUTION_MODE, "MOVE",
consumer.get(Consumer.DISTRIBUTION_MODE));
@SuppressWarnings("unchecked")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org