You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/20 16:42:59 UTC
svn commit: r1770576 [6/8] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/
broker-codegen/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core...
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Nov 20 16:42:57 2016
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.management.amqp;
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.lang.reflect.WildcardType;
import java.nio.charset.Charset;
import java.security.AccessControlException;
import java.security.AccessController;
@@ -31,6 +36,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +46,7 @@ import java.util.concurrent.CopyOnWriteA
import javax.security.auth.Subject;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -53,11 +60,16 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
-import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
+import org.apache.qpid.server.model.ConfiguredObjectOperation;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
+import org.apache.qpid.server.model.IntegrityViolationException;
import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.OperationParameter;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
@@ -69,13 +81,16 @@ import org.apache.qpid.server.store.Tran
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
class ManagementNode implements MessageSource, MessageDestination
{
- public static final String NAME_ATTRIBUTE = "name";
public static final String IDENTITY_ATTRIBUTE = "identity";
+ public static final String INDEX_ATTRIBUTE = "index";
+ public static final String KEY_ATTRIBUTE = "key";
+
public static final String TYPE_ATTRIBUTE = "type";
public static final String OPERATION_HEADER = "operation";
public static final String SELF_NODE_NAME = "self";
@@ -86,176 +101,198 @@ class ManagementNode implements MessageS
public static final String QUERY = "QUERY";
public static final String ENTITY_TYPE_HEADER = "entityType";
public static final String STATUS_CODE_HEADER = "statusCode";
- public static final int STATUS_CODE_OK = 200;
- public static final String ATTRIBUTES_HEADER = "attributes";
public static final String OFFSET_HEADER = "offset";
public static final String COUNT_HEADER = "count";
public static final String MANAGEMENT_NODE_NAME = "$management";
- public static final String CREATE_OPERATION = "CREATE";
- public static final String READ_OPERATION = "READ";
- public static final String UPDATE_OPERATION = "UPDATE";
- public static final String DELETE_OPERATION = "DELETE";
public static final String STATUS_DESCRIPTION_HEADER = "statusDescription";
- public static final int NOT_FOUND_STATUS_CODE = 404;
- public static final int NOT_IMPLEMENTED_STATUS_CODE = 501;
+ public static final String ATTRIBUTES_HEADER = "attributes";
+ public static final String ATTRIBUTE_NAMES = "attributeNames";
+ public static final String RESULTS = "results";
+ static final String OBJECT_PATH = "object-path";
+ static final String QPID_TYPE = "qpid-type";
+
+
+ public static final int STATUS_CODE_OK = 200;
+ private static final int STATUS_CODE_CREATED = 201;
public static final int STATUS_CODE_NO_CONTENT = 204;
- public static final int STATUS_CODE_FORBIDDEN = 403;
public static final int STATUS_CODE_BAD_REQUEST = 400;
+ public static final int STATUS_CODE_FORBIDDEN = 403;
+ public static final int STATUS_CODE_NOT_FOUND = 404;
+ public static final int STATUS_CODE_CONFLICT = 409;
public static final int STATUS_CODE_INTERNAL_ERROR = 500;
- public static final String ATTRIBUTE_NAMES = "attributeNames";
- public static final String RESULTS = "results";
+ public static final int STATUS_CODE_NOT_IMPLEMENTED= 501;
private final NamedAddressSpace _addressSpace;
private final UUID _id;
- private final Action<ManagementNode> _onDelete;
private final ConfiguredObject<?> _managedObject;
+ private final Model _model;
+ private final Map<Class<? extends ConfiguredObject>, ConfiguredObjectOperation<?>> _associatedChildrenOperations = new HashMap<>();
+ private final ConfiguredObjectFinder _configuredObjectFinder;
private List<ManagementNodeConsumer> _consumers = new CopyOnWriteArrayList<>();
- private Map<String,ManagedEntityType> _entityTypes = Collections.synchronizedMap(new LinkedHashMap<String, ManagedEntityType>());
-
- private Map<ManagedEntityType,Map<String,ConfiguredObject>> _entities = Collections.synchronizedMap(new LinkedHashMap<ManagedEntityType,Map<String,ConfiguredObject>>());
-
+ private final Set<Class<? extends ConfiguredObject>> _managedCategories = new HashSet<>();
+ private final Map<String, Class<? extends ConfiguredObject>> _managedTypes = new HashMap<>();
+ private final Map<Class<? extends ConfiguredObject>, Map<String, StandardOperation>> _standardOperations = new HashMap<>();
+ private final ManagementOutputConverter _managementOutputConverter;
- public ManagementNode(final NamedAddressSpace addressSpace,
- final ConfiguredObject<?> configuredObject,
- final Action<ManagementNode> onDelete)
+ ManagementNode(final NamedAddressSpace addressSpace,
+ final ConfiguredObject<?> configuredObject)
{
_addressSpace = addressSpace;
- _onDelete = onDelete;
final String name = configuredObject.getId() + MANAGEMENT_NODE_NAME;
_id = UUID.nameUUIDFromBytes(name.getBytes(Charset.defaultCharset()));
-
+ _model = configuredObject.getModel();
_managedObject = configuredObject;
- configuredObject.addChangeListener(new ModelObjectListener());
+ populateMetaData();
+ _managementOutputConverter = new ManagementOutputConverter(this);
+ _configuredObjectFinder = new ConfiguredObjectFinder(configuredObject);
}
- private Class getManagementClass(Class objectClass)
+ ConfiguredObject<?> getManagedObject()
{
+ return _managedObject;
+ }
+
+ boolean isSyntheticChildClass(Class<? extends ConfiguredObject> clazz)
+ {
+ return _associatedChildrenOperations.containsKey(clazz);
+ }
- if(objectClass.getAnnotation(ManagedObject.class)!=null)
+ private void populateMetaData()
+ {
+ populateManagedCategories();
+
+ populateManagedTypes();
+
+ populateStandardOperations();
+ }
+
+ private void populateStandardOperations()
+ {
+ for(Class<? extends ConfiguredObject> type : _managedTypes.values())
{
- return objectClass;
+ HashMap<String, StandardOperation> operationsMap = new HashMap<>();
+ _standardOperations.put(type, operationsMap);
+ operationsMap.put(READ_OPERATION.getName(), READ_OPERATION);
+ operationsMap.put(UPDATE_OPERATION.getName(), UPDATE_OPERATION);
+ if(ConfiguredObjectTypeRegistry.getCategory(type) != _managedObject.getCategoryClass())
+ {
+ operationsMap.put(DELETE_OPERATION.getName(), DELETE_OPERATION);
+ if(type.getAnnotation(ManagedObject.class).creatable())
+ {
+ operationsMap.put(CREATE_OPERATION.getName(), CREATE_OPERATION);
+ }
+ }
}
- List<Class> allClasses = Collections.singletonList(objectClass);
- List<Class> testedClasses = new ArrayList<Class>();
- do
+ }
+
+ private void populateManagedTypes()
+ {
+ for(Class<? extends ConfiguredObject> category : _managedCategories)
{
- testedClasses.addAll( allClasses );
- allClasses = new ArrayList<Class>();
- for(Class c : testedClasses)
+ _managedTypes.put(getAmqpName(category), category);
+ if(category != _managedObject.getCategoryClass())
{
- for(Class i : c.getInterfaces())
+ for (Class<? extends ConfiguredObject> type : _model.getTypeRegistry().getTypeSpecialisations(category))
{
- if(!allClasses.contains(i))
+ if (type.getAnnotation(ManagedObject.class) != null)
{
- allClasses.add(i);
+ _managedTypes.put(getAmqpName(type), type);
}
}
- if(c.getSuperclass() != null && !allClasses.contains(c.getSuperclass()))
- {
- allClasses.add(c.getSuperclass());
- }
}
- allClasses.removeAll(testedClasses);
- for(Class c : allClasses)
+ else if(_managedObject.getTypeClass() != _managedObject.getCategoryClass())
{
- if(c.getAnnotation(ManagedObject.class) != null)
- {
- return c;
- }
+ _managedTypes.put(getAmqpName(_managedObject.getTypeClass()), _managedObject.getTypeClass());
}
}
- while(!allClasses.isEmpty());
- return null;
}
- private boolean populateTypeMetaData(final Class<? extends ConfiguredObject> objectClass, boolean allowCreate)
+ private void populateManagedCategories()
{
- Class clazz = getManagementClass(objectClass);
- if( clazz != null)
- {
- ManagedObject annotation = (ManagedObject) clazz.getAnnotation(ManagedObject.class);
- populateTypeMetaData(clazz, annotation);
- return true;
- }
- else
+ Class<? extends ConfiguredObject> managedCategory = _managedObject.getCategoryClass();
+ addManagedCategories(managedCategory);
+
+
+ for(ConfiguredObjectOperation<?> operation : _model.getTypeRegistry().getOperations(managedCategory).values())
{
- return false;
+ if(operation.isAssociateAsIfChildren() && returnsCollectionOfConfiguredObjects(operation))
+ {
+ @SuppressWarnings("unchecked")
+ Class<? extends ConfiguredObject> associatedChildCategory =
+ (getCollectionMemberType((ParameterizedType) operation.getGenericReturnType()));
+ _associatedChildrenOperations.put(associatedChildCategory, operation);
+ addManagedCategories(associatedChildCategory);
+ }
}
}
- private ManagedEntityType populateTypeMetaData(Class clazz,
- final ManagedObject entityType)
+
+ private boolean returnsCollectionOfConfiguredObjects(ConfiguredObjectOperation operation)
{
+ return Collection.class.isAssignableFrom(operation.getReturnType())
+ && operation.getGenericReturnType() instanceof ParameterizedType
+ && ConfiguredObject.class.isAssignableFrom(getCollectionMemberType((ParameterizedType) operation.getGenericReturnType()));
+ }
- ManagedEntityType managedEntityType = _entityTypes.get(clazz.getName());
+ private Class getCollectionMemberType(ParameterizedType collectionType)
+ {
+ return getRawType((collectionType).getActualTypeArguments()[0]);
+ }
- if(managedEntityType == null)
+ private static Class getRawType(Type t)
+ {
+ if(t instanceof Class)
+ {
+ return (Class)t;
+ }
+ else if(t instanceof ParameterizedType)
{
- List<String> opsList = new ArrayList<String>(Arrays.asList(entityType.operations()));
- if(entityType.creatable())
+ return (Class)((ParameterizedType)t).getRawType();
+ }
+ else if(t instanceof TypeVariable)
+ {
+ Type[] bounds = ((TypeVariable)t).getBounds();
+ if(bounds.length == 1)
{
- boolean isCreatableChild = false;
- Collection<Class<? extends ConfiguredObject>> parentTypes = _managedObject.getModel().getParentTypes(clazz);
- for(Class<? extends ConfiguredObject> parentConfig : parentTypes)
- {
- isCreatableChild = parentConfig.isAssignableFrom(_managedObject.getClass());
- if(isCreatableChild)
- {
- opsList.add(CREATE_OPERATION);
- break;
- }
- }
+ return getRawType(bounds[0]);
}
- opsList.addAll(Arrays.asList(READ_OPERATION, UPDATE_OPERATION, DELETE_OPERATION));
-
- Set<ManagedEntityType> parentSet = new HashSet<ManagedEntityType>();
-
- List<Class> allClasses = new ArrayList<Class>(Arrays.asList(clazz.getInterfaces()));
- if(clazz.getSuperclass() != null)
+ }
+ else if(t instanceof WildcardType)
+ {
+ Type[] upperBounds = ((WildcardType)t).getUpperBounds();
+ if(upperBounds.length == 1)
{
- allClasses.add(clazz.getSuperclass());
+ return getRawType(upperBounds[0]);
}
+ }
+ throw new ServerScopedRuntimeException("Unable to process type when constructing configuration model: " + t);
+ }
- for(Class parentClazz : allClasses)
- {
- if(parentClazz.getAnnotation(ManagedObject.class) != null)
- {
- ManagedEntityType parentType = populateTypeMetaData(parentClazz,
- (ManagedObject) parentClazz.getAnnotation(
- ManagedObject.class)
- );
- parentSet.add(parentType);
- parentSet.addAll(Arrays.asList(parentType.getParents()));
- }
- }
- managedEntityType = new ManagedEntityType(clazz.getName(), parentSet.toArray(new ManagedEntityType[parentSet.size()]),
- (String[])(_managedObject.getModel().getTypeRegistry().getAttributeNames(
- clazz).toArray(new String[0])),
- opsList.toArray(new String[opsList.size()]));
- _entityTypes.put(clazz.getName(),managedEntityType);
- _entities.put(managedEntityType, Collections.synchronizedMap(new LinkedHashMap<String, ConfiguredObject>()));
+ String getAmqpName(final Class<? extends ConfiguredObject> type)
+ {
+ ManagedObject annotation = type.getAnnotation(ManagedObject.class);
+
+ return "".equals(annotation.amqpName()) ? type.getName() : annotation.amqpName();
+ }
- if(ConfiguredObject.class.isAssignableFrom(clazz))
+ private void addManagedCategories(Class<? extends ConfiguredObject> category)
+ {
+ if(_managedCategories.add(category))
+ {
+ for(Class<? extends ConfiguredObject> childClass : _model.getChildTypes(category))
{
- Collection<Class<? extends ConfiguredObject>> childTypes = _managedObject.getModel().getChildTypes(clazz);
- for(Class<? extends ConfiguredObject> childClass : childTypes)
- {
- populateTypeMetaData(childClass, true);
- }
+ addManagedCategories(childClass);
}
}
-
- return managedEntityType;
-
}
@Override
@@ -267,13 +304,13 @@ class ManagementNode implements MessageS
{
@SuppressWarnings("unchecked")
- MessageConverter converter =
- MessageConverterRegistry.getConverter(message.getClass(), InternalMessage.class);
+ MessageConverter<M, InternalMessage> converter =
+ MessageConverterRegistry.getConverter(((Class<M>)message.getClass()), InternalMessage.class);
- final InternalMessage msg = (InternalMessage) converter.convert(message, _addressSpace);
- if(validateMessage(msg))
+ if(converter != null)
{
+ final InternalMessage msg = converter.convert(message, _addressSpace);
txn.addPostTransactionAction(new ServerTransaction.Action()
{
@Override
@@ -297,19 +334,9 @@ class ManagementNode implements MessageS
}
}
- private boolean validateMessage(final ServerMessage message)
- {
- AMQMessageHeader header = message.getMessageHeader();
- return containsStringHeader(header, TYPE_ATTRIBUTE) && containsStringHeader(header, OPERATION_HEADER)
- && (containsStringHeader(header, NAME_ATTRIBUTE) || containsStringHeader(header, IDENTITY_ATTRIBUTE));
- }
-
- private boolean containsStringHeader(final AMQMessageHeader header, String name)
- {
- return header.containsHeader(name) && header.getHeader(name) instanceof String;
- }
-
- synchronized void enqueue(InternalMessage message, InstanceProperties properties, Action<? super MessageInstance> postEnqueueAction)
+ private synchronized void enqueue(InternalMessage message,
+ InstanceProperties properties,
+ Action<? super MessageInstance> postEnqueueAction)
{
if(postEnqueueAction != null)
{
@@ -318,308 +345,603 @@ class ManagementNode implements MessageS
- String name = (String) message.getMessageHeader().getHeader(NAME_ATTRIBUTE);
String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE);
String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE);
String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER);
InternalMessage response;
- if(SELF_NODE_NAME.equals(name) && type.equals(MANAGEMENT_TYPE))
+ // TODO - handle runtime exceptions
+
+ if(SELF_NODE_NAME.equals(id) && type.equals(MANAGEMENT_TYPE))
{
- response = performManagementOperation(message);
+ response = performManagementOperation(operation, message);
}
- else if(CREATE_OPERATION.equals(operation))
+ else if(_managedTypes.containsKey(type))
{
- response = performCreateOperation(message, type);
+ response = performOperation(_managedTypes.get(type), operation, message);
+
}
else
{
+ response = createFailureResponse(message,
+ STATUS_CODE_NOT_FOUND,
+ "Unknown type {0}", type);
+ }
+
+ sendResponse(message, response);
+
+ }
+
+ private interface StandardOperation
+ {
+ String getName();
+ InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message);
+
+ }
- ConfiguredObject entity = findSubject(name, id, type);
- if(entity != null)
+ private final StandardOperation CREATE_OPERATION =
+ new StandardOperation()
{
- response = performOperation(message, entity);
- }
- else
+ @Override
+ public String getName()
+ {
+ return "CREATE";
+ }
+
+ @Override
+ public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
+ {
+ return performCreateOperation(clazz, message);
+ }
+ };
+
+
+ private final StandardOperation READ_OPERATION =
+ new StandardOperation()
{
- if(id != null)
+ @Override
+ public String getName()
{
- response = createFailureResponse(message,
- NOT_FOUND_STATUS_CODE,
- "No entity with id {0} of type {1} found", id, type);
+ return "READ";
}
- else
+
+ @Override
+ public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
{
- response = createFailureResponse(message,
- NOT_FOUND_STATUS_CODE,
- "No entity with name {0} of type {1} found", name, type);
+ return performReadOperation(clazz, message);
}
- }
- }
+ };
- sendResponse(message, response);
+ private final StandardOperation UPDATE_OPERATION =
+ new StandardOperation()
+ {
+ @Override
+ public String getName()
+ {
+ return "UPDATE";
+ }
- }
+ @Override
+ public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
+ {
+ return performUpdateOperation(clazz, message);
+ }
+ };
- private void sendResponse(final InternalMessage message, final InternalMessage response)
- {
- String replyTo = message.getMessageHeader().getReplyTo();
- response.setInitialRoutingAddress(replyTo);
+ private final StandardOperation DELETE_OPERATION =
+ new StandardOperation()
+ {
+ @Override
+ public String getName()
+ {
+ return "DELETE";
+ }
- getResponseDestination(replyTo).send(response,
- replyTo, InstanceProperties.EMPTY,
- new AutoCommitTransaction(_addressSpace.getMessageStore()),
- null);
+ @Override
+ public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
+ {
+ return performDeleteOperation(clazz, message);
+ }
+ };
- }
- private MessageDestination getResponseDestination(String replyTo)
+ private InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz,
+ final String operation,
+ InternalMessage message)
{
- ManagementNodeConsumer consumer = null;
- Subject currentSubject = Subject.getSubject(AccessController.getContext());
- Set<SessionPrincipal> sessionPrincipals = currentSubject.getPrincipals(SessionPrincipal.class);
- if (!sessionPrincipals.isEmpty())
+ final Map<String, ConfiguredObjectOperation<?>> operations = _model.getTypeRegistry().getOperations(clazz);
+ @SuppressWarnings("unchecked")
+ final ConfiguredObjectOperation<ConfiguredObject<?>> method =
+ (ConfiguredObjectOperation<ConfiguredObject<?>>) operations.get(operation);
+ StandardOperation standardOperation;
+ try
{
- AMQSessionModel publishingSession = sessionPrincipals.iterator().next().getSession();
- for (ManagementNodeConsumer candidate : _consumers)
+ if (method != null)
{
- if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSessionModel() == publishingSession)
- {
- consumer = candidate;
- break;
- }
+ return performConfiguredObjectOperation(clazz, message, method);
+ }
+ else if ((standardOperation = _standardOperations.get(clazz).get(operation)) != null)
+ {
+ return standardOperation.performOperation(clazz, message);
}
+ else
+ {
+ return createFailureResponse(message, STATUS_CODE_NOT_IMPLEMENTED, "Not implemented");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ return createFailureResponse(message, STATUS_CODE_INTERNAL_ERROR, e.getMessage());
}
+ }
- return consumer == null ? _addressSpace.getDefaultDestination() : consumer;
+ private InternalMessage performDeleteOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
+ {
+ InternalMessageHeader requestHeader = message.getMessageHeader();
+ final Map<String, Object> headers = requestHeader.getHeaderMap();
+
+ ConfiguredObject<?> object = findObject(clazz, headers);
+ if(object != null)
+ {
+ try
+ {
+ object.delete();
+
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT);
+
+ return InternalMessage.createMapMessage(_addressSpace.getMessageStore(),
+ responseHeader,
+ Collections.emptyMap());
+ }
+ catch (IntegrityViolationException e)
+ {
+ return createFailureResponse(message, STATUS_CODE_FORBIDDEN, e.getMessage());
+ }
+ }
+ else
+ {
+ return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "Not Found");
+ }
}
- private InternalMessage performCreateOperation(final InternalMessage message, final String type)
+ private InternalMessage performUpdateOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
{
- InternalMessage response;
- ManagedEntityType entityType = _entityTypes.get(type);
- if(type != null)
+ InternalMessageHeader requestHeader = message.getMessageHeader();
+
+ final Map<String, Object> headers = requestHeader.getHeaderMap();
+
+ ConfiguredObject<?> object = findObject(clazz, headers);
+ if(object != null)
{
- if(Arrays.asList(entityType.getOperations()).contains(CREATE_OPERATION))
+ if(message.getMessageBody() instanceof Map)
{
- Object messageBody = message.getMessageBody();
- if(messageBody instanceof Map)
+ @SuppressWarnings("unchecked")
+ final HashMap<String, Object> attributes = new HashMap<>((Map) message.getMessageBody());
+ Object id = attributes.remove(IDENTITY_ATTRIBUTE);
+ if (id != null && !String.valueOf(id).equals(object.getId().toString()))
{
- try
- {
-
- Class<? extends ConfiguredObject> clazz =
- (Class<? extends ConfiguredObject>) Class.forName(type);
- try
- {
- ConfiguredObject child = _managedObject.createChild(clazz, (Map) messageBody);
- if(child == null)
- {
- child = _entities.get(entityType).get(message.getMessageHeader().getHeader(NAME_ATTRIBUTE));
- }
- response = performReadOperation(message, child);
- }
- catch(AccessControlException e)
- {
- response = createFailureResponse(message, STATUS_CODE_FORBIDDEN, e.getMessage());
- }
- }
- catch (ClassNotFoundException e)
+ return createFailureResponse(message,
+ STATUS_CODE_FORBIDDEN,
+ "Cannot change the value of '" + IDENTITY_ATTRIBUTE + "'");
+ }
+ String path = (String) attributes.remove(OBJECT_PATH);
+ for (Class<? extends ConfiguredObject> parentType : _model.getParentTypes(clazz))
+ {
+ String attributeName = parentType.getSimpleName().toLowerCase();
+ final Object parentValue = attributes.remove(attributeName);
+ if (parentValue != null && !String.valueOf(parentValue)
+ .equals(object.getParent(parentType).getName()))
{
- response = createFailureResponse(message,
- STATUS_CODE_INTERNAL_ERROR, "Unable to instantiate an instance of {0} ", type);
+ return createFailureResponse(message,
+ STATUS_CODE_FORBIDDEN,
+ "Cannot change the value of '" + attributeName + "'");
}
}
- else
+ if (path != null && !attributes.containsKey(ConfiguredObject.NAME))
{
- response = createFailureResponse(message,
- STATUS_CODE_BAD_REQUEST,
- "The message body in the request was not of the correct type");
+ String[] pathElements = path.split("/");
+ attributes.put(ConfiguredObject.NAME, pathElements[pathElements.length - 1]);
}
+ object.setAttributes(attributes);
+
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
+
+ return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader,
+ _managementOutputConverter.convertToOutput(object, true));
}
else
{
- response = createFailureResponse(message,
- STATUS_CODE_FORBIDDEN,
- "Cannot CREATE entities of type {0}", type);
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "Message body must be a map");
}
}
else
{
- response = createFailureResponse(message,
- NOT_FOUND_STATUS_CODE,
- "Unknown type {0}",type);
+ return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "No such object");
}
- return response;
}
- private InternalMessage performOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
+ private InternalMessage performReadOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
{
- String operation = (String) requestMessage.getMessageHeader().getHeader(OPERATION_HEADER);
+ InternalMessageHeader requestHeader = message.getMessageHeader();
- if(READ_OPERATION.equals(operation))
- {
- return performReadOperation(requestMessage, entity);
- }
- else if(DELETE_OPERATION.equals(operation))
- {
- return performDeleteOperation(requestMessage, entity);
- }
- else if(UPDATE_OPERATION.equals(operation))
+ final Map<String, Object> headers = requestHeader.getHeaderMap();
+
+ ConfiguredObject<?> object = findObject(clazz, headers);
+ if(object != null)
{
- return performUpdateOperation(requestMessage, entity);
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
+
+ // TODO - remove insecure on insecure channel, provide mechanism for requesting effective rather than actual
+
+ return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader,
+ _managementOutputConverter.convertToOutput(object, true));
}
else
{
- return createFailureResponse(requestMessage, NOT_IMPLEMENTED_STATUS_CODE, "Unable to perform the {0} operation",operation);
+ return createFailureResponse(message,
+ STATUS_CODE_NOT_FOUND,
+ "Not found");
}
}
- private InternalMessage performReadOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
+ private InternalMessage performCreateOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message)
{
- final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
+ InternalMessageHeader requestHeader = message.getMessageHeader();
+
final MutableMessageHeader responseHeader = new MutableMessageHeader();
responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
? requestHeader.getMessageId()
: requestHeader.getCorrelationId());
responseHeader.setMessageId(UUID.randomUUID().toString());
- responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName());
- responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString());
- responseHeader.setHeader(STATUS_CODE_HEADER,STATUS_CODE_OK);
- final String type = getManagementClass(entity.getClass()).getName();
- responseHeader.setHeader(TYPE_ATTRIBUTE, type);
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_CREATED);
+
+ if(message.getMessageBody() instanceof Map)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String,Object> attributes = (Map<String,Object>) message.getMessageBody();
+ if(attributes.containsKey(IDENTITY_ATTRIBUTE))
+ {
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+IDENTITY_ATTRIBUTE+"' cannot be set when creating an object");
+ }
+ if(attributes.containsKey(ConfiguredObject.ID))
+ {
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+ConfiguredObject.ID+"' cannot be set when creating an object");
+ }
+ if(!attributes.containsKey(QPID_TYPE) && _model.getTypeRegistry().getCategory(clazz) != clazz)
+ {
+ Class<? extends ConfiguredObject> typeClass = _model.getTypeRegistry().getTypeClass(clazz);
+ String type = typeClass.getAnnotation(ManagedObject.class).type();
+ if(!"".equals(type))
+ {
+ attributes.put(QPID_TYPE, type);
+ }
+ }
+
+ if(attributes.containsKey(OBJECT_PATH))
+ {
+ String path = String.valueOf(attributes.remove(OBJECT_PATH));
+
+ ConfiguredObject theParent = _managedObject;
+ ConfiguredObject[] otherParents = null;
- Map<String,Object> responseBody = new LinkedHashMap<String, Object>();
- final ManagedEntityType entityType = _entityTypes.get(type);
- for(String attribute : entityType.getAttributes())
+ final Class<? extends ConfiguredObject>[] hierarchy = _configuredObjectFinder.getHierarchy(clazz);
+ if (hierarchy.length > 1)
+ {
+
+ List<ConfiguredObject> parents =
+ _configuredObjectFinder.findObjectParentsFromPath(Arrays.asList(path.split("/")), hierarchy, _model.getTypeRegistry().getCategory(clazz));
+ if(parents.isEmpty())
+ {
+ return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "The '"+OBJECT_PATH+"' "+path+" does not identify a valid parent");
+ }
+ theParent = parents.remove(0);
+ otherParents = parents.toArray(new ConfiguredObject[parents.size()]);
+ }
+ return doCreate(clazz, message, responseHeader, attributes, theParent, otherParents);
+
+ }
+ else if(_configuredObjectFinder.getHierarchy(clazz).length == 1 && attributes.containsKey(ConfiguredObject.NAME))
+ {
+ return doCreate(clazz, message, responseHeader, attributes, _managedObject);
+ }
+ else
+ {
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+OBJECT_PATH+"' must be supplied");
+ }
+ }
+ else
{
- responseBody.put(attribute, fixValue(entity.getAttribute(attribute)));
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "Message body must be a map");
}
- return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, responseBody);
}
-
- private InternalMessage performDeleteOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
/**
 * Creates a child of {@code primaryParent} from the request attributes and builds the reply.
 * <p>
 * The child is only created when the concrete type is unambiguous: the class is not a category,
 * or its ManagedObject annotation names a non-empty default type, or the attributes contain
 * QPID_TYPE, or the type registry reports exactly one type specialisation for the class.
 * Otherwise a BAD_REQUEST failure response is returned.
 * <p>
 * NOTE(review): the ManagedObject annotation is dereferenced without a null check - presumably
 * every class that reaches this method is annotated; confirm against callers.
 *
 * @param clazz the class of object to create
 * @param message the request message, used when building failure responses
 * @param responseHeader header used for the success response
 * @param attributes attributes for the new object; modified in place by this method
 * @param primaryParent the object on which createChild is invoked
 * @param otherParents additional parents passed through to createChild
 * @return a map message describing the created object, or a failure response
 */
