You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/09 18:40:37 UTC

svn commit: r1566328 [3/3] - in /qpid/branches/java-broker-amqp-1-0-management/java: ./ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid...

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,1106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.model.AmqpManagement;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementNode>, MessageDestination
+{
+
+    private final VirtualHost _virtualHost;
+    // Derived in the constructor from the managed object's id followed by "$management".
+    private final UUID _id;
+    // Listeners notified by addConsumer() whenever a consumer is registered.
+    private final CopyOnWriteArrayList<ConsumerRegistrationListener<ManagementNode>> _consumerRegistrationListeners =
+            new CopyOnWriteArrayList<ConsumerRegistrationListener<ManagementNode>>();
+    // _registry is used to remove this node once the managed object reaches State.DELETED.
+    private final SystemNodeCreator.SystemNodeRegistry _registry;
+    private final ConfiguredObject _managedObject;
+    private Map<String, ManagementNodeConsumer> _consumers = new ConcurrentHashMap<String, ManagementNodeConsumer>(); // keyed by consumer name
+    // Known management types, keyed by the name of the annotated management class.
+    private Map<String,ManagedEntityType> _entityTypes = Collections.synchronizedMap(new LinkedHashMap<String, ManagedEntityType>());
+    // For each management type, the known objects of that type keyed by object name.
+    private Map<ManagedEntityType,Map<String,ConfiguredObject>> _entities = Collections.synchronizedMap(new LinkedHashMap<ManagedEntityType,Map<String,ConfiguredObject>>());
+
+
+    /**
+     * Creates the management node for the given configured object.
+     *
+     * Registers the management type metadata reachable from the object's class, attaches a
+     * change listener to the object, and records the object itself plus its current children
+     * (of every child type that has a management class) as known entities.
+     *
+     * @param registry registry supplying the virtual host; also used later to remove this node
+     * @param configuredObject the object to be managed through this node
+     * @throws IllegalArgumentException if no management class can be found for the object's class
+     */
+    public ManagementNode(final SystemNodeCreator.SystemNodeRegistry registry,
+                          final ConfiguredObject configuredObject)
+    {
+        _virtualHost = registry.getVirtualHost();
+        _registry = registry;
+        final String name = configuredObject.getId() + "$management";
+        // A fixed charset keeps the derived node id independent of the platform default encoding.
+        _id = UUID.nameUUIDFromBytes(name.getBytes(Charset.forName("UTF-8")));
+
+        _managedObject = configuredObject;
+
+        populateTypeMetaData(configuredObject.getClass(), false);
+
+        // Fail with a meaningful message, and before the listener is attached, instead of
+        // hitting a NullPointerException further down.
+        final Class managementClass = getManagementClass(_managedObject.getClass());
+        if(managementClass == null)
+        {
+            throw new IllegalArgumentException("No management class found for "
+                                               + _managedObject.getClass().getName());
+        }
+
+        // The listener is attached before the children are enumerated below; a child reported
+        // through both paths is simply put into the same map twice under the same name.
+        configuredObject.addChangeListener(new ModelObjectListener());
+
+        _entities.get(_entityTypes.get(managementClass.getName())).put(_managedObject.getName(), _managedObject);
+
+        Collection<Class<? extends ConfiguredObject>> childClasses = Model.getInstance().getChildTypes(managementClass);
+        for(Class<? extends ConfiguredObject> childClass : childClasses)
+        {
+            // Resolve the child's management class once per child type rather than once per child.
+            final Class childManagementClass = getManagementClass(childClass);
+            if(childManagementClass != null)
+            {
+                final String childTypeName = childManagementClass.getName();
+                for(ConfiguredObject child : _managedObject.getChildren(childClass))
+                {
+                    _entities.get(_entityTypes.get(childTypeName)).put(child.getName(), child);
+                }
+            }
+        }
+
+    }
+
+    private Class getManagementClass(Class objectClass)
+    {
+        List<Class> allClasses = new ArrayList<Class>();
+        allClasses.add(objectClass);
+        allClasses.addAll(Arrays.asList(objectClass.getInterfaces()));
+        allClasses.add(objectClass.getSuperclass());
+        for(Class clazz : allClasses)
+        {
+            AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class);
+            if(annotation != null)
+            {
+                return clazz;
+            }
+        }
+        return null;
+    }
+
+    private boolean populateTypeMetaData(final Class<? extends ConfiguredObject> objectClass, boolean allowCreate)
+    {
+        Class clazz = getManagementClass(objectClass);
+        if( clazz != null)
+        {
+            AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class);
+            populateTypeMetaData(clazz, annotation);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private ManagedEntityType populateTypeMetaData(Class clazz,
+                                                   final AmqpManagement entityType)
+    {
+        // Registers the management type for clazz unless it is already known, and returns the registered type.
+        ManagedEntityType managedEntityType = _entityTypes.get(clazz.getName());
+
+        if(managedEntityType == null)
+        {
+            List<String> opsList = new ArrayList<String>(Arrays.asList(entityType.operations()));
+            if(entityType.creatable()) // CREATE is offered only if a parent type of clazz is assignable from the managed object's class
+            {
+                boolean isCreatableChild = false;
+                for(Class<? extends ConfiguredObject> parentConfig : Model.getInstance().getParentTypes(clazz))
+                {
+                    isCreatableChild = parentConfig.isAssignableFrom(_managedObject.getClass());
+                    if(isCreatableChild)
+                    {
+                        opsList.add("CREATE");
+                        break;
+                    }
+                }
+            }
+            opsList.addAll(Arrays.asList("READ","UPDATE","DELETE")); // appended for every type
+            // Parents are the annotated direct interfaces and direct superclass, together with their own parents.
+            Set<ManagedEntityType> parentSet = new HashSet<ManagedEntityType>();
+
+            List<Class> allClasses = new ArrayList<Class>(Arrays.asList(clazz.getInterfaces()));
+            if(clazz.getSuperclass() != null)
+            {
+                allClasses.add(clazz.getSuperclass());
+            }
+
+            for(Class parentClazz : allClasses)
+            {
+                if(parentClazz.getAnnotation(AmqpManagement.class) != null)
+                {
+                    ManagedEntityType parentType = populateTypeMetaData(parentClazz,
+                                                                        (AmqpManagement) parentClazz.getAnnotation(
+                                                                                AmqpManagement.class)
+                                                                       );
+                    parentSet.add(parentType);
+                    parentSet.addAll(Arrays.asList(parentType.getParents()));
+
+                }
+            }
+            managedEntityType = new ManagedEntityType(clazz.getName(), parentSet.toArray(new ManagedEntityType[parentSet.size()]), entityType.attributes(), opsList.toArray(new String[opsList.size()]));
+            _entityTypes.put(clazz.getName(),managedEntityType);
+            _entities.put(managedEntityType, Collections.synchronizedMap(new LinkedHashMap<String, ConfiguredObject>()));
+            // The type is registered above before its child types are visited; presumably so that cyclic type relations terminate - TODO confirm.
+            if(ConfiguredObject.class.isAssignableFrom(clazz))
+            {
+                Collection<Class<? extends ConfiguredObject>> childTypes = Model.getInstance().getChildTypes(clazz);
+                for(Class<? extends ConfiguredObject> childClass : childTypes)
+                {
+                    populateTypeMetaData(childClass, true);
+                }
+            }
+
+        }
+
+        return managedEntityType;
+
+    }
+
+    /**
+     * Accepts a message addressed to this node.
+     *
+     * The message is converted to an {@link InternalMessage} and validated. A valid request is
+     * processed by enqueue() once the transaction commits; nothing happens on rollback.
+     *
+     * @return 1 if the message was accepted for processing, 0 if it failed validation
+     */
+    @Override
+    public  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+                    final InstanceProperties instanceProperties,
+                    final ServerTransaction txn,
+                    final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+    {
+
+        @SuppressWarnings("unchecked")
+        MessageConverter<M, InternalMessage> toInternal =
+                MessageConverterRegistry.getConverter((Class<M>)message.getClass(), InternalMessage.class);
+
+        final InternalMessage request = toInternal.<M>convert(message, _virtualHost);
+
+        if(!validateMessage(request))
+        {
+            return 0;
+        }
+
+        final ServerTransaction.Action processOnCommit = new ServerTransaction.Action()
+        {
+            @Override
+            public void postCommit()
+            {
+                enqueue(request, instanceProperties, postEnqueueAction);
+            }
+
+            @Override
+            public void onRollback()
+            {
+                // nothing was done before commit, so there is nothing to undo
+            }
+        };
+        txn.addPostTransactionAction(processOnCommit);
+
+        return 1;
+    }
+
+    private boolean validateMessage(final ServerMessage message)
+    {
+        AMQMessageHeader header = message.getMessageHeader();
+        return containsStringHeader(header, "type") && containsStringHeader(header, "operation")
+               && (containsStringHeader(header,"name") || containsStringHeader(header,"identity"));
+    }
+
+    private boolean containsStringHeader(final AMQMessageHeader header, String name)
+    {
+        return header.containsHeader(name) && header.getHeader(name) instanceof String;
+    }
+
+    synchronized void enqueue(InternalMessage message, InstanceProperties properties, Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+    {
+        if(postEnqueueAction != null)
+        {
+            postEnqueueAction.performAction(new ConsumedMessageInstance(message, properties));
+        }
+
+
+
+        String name = (String) message.getMessageHeader().getHeader("name");
+        String id = (String) message.getMessageHeader().getHeader("identity");
+        String type = (String) message.getMessageHeader().getHeader("type");
+
+        InternalMessage response;
+
+        if("self".equals(name) && type.equals("org.amqp.management"))
+        {
+            response = performManagementOperation(message);
+        }
+        else
+        {
+            response = createFailureResponse(message);
+        }
+
+
+
+        for(ManagementNodeConsumer consumer : _consumers.values())
+        {
+            consumer.send(response);
+        }
+
+    }
+
+    /**
+     * Dispatches a request addressed to the management node itself according to its
+     * "operation" header and builds the response.
+     *
+     * The response's correlation id is the request's correlation id or, when that is null, the
+     * request's message id; the response gets a fresh random message id.
+     *
+     * @param msg the request message
+     * @return the response message
+     */
+    private InternalMessage performManagementOperation(final InternalMessage msg)
+    {
+        final InternalMessage responseMessage;
+        final InternalMessageHeader requestHeader = msg.getMessageHeader();
+        final MutableMessageHeader responseHeader = new MutableMessageHeader();
+        responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+                                            ? requestHeader.getMessageId()
+                                            : requestHeader.getCorrelationId());
+        responseHeader.setMessageId(UUID.randomUUID().toString());
+
+
+        String operation = (String) requestHeader.getHeader("operation");
+        if("GET-TYPES".equals(operation))
+        {
+            responseMessage = performGetTypes(requestHeader, responseHeader);
+        }
+        else if("GET-ATTRIBUTES".equals(operation))
+        {
+            responseMessage = performGetAttributes(requestHeader, responseHeader);
+        }
+        else if("GET-OPERATIONS".equals(operation))
+        {
+            responseMessage = performGetOperations(requestHeader, responseHeader);
+        }
+        else if("QUERY".equals(operation))
+        {
+            responseMessage = performQuery(requestHeader, responseHeader);
+        }
+        else
+        {
+            // Reply with the response header (not the request header) so that the requester can
+            // correlate the reply, and report the operation as not implemented rather than
+            // echoing the request's own properties back without any status.
+            responseHeader.setHeader("statusCode", 501);
+            responseMessage = InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), responseHeader, new byte[0]);
+        }
+        return responseMessage;
+    }
+
+    private InternalMessage performGetTypes(final InternalMessageHeader requestHeader,
+                                            final MutableMessageHeader responseHeader)
+    {
+        final InternalMessage responseMessage;
+        List<String> restriction;
+        if(requestHeader.containsHeader("entityTypes"))
+        {
+            restriction = (List<String>) requestHeader.getHeader("entityTypes");
+        }
+        else
+        {
+            restriction = null;
+        }
+
+        responseHeader.setHeader("statusCode", 200);
+        Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
+        Map<String,ManagedEntityType> entityMapCopy;
+        synchronized (_entityTypes)
+        {
+            entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
+        }
+
+        for(ManagedEntityType type : entityMapCopy.values())
+        {
+            if(restriction == null || meetsIndirectRestriction(type,restriction))
+            {
+                final ManagedEntityType[] parents = type.getParents();
+                List<String> parentNames = new ArrayList<String>();
+                if(parents != null)
+                {
+                    for(ManagedEntityType parent : parents)
+                    {
+                        parentNames.add(parent.getName());
+                    }
+                }
+                responseMap.put(type.getName(), parentNames);
+            }
+        }
+        responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap);
+        return responseMessage;
+    }
+
+    private InternalMessage performGetAttributes(final InternalMessageHeader requestHeader,
+                                            final MutableMessageHeader responseHeader)
+    {
+        final InternalMessage responseMessage;
+        List<String> restriction;
+        if(requestHeader.containsHeader("entityTypes"))
+        {
+            restriction = (List<String>) requestHeader.getHeader("entityTypes");
+        }
+        else
+        {
+            restriction = null;
+        }
+
+        responseHeader.setHeader("statusCode", 200);
+        Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
+        Map<String,ManagedEntityType> entityMapCopy;
+        synchronized (_entityTypes)
+        {
+            entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
+        }
+
+        for(ManagedEntityType type : entityMapCopy.values())
+        {
+            if(restriction == null || restriction.contains(type.getName()))
+            {
+                responseMap.put(type.getName(), Arrays.asList(type.getAttributes()));
+            }
+        }
+        responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap);
+        return responseMessage;
+    }
+
+
+    private InternalMessage performGetOperations(final InternalMessageHeader requestHeader,
+                                                 final MutableMessageHeader responseHeader)
+    {
+        final InternalMessage responseMessage;
+        List<String> restriction;
+        if(requestHeader.containsHeader("entityTypes"))
+        {
+            restriction = (List<String>) requestHeader.getHeader("entityTypes");
+        }
+        else
+        {
+            restriction = null;
+        }
+
+        responseHeader.setHeader("statusCode", 200);
+        Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
+        Map<String,ManagedEntityType> entityMapCopy;
+        synchronized (_entityTypes)
+        {
+            entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
+        }
+
+        for(ManagedEntityType type : entityMapCopy.values())
+        {
+            if(restriction == null || restriction.contains(type.getName()))
+            {
+                responseMap.put(type.getName(), Arrays.asList(type.getOperations()));
+            }
+        }
+        responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap);
+        return responseMessage;
+    }
+
+    /**
+     * Builds the QUERY response: a list of rows, one per matching entity, each row holding the
+     * values of the requested attributes in request order.
+     *
+     * Request headers read:
+     * "entityTypes" (optional list of type names; defaults to all known types),
+     * "attributes" (optional list of attribute names; defaults to the union of the attributes
+     * of the selected types), "offset" (optional number of leading rows to skip) and
+     * "count" (optional maximum number of rows; a negative value means no limit).
+     *
+     * The response's "count" header carries the number of rows actually returned.
+     *
+     * @param requestHeader header of the request message
+     * @param responseHeader header of the response, populated by this method
+     * @return the list message holding the rows
+     */
+    private InternalMessage performQuery(final InternalMessageHeader requestHeader,
+                                         final MutableMessageHeader responseHeader)
+    {
+        final List<String> restriction;
+        if(requestHeader.containsHeader("entityTypes"))
+        {
+            restriction = (List<String>) requestHeader.getHeader("entityTypes");
+            responseHeader.setHeader("entityTypes", restriction);
+        }
+        else
+        {
+            restriction = new ArrayList<String>(_entityTypes.keySet());
+        }
+
+        final List<String> attributes;
+        if(requestHeader.containsHeader("attributes"))
+        {
+            attributes = (List<String>) requestHeader.getHeader("attributes");
+        }
+        else
+        {
+            // Insertion-ordered set: attribute names appear once, in first-seen order.
+            final Set<String> attributeSet = new LinkedHashSet<String>();
+            for(String entityType : restriction)
+            {
+                final ManagedEntityType type = _entityTypes.get(entityType);
+                if(type != null)
+                {
+                    for(String attributeName : type.getAttributes())
+                    {
+                        attributeSet.add(attributeName);
+                    }
+                }
+            }
+            attributes = new ArrayList<String>(attributeSet);
+        }
+
+        final int offset;
+        if(requestHeader.containsHeader("offset"))
+        {
+            offset = ((Number) requestHeader.getHeader("offset")).intValue();
+            responseHeader.setHeader("offset",offset);
+        }
+        else
+        {
+            offset = 0;
+        }
+
+        final int limit;
+        if(requestHeader.containsHeader("count"))
+        {
+            final int requested = ((Number) requestHeader.getHeader("count")).intValue();
+            // Negative values never limited the result before; keep treating them as "no limit".
+            limit = requested < 0 ? Integer.MAX_VALUE : requested;
+        }
+        else
+        {
+            limit = Integer.MAX_VALUE;
+        }
+
+        responseHeader.setHeader("attributes", attributes);
+
+        responseHeader.setHeader("statusCode", 200);
+        final List<List<Object>> responseList = new ArrayList<List<Object>>();
+
+        int rowNo = 0;
+        for(String type : restriction)
+        {
+            // The limit is tested before adding, so that a count of 0 yields no rows at all.
+            if(responseList.size() >= limit)
+            {
+                break;
+            }
+
+            final ManagedEntityType entityType = _entityTypes.get(type);
+            final Map<String, ConfiguredObject> entityMap = entityType == null ? null : _entities.get(entityType);
+            if(entityMap == null)
+            {
+                continue;
+            }
+
+            final List<ConfiguredObject> entities;
+            synchronized(entityMap)
+            {
+                entities = new ArrayList<ConfiguredObject>(entityMap.values());
+            }
+
+            for(ConfiguredObject entity : entities)
+            {
+                if(responseList.size() >= limit)
+                {
+                    break;
+                }
+                if(rowNo++ >= offset)
+                {
+                    responseList.add(buildQueryRow(entityType, entity, attributes));
+                }
+            }
+        }
+
+        // Report how many rows are being returned, not the limit that was asked for.
+        responseHeader.setHeader("count", responseList.size());
+        return InternalMessage.createListMessage(_virtualHost.getMessageStore(),
+                                                 responseHeader,
+                                                 responseList);
+    }
+
+    /**
+     * Builds one QUERY result row: the value of each requested attribute of the entity, where
+     * the attribute "type" yields the management type name.
+     */
+    private List<Object> buildQueryRow(final ManagedEntityType entityType,
+                                       final ConfiguredObject entity,
+                                       final List<String> attributes)
+    {
+        final Object[] row = new Object[attributes.size()];
+        int col = 0;
+        for(String attr : attributes)
+        {
+            row[col++] = "type".equals(attr) ? entityType.getName() : fixValue(entity.getAttribute(attr));
+        }
+        return Arrays.asList(row);
+    }
+
+    private Object fixValue(final Object value)
+    {
+        Object fixedValue;
+        if(value instanceof Enum)
+        {
+            fixedValue = value.toString();
+        }
+        else if(value instanceof Map)
+        {
+            Map<Object, Object> oldValue = (Map<Object, Object>) value;
+            Map<Object, Object> newValue = new LinkedHashMap<Object, Object>();
+            for(Map.Entry<Object, Object> entry : oldValue.entrySet())
+            {
+                newValue.put(fixValue(entry.getKey()),fixValue(entry.getValue()));
+            }
+            fixedValue = newValue;
+        }
+        else if(value instanceof Collection)
+        {
+            Collection oldValue = (Collection) value;
+            List newValue = new ArrayList(oldValue.size());
+            for(Object o : oldValue)
+            {
+                newValue.add(fixValue(o));
+            }
+            fixedValue = newValue;
+        }
+        else if(value != null && value.getClass().isArray() && !(value instanceof byte[]))
+        {
+            fixedValue = fixValue(Arrays.asList((Object[])value));
+        }
+        else
+        {
+            fixedValue = value;
+        }
+        return fixedValue;
+
+    }
+
+
+    private boolean meetsIndirectRestriction(final ManagedEntityType type, final List<String> restriction)
+    {
+        if(restriction.contains(type.getName()))
+        {
+            return true;
+        }
+        if(type.getParents() != null)
+        {
+            for(ManagedEntityType parent : type.getParents())
+            {
+                if(meetsIndirectRestriction(parent, restriction))
+                {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Builds the response sent for requests that are not addressed to the management node
+     * itself. The response is correlated to the request via the request's correlation id or,
+     * when that is null, its message id.
+     *
+     * NOTE(review): the response carries statusCode 200 and a fixed body entry, which looks
+     * like placeholder content for a failure reply - confirm the intended status and body.
+     */
+    private InternalMessage createFailureResponse(final InternalMessage msg)
+    {
+        final InternalMessageHeader requestHeader = msg.getMessageHeader();
+
+        final Map<String,Object> applicationProperties = new HashMap<String, Object>();
+        applicationProperties.put("statusCode",200);
+
+        final Map<Object, Object> responseBody = new HashMap<Object, Object>();
+        responseBody.put("prop1", "wibble");
+
+        final String correlationId;
+        if(requestHeader.getCorrelationId() == null)
+        {
+            correlationId = requestHeader.getMessageId();
+        }
+        else
+        {
+            correlationId = requestHeader.getCorrelationId();
+        }
+        final String messageId = UUID.randomUUID().toString();
+        final long timestamp = System.currentTimeMillis();
+
+        final InternalMessageHeader responseHeader =
+                new InternalMessageHeader(applicationProperties,
+                                          correlationId,
+                                          0,
+                                          null,
+                                          null,
+                                          messageId,
+                                          null,
+                                          null,
+                                          (byte)4,
+                                          timestamp,
+                                          null,
+                                          null);
+
+        return InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseBody);
+    }
+
+    /**
+     * Creates a consumer for the given target, registers it under its name and notifies the
+     * consumer registration listeners.
+     *
+     * The filters, messageClass and options arguments are not used by this implementation.
+     *
+     * @return the newly created consumer
+     */
+    @Override
+    public synchronized <T extends ConsumerTarget> ManagementNodeConsumer addConsumer(final T target,
+                                final FilterManager filters,
+                                final Class<? extends ServerMessage> messageClass,
+                                final String consumerName,
+                                final EnumSet<Consumer.Option> options) throws AMQException
+    {
+        final ManagementNodeConsumer newConsumer = new ManagementNodeConsumer(consumerName,this, target);
+        target.consumerAdded(newConsumer);
+        _consumers.put(consumerName, newConsumer);
+
+        final Iterator<ConsumerRegistrationListener<ManagementNode>> listeners =
+                _consumerRegistrationListeners.iterator();
+        while(listeners.hasNext())
+        {
+            listeners.next().consumerAdded(this, newConsumer);
+        }
+        return newConsumer;
+    }
+
+    /**
+     * Returns a new list holding the consumers registered at the time of the call.
+     */
+    @Override
+    public synchronized Collection<ManagementNodeConsumer> getConsumers()
+    {
+        final List<ManagementNodeConsumer> snapshot = new ArrayList<ManagementNodeConsumer>(_consumers.values());
+        return snapshot;
+    }
+
+    @Override
+    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<ManagementNode> listener)
+    {
+        _consumerRegistrationListeners.add(listener); // notified by addConsumer() for consumers registered from now on
+    }
+
+    @Override
+    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+    {
+        _consumerRegistrationListeners.remove(listener); // the listener is no longer notified by addConsumer()
+    }
+
+    @Override
+    public AuthorizationHolder getAuthorizationHolder()
+    {
+        return null; // this node does not store an authorization holder
+    }
+
+    @Override
+    public void setAuthorizationHolder(final AuthorizationHolder principalHolder)
+    {
+        // no-op: the supplied holder is ignored
+    }
+
+    @Override
+    public void setExclusiveOwningSession(final AMQSessionModel owner)
+    {
+        // no-op: the supplied session is ignored
+    }
+
+    @Override
+    public AMQSessionModel getExclusiveOwningSession()
+    {
+        return null; // this node does not store an owning session
+    }
+
+    @Override
+    public boolean isExclusive()
+    {
+        return false; // always reported as not exclusive
+    }
+
+    @Override
+    public String getName()
+    {
+        return "$management"; // fixed name, independent of the managed object
+    }
+
+    @Override
+    public UUID getId()
+    {
+        return _id; // name-based UUID computed in the constructor
+    }
+
+    @Override
+    public boolean isDurable()
+    {
+        return false; // always reported as not durable
+    }
+
+    private class ConsumedMessageInstance implements MessageInstance<ConsumedMessageInstance,Consumer> // passed to the post-enqueue action in enqueue(); apart from the three getters at the end, every method is a stub returning a fixed value
+    {
+        private final ServerMessage _message;
+        private final InstanceProperties _properties;
+
+        public ConsumedMessageInstance(final ServerMessage message,
+                                       final InstanceProperties properties)
+        {
+            _message = message;
+            _properties = properties;
+        }
+
+        @Override
+        public int getDeliveryCount()
+        {
+            return 0;
+        }
+
+        @Override
+        public void incrementDeliveryCount()
+        {
+            // no-op: no delivery count is kept
+        }
+
+        @Override
+        public void decrementDeliveryCount()
+        {
+            // no-op: no delivery count is kept
+        }
+
+        @Override
+        public void addStateChangeListener(final StateChangeListener<? super ConsumedMessageInstance, State> listener)
+        {
+            // no-op: the listener is not stored
+        }
+
+        @Override
+        public boolean removeStateChangeListener(final StateChangeListener<? super ConsumedMessageInstance, State> listener)
+        {
+            return false;
+        }
+
+
+        @Override
+        public boolean acquiredByConsumer()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isAcquiredBy(final Consumer consumer)
+        {
+            return false;
+        }
+
+        @Override
+        public void setRedelivered()
+        {
+            // no-op: isRedelivered() always returns false
+        }
+
+        @Override
+        public boolean isRedelivered()
+        {
+            return false;
+        }
+
+        @Override
+        public Consumer getDeliveredConsumer()
+        {
+            return null;
+        }
+
+        @Override
+        public void reject()
+        {
+            // no-op
+        }
+
+        @Override
+        public boolean isRejectedBy(final Consumer consumer)
+        {
+            return false;
+        }
+
+        @Override
+        public boolean getDeliveredToConsumer()
+        {
+            return true; // the only stub that answers true
+        }
+
+        @Override
+        public boolean expired() throws AMQException
+        {
+            return false;
+        }
+
+        @Override
+        public boolean acquire(final Consumer sub)
+        {
+            return false;
+        }
+
+        @Override
+        public int getMaximumDeliveryCount()
+        {
+            return 0;
+        }
+
+        @Override
+        public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action,
+                                    final ServerTransaction txn)
+        {
+            return 0;
+        }
+
+
+        @Override
+        public Filterable asFilterable()
+        {
+            return null;
+        }
+
+        @Override
+        public boolean isAvailable()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean acquire()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isAcquired()
+        {
+            return false;
+        }
+
+        @Override
+        public void release()
+        {
+            // no-op
+        }
+
+        @Override
+        public boolean resend() throws AMQException
+        {
+            return false;
+        }
+
+        @Override
+        public void delete()
+        {
+            // no-op: isDeleted() always returns false
+        }
+
+        @Override
+        public boolean isDeleted()
+        {
+            return false;
+        }
+
+        @Override
+        public ServerMessage getMessage()
+        {
+            return _message;
+        }
+
+        @Override
+        public InstanceProperties getInstanceProperties()
+        {
+            return _properties;
+        }
+
+        @Override
+        public TransactionLogResource getOwningResource()
+        {
+            return ManagementNode.this; // the enclosing management node
+        }
+    }
+
+    /**
+     * Configuration listener for the enclosing management node. It removes the node from
+     * the registry when a DELETED state is reported, and adds or removes children in the
+     * per-entity-type maps as they appear or disappear. Attribute changes are ignored.
+     */
+    private class ModelObjectListener implements ConfigurationChangeListener
+    {
+        @Override
+        public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
+        {
+            // Only the transition into DELETED is of interest.
+            if(newState != State.DELETED)
+            {
+                return;
+            }
+            _registry.removeSystemNode(ManagementNode.this);
+        }
+
+        @Override
+        public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+        {
+            final ManagedEntityType type = entityTypeOf(child);
+            if(type == null)
+            {
+                // Children without a registered entity type are not tracked.
+                return;
+            }
+            _entities.get(type).put(child.getName(), child);
+        }
+
+        @Override
+        public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+        {
+            final ManagedEntityType type = entityTypeOf(child);
+            if(type == null)
+            {
+                return;
+            }
+            _entities.get(type).remove(child.getName());
+        }
+
+        @Override
+        public void attributeSet(final ConfiguredObject object,
+                                 final String attributeName,
+                                 final Object oldAttributeValue,
+                                 final Object newAttributeValue)
+        {
+            // Attribute changes need no bookkeeping here.
+        }
+
+        /**
+         * Looks up the entity type registered under the name of the child's management class.
+         *
+         * @param child the configured object being added or removed
+         * @return the matching entity type, or null when none is registered under that name
+         */
+        private ManagedEntityType entityTypeOf(final ConfiguredObject child)
+        {
+            return _entityTypes.get(getManagementClass(child.getClass()).getName());
+        }
+    }
+
+    /**
+     * Simple mutable implementation of {@link AMQMessageHeader}: every standard property is
+     * a plain field with a setter and getter, and application headers are kept in an
+     * insertion-ordered map.
+     */
+    private static class MutableMessageHeader implements AMQMessageHeader
+    {
+        private final LinkedHashMap<String, Object> _headers = new LinkedHashMap<String, Object>();
+        private String _correlationId;
+        private long _expiration;
+        private String _userId;
+        private String _appId;
+        private String _messageId;
+        private String _mimeType;
+        private String _encoding;
+        private byte _priority;
+        private long _timestamp;
+        private String _type;
+        private String _replyTo;
+
+        // ---- standard properties: setter followed by its getter ----
+
+        public void setCorrelationId(final String correlationId)
+        {
+            _correlationId = correlationId;
+        }
+
+        public String getCorrelationId()
+        {
+            return _correlationId;
+        }
+
+        public void setExpiration(final long expiration)
+        {
+            _expiration = expiration;
+        }
+
+        public long getExpiration()
+        {
+            return _expiration;
+        }
+
+        public void setUserId(final String userId)
+        {
+            _userId = userId;
+        }
+
+        public String getUserId()
+        {
+            return _userId;
+        }
+
+        public void setAppId(final String appId)
+        {
+            _appId = appId;
+        }
+
+        public String getAppId()
+        {
+            return _appId;
+        }
+
+        public void setMessageId(final String messageId)
+        {
+            _messageId = messageId;
+        }
+
+        public String getMessageId()
+        {
+            return _messageId;
+        }
+
+        public void setMimeType(final String mimeType)
+        {
+            _mimeType = mimeType;
+        }
+
+        public String getMimeType()
+        {
+            return _mimeType;
+        }
+
+        public void setEncoding(final String encoding)
+        {
+            _encoding = encoding;
+        }
+
+        public String getEncoding()
+        {
+            return _encoding;
+        }
+
+        public void setPriority(final byte priority)
+        {
+            _priority = priority;
+        }
+
+        public byte getPriority()
+        {
+            return _priority;
+        }
+
+        public void setTimestamp(final long timestamp)
+        {
+            _timestamp = timestamp;
+        }
+
+        public long getTimestamp()
+        {
+            return _timestamp;
+        }
+
+        public void setType(final String type)
+        {
+            _type = type;
+        }
+
+        public String getType()
+        {
+            return _type;
+        }
+
+        public void setReplyTo(final String replyTo)
+        {
+            _replyTo = replyTo;
+        }
+
+        public String getReplyTo()
+        {
+            return _replyTo;
+        }
+
+        // ---- application headers ----
+
+        /** Stores (or replaces) the value of the named application header. */
+        public void setHeader(String header, Object value)
+        {
+            _headers.put(header,value);
+        }
+
+        /** Returns the value stored for the named header, or null when the map has none. */
+        @Override
+        public Object getHeader(final String name)
+        {
+            return _headers.get(name);
+        }
+
+        /** Returns true when a header with the given name has been set. */
+        @Override
+        public boolean containsHeader(final String name)
+        {
+            return _headers.containsKey(name);
+        }
+
+        /** Returns true only when every one of the given names has been set as a header. */
+        @Override
+        public boolean containsHeaders(final Set<String> names)
+        {
+            final Set<String> present = _headers.keySet();
+            return present.containsAll(names);
+        }
+
+        /** Returns a read-only view of the header names, in the order they were first set. */
+        @Override
+        public Collection<String> getHeaderNames()
+        {
+            final Set<String> present = _headers.keySet();
+            return Collections.unmodifiableCollection(present);
+        }
+
+    }
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+class ManagementNodeConsumer implements Consumer
+{
+    private final long _id = Consumer.SUB_ID_GENERATOR.getAndIncrement();
+    private final ManagementNode _managementNode;
+    private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
+    private final ConsumerTarget _target;
+    private final Lock _stateChangeLock = new ReentrantLock();
+    private final String _name;
+    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
+
+
+    public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
+    {
+        _name = consumerName;
+        _managementNode = managementNode;
+        _target = target;
+        target.setStateListener(_targetChangeListener);
+    }
+
+    @Override
+    public void externalStateChange()
+    {
+
+    }
+
+    @Override
+    public long getBytesOut()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getMessagesOut()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getUnacknowledgedBytes()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getUnacknowledgedMessages()
+    {
+        return 0;
+    }
+
+    @Override
+    public AMQSessionModel getSessionModel()
+    {
+        return _target.getSessionModel();
+    }
+
+    @Override
+    public void setNoLocal(final boolean noLocal)
+    {
+    }
+
+    @Override
+    public long getId()
+    {
+        return _id;
+    }
+
+    @Override
+    public boolean isSuspended()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean acquires()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean seesRequeues()
+    {
+        return false;
+    }
+
+    @Override
+    public void close() throws AMQException
+    {
+
+    }
+
+    @Override
+    public boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
+    @Override
+    public void getSendLock()
+    {
+        _stateChangeLock.lock();
+    }
+
+    @Override
+    public void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
+    }
+
+
+    @Override
+    public boolean isActive()
+    {
+        return false;
+    }
+
+    @Override
+    public String getName()
+    {
+        return _name;
+    }
+
+    @Override
+    public void flush() throws AMQException
+    {
+
+    }
+
+    ManagementNode getManagementNode()
+    {
+        return _managementNode;
+    }
+
+    void send(final InternalMessage response)
+    {
+        getSendLock();
+        try
+        {
+            final ManagementResponse responseEntry = new ManagementResponse(this, response);
+            if(_queue.isEmpty() && _target.allocateCredit(response))
+            {
+                _target.send(responseEntry,false);
+            }
+            else
+            {
+                _queue.add(responseEntry);
+            }
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();
+        }
+        finally
+        {
+            releaseSendLock();
+        }
+    }
+
+    private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+    {
+        @Override
+        public void stateChanged(final ConsumerTarget object,
+                                 final ConsumerTarget.State oldState,
+                                 final ConsumerTarget.State newState)
+        {
+            if(newState == ConsumerTarget.State.ACTIVE)
+            {
+                deliverMessages();
+            }
+        }
+    }
+
+    private void deliverMessages()
+    {
+        getSendLock();
+        try
+        {
+            while(!_queue.isEmpty())
+            {
+
+                final ManagementResponse managementResponse = _queue.get(0);
+                if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
+                {
+                    _queue.remove(0);
+                    _target.send(managementResponse,false);
+                }
+                else
+                {
+                    break;
+                }
+            }
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            releaseSendLock();
+        }
+    }
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.server.plugin.SystemNodeCreator;
+
+/**
+ * {@link SystemNodeCreator} that builds a {@link ManagementNode} for the registry's
+ * virtual host model and registers it as a system node.
+ */
+public class ManagementNodeCreator implements SystemNodeCreator
+{
+    /** Type name reported by {@link #getType()}. */
+    private static final String NODE_TYPE = "AMQP-VIRTUALHOST-MANAGEMENT";
+
+    @Override
+    public void register(final SystemNodeRegistry registry)
+    {
+        registry.registerSystemNode(new ManagementNode(registry,registry.getVirtualHostModel()));
+    }
+
+    @Override
+    public String getType()
+    {
+        return NODE_TYPE;
+    }
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Sun Feb  9 17:40:35 2014
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
+
+/**
+ * {@link MessageInstance} wrapping a single management response message destined for one
+ * {@link ManagementNodeConsumer}. Unless deleted, the instance is reported as acquired by
+ * that consumer; the remaining queue-entry operations are constant-valued placeholders.
+ */
+class ManagementResponse implements MessageInstance<ManagementResponse,ManagementNodeConsumer>
+{
+    private final ManagementNodeConsumer _consumer;
+    private int _deliveryCount;
+    private boolean _isRedelivered;
+    // NOTE(review): nothing in this class ever sets _isDelivered or _isDeleted to true.
+    private boolean _isDelivered;
+    private boolean _isDeleted;
+    private final InternalMessage _message;
+
+    /**
+     * @param consumer consumer the response is destined for
+     * @param message  the response message itself
+     */
+    ManagementResponse(final ManagementNodeConsumer consumer, final InternalMessage message)
+    {
+        _consumer = consumer;
+        _message = message;
+    }
+
+    /**
+     * Returns the count maintained by {@link #incrementDeliveryCount()} and
+     * {@link #decrementDeliveryCount()} (previously a constant 0 was returned, so the
+     * tracked value was never visible).
+     */
+    @Override
+    public int getDeliveryCount()
+    {
+        return _deliveryCount;
+    }
+
+    @Override
+    public void incrementDeliveryCount()
+    {
+        _deliveryCount++;
+    }
+
+    @Override
+    public void decrementDeliveryCount()
+    {
+        _deliveryCount--;
+    }
+
+    @Override
+    public void addStateChangeListener(final StateChangeListener<? super ManagementResponse, State> listener)
+    {
+        // listeners are not recorded
+    }
+
+    @Override
+    public boolean removeStateChangeListener(final StateChangeListener<? super ManagementResponse, State> listener)
+    {
+        return false;
+    }
+
+
+    @Override
+    public boolean acquiredByConsumer()
+    {
+        return !isDeleted();
+    }
+
+    @Override
+    public boolean isAcquiredBy(final ManagementNodeConsumer consumer)
+    {
+        return consumer == _consumer && !isDeleted();
+    }
+
+    @Override
+    public void setRedelivered()
+    {
+        _isRedelivered = true;
+    }
+
+    @Override
+    public boolean isRedelivered()
+    {
+        return _isRedelivered;
+    }
+
+    /** Returns the owning consumer, or null once the response is reported as deleted. */
+    @Override
+    public ManagementNodeConsumer getDeliveredConsumer()
+    {
+        return isDeleted() ? null : _consumer;
+    }
+
+    /** Rejecting a response is handled the same way as deleting it. */
+    @Override
+    public void reject()
+    {
+        delete();
+    }
+
+    @Override
+    public boolean isRejectedBy(final ManagementNodeConsumer consumer)
+    {
+        return false;
+    }
+
+    @Override
+    public boolean getDeliveredToConsumer()
+    {
+        return _isDelivered;
+    }
+
+    @Override
+    public boolean expired() throws AMQException
+    {
+        return false;
+    }
+
+    @Override
+    public boolean acquire(final ManagementNodeConsumer sub)
+    {
+        return false;
+    }
+
+    @Override
+    public int getMaximumDeliveryCount()
+    {
+        return 0;
+    }
+
+    @Override
+    public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action,
+                                final ServerTransaction txn)
+    {
+        return 0;
+    }
+
+
+    @Override
+    public Filterable asFilterable()
+    {
+        return null;
+    }
+
+    @Override
+    public boolean isAvailable()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean acquire()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isAcquired()
+    {
+        return !isDeleted();
+    }
+
+    /** Releasing a response is handled the same way as deleting it. */
+    @Override
+    public void release()
+    {
+        delete();
+    }
+
+    @Override
+    public boolean resend() throws AMQException
+    {
+        return false;
+    }
+
+    @Override
+    public void delete()
+    {
+        // TODO
+        // NOTE(review): _isDeleted is not updated here, so isDeleted() stays false after
+        // delete(), reject() or release() - confirm the intended behaviour before changing.
+    }
+
+    @Override
+    public boolean isDeleted()
+    {
+        return _isDeleted;
+    }
+
+    @Override
+    public ServerMessage getMessage()
+    {
+        return _message;
+    }
+
+    @Override
+    public InstanceProperties getInstanceProperties()
+    {
+        return InstanceProperties.EMPTY;
+    }
+
+    /** The management node of the owning consumer acts as the owning resource. */
+    @Override
+    public TransactionLogResource getOwningResource()
+    {
+        return _consumer.getManagementNode();
+    }
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator?rev=1566328&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator Sun Feb  9 17:40:35 2014
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.management.amqp.ManagementNodeCreator

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/addQueue.html Sun Feb  9 17:40:35 2014
@@ -35,16 +35,16 @@
                 <tr>
                     <td valign="top"><strong>Queue Type: </strong></td>
                     <td>
-                    <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
+                    <input type="radio" id="formAddQueueTypeStandard" name="queueType" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
                     <label for="formAddQueueTypeStandard">Standard</label>
                     &nbsp;&nbsp;
-                    <input type="radio" id="formAddQueueTypePriority" name="type" value="priority" dojoType="dijit.form.RadioButton" />
+                    <input type="radio" id="formAddQueueTypePriority" name="queueType" value="priority" dojoType="dijit.form.RadioButton" />
                     <label for="formAddQueueTypePriority">Priority</label>
                     &nbsp;&nbsp;
-                    <input type="radio" id="formAddQueueTypeLVQ" name="type" value="lvq" dojoType="dijit.form.RadioButton" />
+                    <input type="radio" id="formAddQueueTypeLVQ" name="queueType" value="lvq" dojoType="dijit.form.RadioButton" />
                     <label for="formAddQueueTypeLVQ">LVQ</label>
                     &nbsp;&nbsp;
-                    <input type="radio" id="formAddQueueTypeSorted" name="type" value="sorted" dojoType="dijit.form.RadioButton" />
+                    <input type="radio" id="formAddQueueTypeSorted" name="queueType" value="sorted" dojoType="dijit.form.RadioButton" />
                     <label for="formAddQueueTypeSorted">Sorted</label>
                     </td>
                 </tr>

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Sun Feb  9 17:40:35 2014
@@ -277,7 +277,7 @@ define(["dojo/_base/xhr",
                            "exclusive",
                            "owner",
                            "lifetimePolicy",
-                           "type",
+                           "queueType",
                            "typeQualifier",
                            "alertRepeatGap",
                            "alertRepeatGapUnits",
@@ -359,14 +359,14 @@ define(["dojo/_base/xhr",
                bytesDepth = formatter.formatBytes( this.queueData["unacknowledgedBytes"] );
                this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value;
                this.unacknowledgedBytesUnits.innerHTML = bytesDepth.units + ")";
-               this.type.innerHTML = entities.encode(this.queueData[ "type" ]);
-               if (this.queueData.type == "standard")
+               this.queueType.innerHTML = entities.encode(this.queueData[ "queueType" ]);
+               if (this.queueData.queueType == "standard")
                {
                    this.typeQualifier.style.display = "none";
                }
                else
                {
-                   this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.type] + ": " + this.queueData[queueTypeKeys[this.queueData.type]] + ")");
+                   this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.queueType] + ": " + this.queueData[queueTypeKeys[this.queueData.queueType]] + ")");
                }
 
                if(this.queueData["messageGroupKey"])

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js Sun Feb  9 17:40:35 2014
@@ -96,7 +96,7 @@ define(["dojo/_base/xhr",
                             }
                         }
                         else if (!typeSpecificFields.hasOwnProperty(propName) ||
-                                        formValues.type === typeSpecificFields[ propName ]) {
+                                        formValues.queueType === typeSpecificFields[ propName ]) {
                             if(formValues[ propName ] !== "") {
                                 if (fieldConverters.hasOwnProperty(propName))
                                 {
@@ -130,7 +130,7 @@ define(["dojo/_base/xhr",
                             theForm = registry.byId("formAddQueue");
                             array.forEach(theForm.getDescendants(), function(widget)
                                 {
-                                    if(widget.name === "type") {
+                                    if(widget.name === "queueType") {
                                         widget.on("change", function(isChecked) {
 
                                             var objId = widget.id + ":fields";

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/resources/showQueue.html Sun Feb  9 17:40:35 2014
@@ -47,7 +47,7 @@
         <div style="clear:both">
             <div class="formLabel-labelCell" style="float:left; width: 150px;">Type:</div>
             <div style="float:left;">
-                <span class="type"></span>
+                <span class="queueType"></span>
                 <span class="typeQualifier"></span>
             </div>
         </div>

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Sun Feb  9 17:40:35 2014
@@ -183,7 +183,7 @@ public class QueueMBean extends AMQManag
     @Override
     public String getQueueType()
     {
-        return (String) _queue.getAttribute(Queue.TYPE);
+        return (String) _queue.getAttribute(Queue.QUEUE_TYPE);
     }
 
     public boolean isDurable()

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java Sun Feb  9 17:40:35 2014
@@ -127,7 +127,7 @@ public class QueueMBeanTest extends Qpid
 
     public void testQueueType() throws Exception
     {
-        assertAttribute("queueType", QUEUE_TYPE, Queue.TYPE);
+        assertAttribute("queueType", QUEUE_TYPE, Queue.QUEUE_TYPE);
     }
 
     public void testMaximumDeliveryCount() throws Exception

Modified: qpid/branches/java-broker-amqp-1-0-management/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/build.deps?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/build.deps (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/build.deps Sun Feb  9 17:40:35 2014
@@ -103,6 +103,7 @@ perftests.test.libs=${test.libs}
 qpid-test-utils.libs = ${test.libs} ${geronimo-jms}
 
 broker-plugins-access-control.test.libs=${test.libs}
+broker-plugins-management-amqp.test.libs=${test.libs}
 broker-plugins-management-http.test.libs=${test.libs}
 broker-plugins-management-jmx.test.libs=${test.libs}
 broker-plugins-jdbc-store.test.libs=${test.libs}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Sun Feb  9 17:40:35 2014
@@ -30,8 +30,8 @@ import java.io.IOException;
 public class BasicContentHeaderProperties
 {
     //persistent & non-persistent constants, values as per JMS DeliveryMode
-    public static final int NON_PERSISTENT = 1;
-    public static final int PERSISTENT = 2;
+    public static final byte NON_PERSISTENT = 1;
+    public static final byte PERSISTENT = 2;
 
     private static final Logger _logger = LoggerFactory.getLogger(BasicContentHeaderProperties.class);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/ivy.nexus.xml Sun Feb  9 17:40:35 2014
@@ -51,6 +51,12 @@
     <artifact name="qpid-broker-plugins-access-control" type="jar.asc" ext="jar.asc"/>
     <artifact name="qpid-broker-plugins-access-control" type="source" ext="jar" e:classifier="sources"/>
     <artifact name="qpid-broker-plugins-access-control" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+    <artifact name="qpid-broker-plugins-management-amqp" type="pom" ext="pom"/>
+    <artifact name="qpid-broker-plugins-management-amqp" type="pom.asc" ext="pom.asc"/>
+    <artifact name="qpid-broker-plugins-management-amqp" type="jar" ext="jar"/>
+    <artifact name="qpid-broker-plugins-management-amqp" type="jar.asc" ext="jar.asc"/>
+    <artifact name="qpid-broker-plugins-management-amqp" type="source" ext="jar" e:classifier="sources"/>
+    <artifact name="qpid-broker-plugins-management-amqp" type="source.asc" ext="jar.asc" e:classifier="sources"/>
     <artifact name="qpid-broker-plugins-management-http" type="pom" ext="pom"/>
     <artifact name="qpid-broker-plugins-management-http" type="pom.asc" ext="pom.asc"/>
     <artifact name="qpid-broker-plugins-management-http" type="jar" ext="jar"/>

Modified: qpid/branches/java-broker-amqp-1-0-management/java/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/pom.xml?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/pom.xml (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/pom.xml Sun Feb  9 17:40:35 2014
@@ -173,6 +173,7 @@
     <module>broker-plugins/derby-store</module>
     <module>broker-plugins/jdbc-provider-bone</module>
     <module>broker-plugins/jdbc-store</module>
+    <module>broker-plugins/management-amqp</module>
     <module>broker-plugins/management-http</module>
     <module>broker-plugins/management-jmx</module>
     <module>broker-plugins/memory-store</module>

Modified: qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java Sun Feb  9 17:40:35 2014
@@ -97,7 +97,7 @@ public class Asserts
         assertEquals("Unexpected value of queue attribute " + Queue.STATE, State.ACTIVE.name(), queueData.get(Queue.STATE));
         assertEquals("Unexpected value of queue attribute " + Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name(),
                 queueData.get(Queue.LIFETIME_POLICY));
-        assertEquals("Unexpected value of queue attribute " + Queue.TYPE, queueType, queueData.get(Queue.TYPE));
+        assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_TYPE, queueType, queueData.get(Queue.QUEUE_TYPE));
         if (expectedAttributes == null)
         {
             assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE, Boolean.FALSE, queueData.get(Queue.EXCLUSIVE));

Modified: qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1566328&r1=1566327&r2=1566328&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Sun Feb  9 17:40:35 2014
@@ -37,7 +37,6 @@ import org.apache.qpid.server.model.Exch
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
 import org.apache.qpid.test.utils.TestFileUtils;
 import org.apache.qpid.util.FileUtils;
@@ -527,7 +526,7 @@ public class VirtualHostRestTest extends
         queueData.put(Queue.DURABLE, Boolean.TRUE);
         if (queueType != null)
         {
-            queueData.put(Queue.TYPE, queueType);
+            queueData.put(Queue.QUEUE_TYPE, queueType);
         }
         if (attributes != null)
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org