+ private InternalMessage doCreate(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message,
+ final MutableMessageHeader responseHeader,
+ final Map<String, Object> attributes,
+ final ConfiguredObject<?> primaryParent,
+ final ConfiguredObject<?>... otherParents)
{
- final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
- final MutableMessageHeader responseHeader = new MutableMessageHeader();
- responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
- ? requestHeader.getMessageId()
- : requestHeader.getCorrelationId());
- responseHeader.setMessageId(UUID.randomUUID().toString());
- responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName());
- responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString());
- final String type = getManagementClass(entity.getClass()).getName();
- responseHeader.setHeader(TYPE_ATTRIBUTE, type);
try
{
- entity.delete();
- responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT);
+ ManagedObject annotation = clazz.getAnnotation(ManagedObject.class);
+ if(!annotation.category() || !"".equals(annotation.defaultType()) || attributes.containsKey(QPID_TYPE) || _model.getTypeRegistry().getTypeSpecialisations(clazz).size()==1)
+ {
// move the QPID_TYPE value to the ConfiguredObject.TYPE key; with no QPID_TYPE, drop TYPE_ATTRIBUTE
+ if(attributes.containsKey(QPID_TYPE))
+ {
+ attributes.put(ConfiguredObject.TYPE, attributes.remove(QPID_TYPE));
+ }
+ else
+ {
+ attributes.remove(TYPE_ATTRIBUTE);
+ }
+
+
+ final ConfiguredObject object = primaryParent.createChild(_model.getTypeRegistry().getCategory(clazz), attributes, otherParents);
+ return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader,
+ _managementOutputConverter.convertToOutput(object, true));
+ }
+ else
+ {
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "type: '"+getAmqpName(clazz)+"' requires the '"+QPID_TYPE+"' attribute");
+ }
}
// each failure below is reported to the requester as a status code rather than propagated
- catch(AccessControlException e)
+ catch (AbstractConfiguredObject.DuplicateNameException e)
{
- responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_FORBIDDEN);
- }
+ return createFailureResponse(message, STATUS_CODE_CONFLICT, "Object already exists with the same path");
- return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, Collections.emptyMap());
+ }
+ catch (IllegalArgumentException | IllegalStateException | IllegalConfigurationException e)
+ {
+ return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, e.getMessage());
+ }
+ catch (AccessControlException e)
+ {
+ return createFailureResponse(message, STATUS_CODE_FORBIDDEN, "Forbidden");
+ }
}
-
- private InternalMessage performUpdateOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
/**
 * Invokes a configured-object operation on the object addressed by the request headers.
 * <p>
 * The target is located with findObject. The operation parameters are the request headers minus
 * the KEY, IDENTITY, TYPE, INDEX and OPERATION entries and minus any header whose name starts
 * with "JMS_QPID". A null operation result is sent back as an empty byte array.
 *
 * @param clazz the class of the object the operation applies to
 * @param message the request message
 * @param method the operation to perform
 * @return the response carrying the operation result, or a NOT_FOUND failure response when no
 *         object matches the request
 */
+ private InternalMessage performConfiguredObjectOperation(final Class<? extends ConfiguredObject> clazz,
+ final InternalMessage message,
+ final ConfiguredObjectOperation<ConfiguredObject<?>> method)
{
- final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
+ InternalMessageHeader requestHeader = message.getMessageHeader();
+
+ final Map<String, Object> headers = requestHeader.getHeaderMap();
+
+ ConfiguredObject<?> object = findObject(clazz, headers);
// findObject returns null when nothing matches; do not hand a null target to the operation
+ if(object == null)
+ {
+ return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "No object matching the request could be found");
+ }
+ Map<String,Object> parameters = new HashMap<>(headers);
+ parameters.remove(KEY_ATTRIBUTE);
+ parameters.remove(IDENTITY_ATTRIBUTE);
+ parameters.remove(TYPE_ATTRIBUTE);
+ parameters.remove(INDEX_ATTRIBUTE);
+ parameters.remove(OPERATION_HEADER);
+
+ Iterator<String> paramIterator = parameters.keySet().iterator();
+ while (paramIterator.hasNext())
+ {
+ final String paramName = paramIterator.next();
+ if(paramName.startsWith("JMS_QPID"))
+ {
+ paramIterator.remove();
+ }
+
+ }
final MutableMessageHeader responseHeader = new MutableMessageHeader();
responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
? requestHeader.getMessageId()
: requestHeader.getCorrelationId());
responseHeader.setMessageId(UUID.randomUUID().toString());
- responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName());
- responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString());
- final String type = getManagementClass(entity.getClass()).getName();
- responseHeader.setHeader(TYPE_ATTRIBUTE, type);
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
- Object messageBody = requestMessage.getMessageBody();
- if(messageBody instanceof Map)
+ Serializable result = (Serializable) method.perform(object, parameters);
+ if(result == null)
{
- try
+ result = new byte[0];
+ }
+ return InternalMessage.createMessage(_addressSpace.getMessageStore(), responseHeader,
+ result, false);
+ }
+
/**
 * Returns the path of the given object as computed by the configured object finder.
 *
 * @param object the object whose path is required
 * @return the result of {@code _configuredObjectFinder.getPath(object)}
 */
+ String generatePath(final ConfiguredObject<?> object)
+ {
+ return _configuredObjectFinder.getPath(object);
+ }
+
/**
 * Locates the object addressed by the request headers.
 * <p>
 * If the IDENTITY header is present the object is looked up by id: a UUID value is used as is
 * and a String value is parsed with UUID.fromString. Otherwise, if the INDEX header is present
 * and equals OBJECT_PATH, the KEY header is treated as a path and resolved by the object finder.
 *
 * @param clazz the class of object being sought
 * @param headers the request headers
 * @return the matching object, or null when the identity value is neither a UUID nor a String
 *         or no object is found by id
 * @throws IllegalArgumentException if the index is not OBJECT_PATH, or if neither the IDENTITY
 *         nor the INDEX header is supplied
 */
+ private ConfiguredObject<?> findObject(final Class<? extends ConfiguredObject> clazz,
+ final Map<String, Object> headers)
+ {
+ if(headers.containsKey(IDENTITY_ATTRIBUTE))
+ {
+ Object value = headers.get(IDENTITY_ATTRIBUTE);
+ UUID id;
+ if(value instanceof UUID)
{
- entity.setAttributes((Map)messageBody);
- return performReadOperation(requestMessage, entity);
+ id= (UUID) value;
}
- catch(AccessControlException e)
+ else if(value instanceof String)
{
- return createFailureResponse(requestMessage, STATUS_CODE_FORBIDDEN, e.getMessage());
+ id = UUID.fromString((String) value);
}
+ else
+ {
+ return null;
+ }
+
+ return findObjectById(id, clazz);
}
- else
+ else if(headers.containsKey(INDEX_ATTRIBUTE))
{
- return createFailureResponse(requestMessage,
- STATUS_CODE_BAD_REQUEST,
- "The message body in the request was not of the correct type");
+ Object index = headers.get(INDEX_ATTRIBUTE);
+ if(OBJECT_PATH.equals(index))
+ {
+ return _configuredObjectFinder.findObjectFromPath(String.valueOf(headers.get(KEY_ATTRIBUTE)), clazz);
+ }
+ else
+ {
// the index value is quoted with matching single quotes on both sides
+ throw new IllegalArgumentException("Unknown index: '"+index+"'");
+ }
}
+ else
+ {
+ throw new IllegalArgumentException("Either "+IDENTITY_ATTRIBUTE+" or "+INDEX_ATTRIBUTE+" must be specified");
+ }
+ }
+ private ConfiguredObject<?> findObjectById(final UUID id, final Class<? extends ConfiguredObject> clazz)
+ {
+ Collection<Class<? extends ConfiguredObject>> ancestorCategories = _model.getAncestorCategories(clazz);
+ if(ancestorCategories.contains(_managedObject.getCategoryClass()))
+ {
+ return findDescendantById(clazz, id, _managedObject.getCategoryClass(), Collections.singleton(_managedObject));
+ }
+ else
+ {
+ for(Map.Entry<Class<? extends ConfiguredObject>,ConfiguredObjectOperation<?>> entry : _associatedChildrenOperations.entrySet())
+ {
+ if(ancestorCategories.contains(entry.getKey()))
+ {
+ @SuppressWarnings("unchecked")
+ ConfiguredObjectOperation<ConfiguredObject<?>> operation =
+ (ConfiguredObjectOperation<ConfiguredObject<?>>) entry.getValue();
+ @SuppressWarnings("unchecked")
+ Collection<? extends ConfiguredObject> associated =
+ (Collection<? extends ConfiguredObject>) operation
+ .perform(_managedObject,
+ Collections.<String, Object>emptyMap());
+ ConfiguredObject<?> object = findDescendantById(clazz, id,
+ entry.getKey(),
+ associated);
+ if(object != null)
+ {
+ return object;
+ }
+ }
+ }
+ }
+ return null;
}
- private ConfiguredObject findSubject(final String name, final String id, final String type)
/**
 * Searches the given roots, and recursively their descendants, for an object of the given
 * category with the given id.
 * <p>
 * Three cases are handled: the roots are themselves of the sought category (compare ids
 * directly); the sought category is a direct child type of the root category (ask each root via
 * getChildById); otherwise recurse through every child type of the root category that is an
 * ancestor category of the sought category.
 *
 * @param category the category of the object being sought
 * @param id the id of the object being sought
 * @param rootCategory the category of the objects in {@code roots}
 * @param roots the objects from which to search
 * @return the matching object, or null if none is found
 */
+ private ConfiguredObject<?> findDescendantById(final Class<? extends ConfiguredObject> category,
+ final UUID id,
+ final Class<? extends ConfiguredObject> rootCategory,
+ final Collection<? extends ConfiguredObject> roots)
{
- ConfiguredObject subject;
- ManagedEntityType met = _entityTypes.get(type);
- if(met == null)
+ if(category == rootCategory)
{
- return null;
+ for(ConfiguredObject<?> root : roots)
+ {
+ if(root.getId().equals(id))
+ {
+ return root;
+ }
+ }
}
-
- subject = findSubject(name, id, met);
- if(subject == null)
+ else
{
- ArrayList<ManagedEntityType> allTypes = new ArrayList<ManagedEntityType>(_entityTypes.values());
- for(ManagedEntityType entityType : allTypes)
+ if(_model.getChildTypes(rootCategory).contains(category))
+ {
+ for(ConfiguredObject<?> root : roots)
+ {
+ final ConfiguredObject<?> child = root.getChildById(category, id);
+ if(child != null)
+ {
+ return child;
+ }
+ }
+ }
+ else
{
- if(Arrays.asList(entityType.getParents()).contains(met))
+ Collection<Class<? extends ConfiguredObject>> ancestorCategories = _model.getAncestorCategories(category);
+ for(Class<? extends ConfiguredObject> childClass : _model.getChildTypes(rootCategory))
{
- subject = findSubject(name, id, entityType);
- if(subject != null)
+ if(ancestorCategories.contains(childClass))
{
- return subject;
// gather the children of this type from every root and search one level further down
+ List<ConfiguredObject> newRoots = new ArrayList<>();
+ for(ConfiguredObject<?> root : roots)
+ {
+ newRoots.addAll(root.getChildren(childClass));
+ }
+ if(!newRoots.isEmpty())
+ {
+ final ConfiguredObject<?> child = findDescendantById(category, id, childClass, newRoots);
+ if(child != null)
+ {
+ return child;
+ }
+ }
}
}
}
}
- return subject;
+ return null;
}
- private ConfiguredObject findSubject(final String name, final String id, final ManagedEntityType entityType)
/**
 * Sends a response to the reply-to address named in the request.
 * <p>
 * The request's reply-to value is set as the response's initial routing address and the
 * response is sent, in a new AutoCommitTransaction on the address space's message store, to the
 * destination chosen by getResponseDestination.
 *
 * @param message the request message, from which the reply-to address is read
 * @param response the response to send
 */
+ private void sendResponse(final InternalMessage message, final InternalMessage response)
{
+ String replyTo = message.getMessageHeader().getReplyTo();
+ response.setInitialRoutingAddress(replyTo);
- Map<String, ConfiguredObject> objects = _entities.get(entityType);
- if(name != null)
- {
- ConfiguredObject subject = objects.get(name);
- if(subject != null)
- {
- return subject;
- }
- }
- else
+
+ getResponseDestination(replyTo).send(response,
+ replyTo, InstanceProperties.EMPTY,
+ new AutoCommitTransaction(_addressSpace.getMessageStore()),
+ null);
+
+ }
+
/**
 * Chooses the destination to which a response for the given reply-to address is sent.
 * <p>
 * If the current Subject carries a SessionPrincipal, the consumers of this node are searched
 * for one whose target address equals {@code replyTo} and whose session is the publishing
 * session; that consumer is used when found. In every other case the address space's default
 * destination is returned.
 *
 * @param replyTo the reply-to address of the request
 * @return the matching consumer, or the default destination
 */
+ private MessageDestination getResponseDestination(String replyTo)
+ {
+ ManagementNodeConsumer consumer = null;
+ Subject currentSubject = Subject.getSubject(AccessController.getContext());
// Subject.getSubject returns null when no Subject is associated with the context
+ Set<SessionPrincipal> sessionPrincipals = currentSubject == null
+ ? Collections.<SessionPrincipal>emptySet()
+ : currentSubject.getPrincipals(SessionPrincipal.class);
+ if (!sessionPrincipals.isEmpty())
{
- final Collection<ConfiguredObject> values = new ArrayList<ConfiguredObject>(objects.values());
- for(ConfiguredObject o : values)
+ AMQSessionModel publishingSession = sessionPrincipals.iterator().next().getSession();
+ for (ManagementNodeConsumer candidate : _consumers)
{
- if(o.getId().toString().equals(id))
+ if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSessionModel() == publishingSession)
{
- return o;
+ consumer = candidate;
+ break;
}
}
}
- return null;
+
+ return consumer == null ? _addressSpace.getDefaultDestination() : consumer;
}
+
private InternalMessage createFailureResponse(final InternalMessage requestMessage,
final int statusCode,
final String stateDescription,
@@ -636,12 +958,12 @@ class ManagementNode implements MessageS
responseHeader.setHeader(header, requestHeader.getHeader(header));
}
responseHeader.setHeader(STATUS_CODE_HEADER, statusCode);
- responseHeader.setHeader(STATUS_DESCRIPTION_HEADER, MessageFormat.format(stateDescription, params));
+ responseHeader.setHeader(STATUS_DESCRIPTION_HEADER, params.length == 0 ? stateDescription : MessageFormat.format(stateDescription, params));
return InternalMessage.createBytesMessage(_addressSpace.getMessageStore(), responseHeader, new byte[0]);
}
- private InternalMessage performManagementOperation(final InternalMessage msg)
/**
 * Dispatches a management-node operation (GET_TYPES, GET_ATTRIBUTES, GET_OPERATIONS or QUERY)
 * and wraps the resulting map in a response message with status OK.
 * <p>
 * A QUERY whose body is not a map yields a BAD_REQUEST failure response; any other operation
 * name yields a NOT_IMPLEMENTED failure response naming the operation.
 *
 * @param operation the name of the requested operation
 * @param msg the request message
 * @return the response message
 */
+ private InternalMessage performManagementOperation(String operation, final InternalMessage msg)
{
final InternalMessage responseMessage;
final InternalMessageHeader requestHeader = msg.getMessageHeader();
@@ -651,332 +973,202 @@ class ManagementNode implements MessageS
: requestHeader.getCorrelationId());
responseHeader.setMessageId(UUID.randomUUID().toString());
-
- String operation = (String) requestHeader.getHeader(OPERATION_HEADER);
+ Map<?, ?> result;
if(GET_TYPES.equals(operation))
{
- responseMessage = performGetTypes(requestHeader, responseHeader);
+ result = performGetTypes(requestHeader.getHeaderMap());
}
else if(GET_ATTRIBUTES.equals(operation))
{
- responseMessage = performGetAttributes(requestHeader, responseHeader);
+ result = performGetAttributes(requestHeader.getHeaderMap());
}
else if(GET_OPERATIONS.equals(operation))
{
- responseMessage = performGetOperations(requestHeader, responseHeader);
+ result = performGetOperations(requestHeader.getHeaderMap());
}
else if(QUERY.equals(operation))
{
- responseMessage = performQuery(requestHeader, msg.getMessageBody(), responseHeader);
- }
- else
- {
- responseMessage = InternalMessage.createBytesMessage(_addressSpace.getMessageStore(), requestHeader, new byte[0]);
- }
- return responseMessage;
- }
-
- private InternalMessage performGetTypes(final InternalMessageHeader requestHeader,
- final MutableMessageHeader responseHeader)
- {
- final InternalMessage responseMessage;
- List<String> restriction;
- if(requestHeader.containsHeader(ENTITY_TYPE_HEADER))
- {
- restriction = new ArrayList<String>(Collections.singletonList( (String)requestHeader.getHeader(ENTITY_TYPE_HEADER)));
- }
- else
- {
- restriction = null;
- }
-
- responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
- Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
- Map<String,ManagedEntityType> entityMapCopy;
- synchronized (_entityTypes)
- {
- entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
- }
-
- for(ManagedEntityType type : entityMapCopy.values())
- {
- if(restriction == null || meetsIndirectRestriction(type,restriction))
+ if(msg.getMessageBody() instanceof Map)
{
- final ManagedEntityType[] parents = type.getParents();
- List<String> parentNames = new ArrayList<String>();
- if(parents != null)
- {
- for(ManagedEntityType parent : parents)
- {
- parentNames.add(parent.getName());
- }
- }
- responseMap.put(type.getName(), parentNames);
+ result = performQuery(requestHeader.getHeaderMap(), (Map)(msg.getMessageBody()));
+ }
+ else
+ {
+ return createFailureResponse(msg, STATUS_CODE_BAD_REQUEST, "Body of a QUERY operation must be a map");
}
- }
- responseMessage = InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, responseMap);
- return responseMessage;
- }
-
- private InternalMessage performGetAttributes(final InternalMessageHeader requestHeader,
- final MutableMessageHeader responseHeader)
- {
- final InternalMessage responseMessage;
- String restriction;
- if(requestHeader.containsHeader(ENTITY_TYPE_HEADER))
- {
- restriction = (String) requestHeader.getHeader(ENTITY_TYPE_HEADER);
}
else
{
- restriction = null;
// the description is formatted with MessageFormat when parameters are supplied, so the
// placeholder must be an indexed one ({0}); a bare {} makes MessageFormat throw
+ return createFailureResponse(msg, STATUS_CODE_NOT_IMPLEMENTED, "Unknown operation {0}", operation);
}
-
responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
- Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
- Map<String,ManagedEntityType> entityMapCopy;
- synchronized (_entityTypes)
- {
- entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
- }
- if(restriction == null)
- {
- for(ManagedEntityType type : entityMapCopy.values())
- {
- responseMap.put(type.getName(), Arrays.asList(type.getAttributes()));
- }
- }
- else if(entityMapCopy.containsKey(restriction))
- {
- responseMap.put(restriction, Arrays.asList(entityMapCopy.get(restriction).getAttributes()));
- }
+ responseMessage = InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, result);
- responseMessage = InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, responseMap);
return responseMessage;
}
-
- private InternalMessage performGetOperations(final InternalMessageHeader requestHeader,
- final MutableMessageHeader responseHeader)
/**
 * Handles the QUERY operation.
 * <p>
 * Reads the requested attribute names from the message body and the entity type from the
 * headers; when no attribute names are supplied they are generated from the entity type.
 * <p>
 * NOTE(review): as written this method computes the attribute names and then always returns
 * null, so no query results are produced - looks like an unfinished implementation; confirm.
 * NOTE(review): the ATTRIBUTE_NAMES and ENTITY_TYPE_HEADER values are cast without type checks.
 *
 * @param headerMap the request headers
 * @param messageBody the request body
 * @return currently always null
 */
+ private Map<?, ?> performQuery(final Map<String, Object> headerMap, final Map messageBody)
{
- final InternalMessage responseMessage;
- String restriction;
- if(requestHeader.containsHeader(ENTITY_TYPE_HEADER))
- {
- restriction = (String) requestHeader.getHeader(ENTITY_TYPE_HEADER);
- }
- else
- {
- restriction = null;
- }
-
- responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
- Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
- Map<String,ManagedEntityType> entityMapCopy;
- synchronized (_entityTypes)
- {
- entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
- }
+ List<String> attributeNames = (List<String>) messageBody.get(ATTRIBUTE_NAMES);
+ String entityType = (String)headerMap.get(ENTITY_TYPE_HEADER);
- if(restriction == null)
- {
- for(ManagedEntityType type : entityMapCopy.values())
- {
- responseMap.put(type.getName(), Arrays.asList(type.getOperations()));
- }
- }
- else if(entityMapCopy.containsKey(restriction))
+ if(attributeNames == null || attributeNames.isEmpty())
{
- ManagedEntityType type = entityMapCopy.get(restriction);
- responseMap.put(type.getName(), Arrays.asList(type.getOperations()));
+ attributeNames = generateAttributeNames(entityType);
}
- responseMessage = InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, responseMap);
- return responseMessage;
+ return null;
}
- private InternalMessage performQuery(final InternalMessageHeader requestHeader,
- final Object messageBody, final MutableMessageHeader responseHeader)
/**
 * Builds the list of attribute names applicable to an entity type.
 * <p>
 * For a non-blank entity type the managed class registered under that name is used, together
 * with its type specialisations when the class is its own category. For a null or blank entity
 * type every managed category and all of its type specialisations are used. Attribute names are
 * collected in encounter order with duplicates dropped.
 *
 * @param entityType the entity type name; may be null or blank
 * @return the attribute names; empty when the entity type is non-blank but unknown
 */
+ private List<String> generateAttributeNames(String entityType)
{
- final InternalMessage responseMessage;
- List<String> restriction;
- List<String> attributes;
- int offset;
- int count;
-
- if(requestHeader.containsHeader(ENTITY_TYPE_HEADER))
- {
- restriction = new ArrayList<String>(Collections.singletonList((String) requestHeader.getHeader(
- ENTITY_TYPE_HEADER)));
- responseHeader.setHeader(ENTITY_TYPE_HEADER, restriction);
- }
- else
- {
- restriction = new ArrayList<String>(_entityTypes.keySet());
- }
-
// attrNameSet tracks names already seen; attributeNames is the list that is returned
+ Set<String> attrNameSet = new HashSet<>();
+ List<String> attributeNames = new ArrayList<>();
+ final ConfiguredObjectTypeRegistry typeRegistry = _model.getTypeRegistry();
+ List<Class<? extends ConfiguredObject>> classes = new ArrayList<>();
- if(messageBody instanceof Map && ((Map)messageBody).get(ATTRIBUTE_NAMES) instanceof List)
- {
- attributes = (List<String>) ((Map)messageBody).get(ATTRIBUTE_NAMES);
- }
- else
+ if(entityType != null && !entityType.trim().equals(""))
{
- LinkedHashMap<String,Void> attributeSet = new LinkedHashMap<String, Void>();
- for(String entityType : restriction)
+ Class<? extends ConfiguredObject> clazz = _managedTypes.get(entityType);
+ if(clazz != null)
{
- ManagedEntityType type = _entityTypes.get(entityType);
- if(type != null)
+ classes.add(clazz);
+ if(ConfiguredObjectTypeRegistry.getCategory(clazz) == clazz)
{
- for(String attributeName : type.getAttributes())
- {
- attributeSet.put(attributeName, null);
- }
+ classes.addAll(_model.getTypeRegistry().getTypeSpecialisations(clazz));
}
}
- attributes = new ArrayList<String>(attributeSet.keySet());
-
- }
-
- if(requestHeader.containsHeader(OFFSET_HEADER))
- {
- offset = ((Number) requestHeader.getHeader(OFFSET_HEADER)).intValue();
- responseHeader.setHeader(OFFSET_HEADER,offset);
}
else
{
- offset = 0;
- }
-
- if(requestHeader.containsHeader(COUNT_HEADER))
- {
- count = ((Number) requestHeader.getHeader(COUNT_HEADER)).intValue();
+ for (Class<? extends ConfiguredObject> clazz : _managedCategories)
+ {
+ classes.add(clazz);
+ classes.addAll(_model.getTypeRegistry().getTypeSpecialisations(clazz));
+ }
}
- else
+ for(Class<? extends ConfiguredObject> clazz : classes)
{
- count = Integer.MAX_VALUE;
+ for(String name : typeRegistry.getAttributeNames(clazz))
+ {
+ if(attrNameSet.add(name))
+ {
+ attributeNames.add(name);
+ }
+ }
}
+ return attributeNames;
+ }
- responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
- List<List<? extends Object>> responseList = new ArrayList<List<? extends Object>>();
- int rowNo = 0;
- for(String type : restriction)
/**
 * Evaluates a per-type operation for the entity type named in the request, or for every managed
 * type when the request names none.
 * <p>
 * With an ENTITY_TYPE header: a known managed type is evaluated under the requested name and,
 * when the class is its own category, each of its type specialisations carrying the
 * ManagedObject annotation is evaluated under its AMQP name; the MANAGEMENT_TYPE name maps to
 * {@code selfValue}; any other name produces an empty map. Without the header every entry of
 * the managed types is evaluated and MANAGEMENT_TYPE is added with {@code selfValue}.
 *
 * @param requestHeader the request headers
 * @param operation the per-type evaluation to apply
 * @param selfValue the value reported for the management node's own type
 * @return a map from type name to evaluated value, in insertion order
 */
+ private <T> Map<String, T> performManagementOperation(final Map<String,Object> requestHeader, TypeOperation<T> operation, T selfValue)
+ {
+ Map<String,T> responseMap = new LinkedHashMap<>();
+
+ if(requestHeader.containsKey(ENTITY_TYPE_HEADER))
{
- ManagedEntityType entityType = _entityTypes.get(type);
- if(entityType != null)
+ String entityType = (String)requestHeader.get(ENTITY_TYPE_HEADER);
+ Class<? extends ConfiguredObject> clazz = _managedTypes.get(entityType);
+ if(clazz != null)
{
- Map<String, ConfiguredObject> entityMap = _entities.get(entityType);
- if(entityMap != null)
+ responseMap.put(entityType, operation.evaluateType(clazz));
+ if(ConfiguredObjectTypeRegistry.getCategory(clazz) == clazz)
{
- List<ConfiguredObject> entities;
- synchronized(entityMap)
- {
- entities = new ArrayList<ConfiguredObject>(entityMap.values());
- }
- for(ConfiguredObject entity : entities)
+ for(Class<? extends ConfiguredObject> type : _model.getTypeRegistry().getTypeSpecialisations(clazz))
{
- if(rowNo++ >= offset)
- {
- Object[] attrValue = new Object[attributes.size()];
- int col = 0;
- for(String attr : attributes)
- {
- Object value;
- if(TYPE_ATTRIBUTE.equals(attr))
- {
- value = entityType.getName();
- }
- else
- {
- value = fixValue(entity.getAttribute(attr));
- }
- attrValue[col++] = value;
- }
- responseList.add(Arrays.asList(attrValue));
- }
- if(responseList.size()==count+1)
+ if(type.isAnnotationPresent(ManagedObject.class))
{
- break;
+ responseMap.put(getAmqpName(type), operation.evaluateType(type));
}
}
}
}
-
- if(responseList.size()==count)
- {
- break;
- }
- }
- responseHeader.setHeader(COUNT_HEADER, responseList.size());
- Map<String,List> responseMap = new HashMap<String, List>();
- responseMap.put(ATTRIBUTE_NAMES, attributes);
- responseMap.put(RESULTS, responseList);
- responseMessage = InternalMessage.createMapMessage(_addressSpace.getMessageStore(),
- responseHeader,
- responseMap);
- return responseMessage;
- }
-
- private Object fixValue(final Object value)
- {
- Object fixedValue;
- if(value instanceof Enum)
- {
- fixedValue = value.toString();
- }
- else if(value instanceof Map)
- {
- Map<Object, Object> oldValue = (Map<Object, Object>) value;
- Map<Object, Object> newValue = new LinkedHashMap<Object, Object>();
- for(Map.Entry<Object, Object> entry : oldValue.entrySet())
+ else if(MANAGEMENT_TYPE.equals(entityType))
{
- newValue.put(fixValue(entry.getKey()),fixValue(entry.getValue()));
+ responseMap.put(entityType, selfValue);
}
- fixedValue = newValue;
}
- else if(value instanceof Collection)
+ else
{
- Collection oldValue = (Collection) value;
- List newValue = new ArrayList(oldValue.size());
- for(Object o : oldValue)
+
+ for(Map.Entry<String, Class<? extends ConfiguredObject>> entry : _managedTypes.entrySet())
{
- newValue.add(fixValue(o));
+ responseMap.put(entry.getKey(), operation.evaluateType(entry.getValue()));
}
- fixedValue = newValue;
- }
- else if(value != null && value.getClass().isArray() && !(value instanceof byte[]))
- {
- fixedValue = fixValue(Arrays.asList((Object[])value));
- }
- else
- {
- fixedValue = value;
+ responseMap.put(MANAGEMENT_TYPE, selfValue);
}
- return fixedValue;
-
+ return responseMap;
}
+ private Map<String,List<String>> performGetTypes(final Map<String, Object> header)
+ {
+ return performManagementOperation(header,
+ new TypeOperation<List<String>>()
+ {
+ @Override
+ public List<String> evaluateType(final Class<? extends ConfiguredObject> clazz)
+ {
+ Class<? extends ConfiguredObject> category =
+ ConfiguredObjectTypeRegistry.getCategory(clazz);
+ if(category == clazz)
+ {
+ return Collections.emptyList();
+ }
+ else
+ {
+ return Collections.singletonList(getAmqpName(category));
+ }
+ }
+ }, Collections.<String>emptyList());
+
+ }
+
/**
 * Handles GET_ATTRIBUTES: maps each reported type name to a copy of the attribute names the
 * type registry holds for that class. The management node's own type reports an empty list.
 *
 * @param headers the request headers
 * @return a map from type name to attribute names
 */
+ private Map<String,List<String>> performGetAttributes(final Map<String, Object> headers)
+ {
+ final TypeOperation<List<String>> attributeLister = new TypeOperation<List<String>>()
+ {
+ @Override
+ public List<String> evaluateType(final Class<? extends ConfiguredObject> clazz)
+ {
+ return new ArrayList<>(_model.getTypeRegistry().getAttributeNames(clazz));
+ }
+ };
+
+ return performManagementOperation(headers, attributeLister, Collections.<String>emptyList());
+ }
+
+
/**
 * Handles GET_OPERATIONS: maps each reported type name to a map from operation name to the
 * names of that operation's parameters, as listed by the type registry.
 * <p>
 * The management node's own type currently reports an empty map (see the TODO below).
 *
 * @param headers the request headers
 * @return a map from type name to (operation name to parameter names)
 */
+ private Map<String,Map<String,List<String>>> performGetOperations(final Map<String, Object> headers)
+ {
+ // TODO - enumerate management operations
+ final Map<String, List<String>> managementOperations = new HashMap<>();
+
+ return performManagementOperation(headers,
+ new TypeOperation<Map<String,List<String>>>()
+ {
+ @Override
+ public Map<String,List<String>> evaluateType(final Class<? extends ConfiguredObject> clazz)
+ {
+ final Map<String, ConfiguredObjectOperation<?>> operations =
+ _model.getTypeRegistry().getOperations(clazz);
+ Map<String,List<String>> result = new HashMap<>();
+ for(Map.Entry<String, ConfiguredObjectOperation<?>> operation : operations.entrySet())
+ {
+
// collect the parameter names of this operation in declaration order of getParameters()
+ List<String> arguments = new ArrayList<>();
+ for(OperationParameter param : operation.getValue().getParameters())
+ {
+ arguments.add(param.getName());
+ }
+ result.put(operation.getKey(), arguments);
+ }
+ return result;
+ }
+ }, managementOperations);
- private boolean meetsIndirectRestriction(final ManagedEntityType type, final List<String> restriction)
- {
- if(restriction.contains(type.getName()))
- {
- return true;
- }
- if(type.getParents() != null)
- {
- for(ManagedEntityType parent : type.getParents())
- {
- if(meetsIndirectRestriction(parent, restriction))
- {
- return true;
- }
- }
- }
- return false;
}
+
@Override
public synchronized ManagementNodeConsumer addConsumer(final ConsumerTarget target,
final FilterManager filters,
@@ -1046,8 +1238,8 @@ class ManagementNode implements MessageS
private final ServerMessage _message;
private final InstanceProperties _properties;
- public ConsumedMessageInstance(final ServerMessage message,
- final InstanceProperties properties)
+ ConsumedMessageInstance(final ServerMessage message,
+ final InstanceProperties properties)
{
_message = message;
_properties = properties;
@@ -1261,196 +1453,92 @@ class ManagementNode implements MessageS
}
}
- private class ModelObjectListener extends AbstractConfigurationChangeListener
- {
- @Override
- public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
- {
- if(newState == State.DELETED)
- {
- if(_onDelete != null)
- {
- _onDelete.performAction(ManagementNode.this);
- }
- }
- else if(newState == State.ACTIVE && object instanceof org.apache.qpid.server.model.VirtualHost)
- {
- populateTypeMetaData(object.getClass(), false);
- final Class managementClass = getManagementClass(_managedObject.getClass());
- _entities.get(_entityTypes.get(managementClass.getName())).put(_managedObject.getName(), _managedObject);
-
- Collection<Class<? extends ConfiguredObject>> childClasses = object.getModel().getChildTypes(managementClass);
- for(Class<? extends ConfiguredObject> childClass : childClasses)
- {
- if(getManagementClass(childClass) != null)
- {
- for(ConfiguredObject child : _managedObject.getChildren(childClass))
- {
- _entities.get(_entityTypes.get(getManagementClass(childClass).getName())).put(child.getName(), child);
- }
- }
- }
-
- }
- }
-
- @Override
- public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
- {
- final Class managementClass = getManagementClass(child.getClass());
- final ManagedEntityType entityType = _entityTypes.get(managementClass.getName());
- if(entityType != null)
- {
- _entities.get(entityType).put(child.getName(), child);
- }
- }
-
- @Override
- public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
- {
- final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName());
- if(entityType != null)
- {
- _entities.get(entityType).remove(child.getName());
- }
- }
- }
-
private static class MutableMessageHeader implements AMQMessageHeader
{
- private final LinkedHashMap<String, Object> _headers = new LinkedHashMap<String, Object>();
+ private final LinkedHashMap<String, Object> _headers = new LinkedHashMap<>();
private String _correlationId;
- private long _expiration;
- private String _userId;
- private String _appId;
private String _messageId;
- private String _mimeType;
- private String _encoding;
- private byte _priority;
- private long _timestamp;
- private long _notValidBefore;
- private String _type;
- private String _replyTo;
- public void setCorrelationId(final String correlationId)
+ void setCorrelationId(final String correlationId)
{
_correlationId = correlationId;
}
- public void setExpiration(final long expiration)
- {
- _expiration = expiration;
- }
-
- public void setUserId(final String userId)
- {
- _userId = userId;
- }
-
- public void setAppId(final String appId)
- {
- _appId = appId;
- }
-
- public void setMessageId(final String messageId)
+ void setMessageId(final String messageId)
{
_messageId = messageId;
}
- public void setMimeType(final String mimeType)
- {
- _mimeType = mimeType;
- }
-
- public void setEncoding(final String encoding)
- {
- _encoding = encoding;
- }
-
- public void setPriority(final byte priority)
- {
- _priority = priority;
- }
-
- public void setTimestamp(final long timestamp)
- {
- _timestamp = timestamp;
- }
-
- public void setNotValidBefore(final long notValidBefore)
- {
- _notValidBefore = notValidBefore;
- }
-
- public void setType(final String type)
- {
- _type = type;
- }
-
- public void setReplyTo(final String replyTo)
- {
- _replyTo = replyTo;
- }
-
+ @Override
public String getCorrelationId()
{
return _correlationId;
}
+ @Override
public long getExpiration()
{
- return _expiration;
+ return 0L;
}
+ @Override
public String getUserId()
{
- return _userId;
+ return null;
}
+ @Override
public String getAppId()
{
- return _appId;
+ return null;
}
+ @Override
public String getMessageId()
{
return _messageId;
}
+ @Override
public String getMimeType()
{
- return _mimeType;
+ return null;
}
+ @Override
public String getEncoding()
{
- return _encoding;
+ return null;
}
+ @Override
public byte getPriority()
{
- return _priority;
+ return 4;
}
+ @Override
public long getTimestamp()
{
- return _timestamp;
+ return 0L;
}
@Override
public long getNotValidBefore()
{
- return _notValidBefore;
+ return 0L;
}
+ @Override
public String getType()
{
- return _type;
+ return null;
}
+ @Override
public String getReplyTo()
{
- return _replyTo;
+ return null;
}
@Override
@@ -1477,10 +1565,16 @@ class ManagementNode implements MessageS
return Collections.unmodifiableCollection(_headers.keySet());
}
- public void setHeader(String header, Object value)
+ void setHeader(String header, Object value)
{
_headers.put(header,value);
}
}
+
/**
 * Callback that computes a value of type {@code T} for a configured object class.
 *
 * @param <T> the type of value produced for each class
 */
+ private interface TypeOperation<T>
+ {
/**
 * Computes the value for one class.
 *
 * @param clazz the configured object class to evaluate
 * @return the value for that class
 */
+ T evaluateType(Class<? extends ConfiguredObject> clazz);
+ }
+
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java?rev=1770576&r1=1770575&r2=1770576&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java Sun Nov 20 16:42:57 2016
@@ -31,14 +31,7 @@ public class ManagementNodeCreator imple
public void register(final SystemNodeRegistry registry)
{
ManagementNode managementNode = new ManagementNode(registry.getVirtualHost(),
- registry.getVirtualHost(), new Action<ManagementNode>()
- {
- @Override
- public void performAction(final ManagementNode node)
- {
- registry.removeSystemNode(node);
- }
- });
+ registry.getVirtualHost());
registry.registerSystemNode(managementNode);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org