You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/02/25 14:54:56 UTC
svn commit: r630854 [2/3] - in /incubator/qpid/branches/M2.1/java:
broker/etc/ broker/src/main/java/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/management/
broker/src/main/java/org/apache/qpid/serve...
Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java?rev=630854&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java Mon Feb 25 05:54:46 2008
@@ -0,0 +1,587 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PrincipalPermissions
+{
+
+ private static final int CONSUME_QUEUES_KEY = 0;
+ private static final int CONSUME_TEMPORARY_KEY = 1;
+ private static final int CONSUME_OWN_QUEUES_ONLY_KEY = 2;
+
+ private static final int CREATE_QUEUES_KEY = 0;
+ private static final int CREATE_EXCHANGES_KEY = 1;
+
+ private static final int CREATE_QUEUE_TEMPORARY_KEY = 2;
+ private static final int CREATE_QUEUE_QUEUES_KEY = 1;
+ private static final int CREATE_QUEUE_EXCHANGES_KEY = 0;
+
+ private static final int CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY = 0;
+ private static final int CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY = 1;
+
+ private static final int PUBLISH_EXCHANGES_KEY = 0;
+
+ private Map _permissions;
+
+ private String _user;
+
+
+ public PrincipalPermissions(String user)
+ {
+ _user = user;
+ _permissions = new ConcurrentHashMap();
+ }
+
+ public void grant(Permission permission, Object... parameters)
+ {
+ switch (permission)
+ {
+ case ACCESS:
+ break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
+ case BIND:
+ break; // All the details are currently included in the create setup.
+ case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly
+ Map consumeRights = (Map) _permissions.get(permission);
+
+ if (consumeRights == null)
+ {
+ consumeRights = new ConcurrentHashMap();
+ _permissions.put(permission, consumeRights);
+ }
+
+ //if we have parametsre
+ if (parameters.length > 0)
+ {
+ AMQShortString queueName = (AMQShortString) parameters[0];
+ Boolean temporary = (Boolean) parameters[1];
+ Boolean ownQueueOnly = (Boolean) parameters[2];
+
+ if (temporary)
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_TEMPORARY_KEY, false);
+ }
+
+ if (ownQueueOnly)
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
+ }
+ else
+ {
+ consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
+ }
+
+
+ LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
+ if (queues == null)
+ {
+ queues = new LinkedList();
+ consumeRights.put(CONSUME_QUEUES_KEY, queues);
+ }
+
+ if (queueName != null)
+ {
+ queues.add(queueName);
+ }
+ }
+
+
+ break;
+ case CREATE: // Parameters : Boolean temporary, AMQShortString queueName
+ // , AMQShortString exchangeName , AMQShortString routingKey
+ // || AMQShortString exchangeName , AMQShortString Class
+
+ Map createRights = (Map) _permissions.get(permission);
+
+ if (createRights == null)
+ {
+ createRights = new ConcurrentHashMap();
+ _permissions.put(permission, createRights);
+
+ }
+
+ //The existence of the empty map mean permission to all.
+ if (parameters.length == 0)
+ {
+ return;
+ }
+
+
+ if (parameters[0] instanceof Boolean) //Create Queue :
+ // Boolean temporary, [AMQShortString queueName, AMQShortString exchangeName , AMQShortString routingKey]
+ {
+ Boolean temporary = (Boolean) parameters[0];
+
+ AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+ AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
+ //Set the routingkey to the specified value or the queueName if present
+ AMQShortString routingKey = parameters.length > 3 ? (AMQShortString) parameters[3] : queueName;
+
+ // Get the queues map
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ if (create_queues == null)
+ {
+ create_queues = new ConcurrentHashMap();
+ createRights.put(CREATE_QUEUES_KEY, create_queues);
+ }
+
+ //Allow all temp queues to be created
+ create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
+
+ //Create empty list of queues
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+ if (create_queues_queues == null)
+ {
+ create_queues_queues = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
+ }
+
+ // We are granting CREATE rights to all temporary queues only
+ if (parameters.length == 1)
+ {
+ return;
+ }
+
+ // if we have a queueName then we need to store any associated exchange / rk bindings
+ if (queueName != null)
+ {
+ Map queue = (Map) create_queues_queues.get(queueName);
+ if (queue == null)
+ {
+ queue = new ConcurrentHashMap();
+ create_queues_queues.put(queueName, queue);
+ }
+
+ if (exchangeName != null)
+ {
+ queue.put(exchangeName, routingKey);
+ }
+
+ //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
+ }
+
+ // Store the exchange that we are being granted rights to. This will be used as part of binding
+
+ //Lookup the list of exchanges
+ Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ if (create_queues_exchanges == null)
+ {
+ create_queues_exchanges = new ConcurrentHashMap();
+ create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
+ }
+
+ //if we have an exchange
+ if (exchangeName != null)
+ {
+ //Retrieve the list of permitted exchanges.
+ Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
+
+ if (exchanges == null)
+ {
+ exchanges = new ConcurrentHashMap();
+ create_queues_exchanges.put(exchangeName, exchanges);
+ }
+
+ //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
+ exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
+
+ //Store the binding details of queue/rk for this exchange.
+ if (queueName != null)
+ {
+ //Retrieve the list of permitted routingKeys.
+ Map rKeys = (Map) exchanges.get(exchangeName);
+
+ if (rKeys == null)
+ {
+ rKeys = new ConcurrentHashMap();
+ exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
+ }
+
+ rKeys.put(queueName, routingKey);
+ }
+ }
+ }
+ else // Create Exchange : AMQShortString exchangeName , AMQShortString Class
+ {
+ Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY);
+
+ if (create_exchanges == null)
+ {
+ create_exchanges = new ConcurrentHashMap();
+ createRights.put(CREATE_EXCHANGES_KEY, create_exchanges);
+ }
+
+ //Should perhaps error if parameters[0] is null;
+ AMQShortString exchangeName = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
+ AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+
+ //Store the exchangeName / class mapping if the mapping is null
+ createRights.put(exchangeName, className);
+ }
+ break;
+ case DELETE:
+ break;
+
+ case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ publishRights = new ConcurrentHashMap();
+ _permissions.put(permission, publishRights);
+ }
+
+ if (parameters == null || parameters.length == 0)
+ {
+ //If we have no parameters then allow publish to all destinations
+ // this is signified by having a null value for publish_exchanges
+ }
+ else
+ {
+ Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ if (publish_exchanges == null)
+ {
+ publish_exchanges = new ConcurrentHashMap();
+ publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
+ }
+
+
+ HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
+
+ // Check to see if we have a routing key
+ if (parameters.length == 2)
+ {
+ if (routingKeys == null)
+ {
+ routingKeys = new HashSet<AMQShortString>();
+ }
+ //Add routing key to permitted publish destinations
+ routingKeys.add(parameters[1]);
+ }
+
+ // Add the updated routingkey list or null if all values allowed
+ publish_exchanges.put(parameters[0], routingKeys);
+ }
+ break;
+ case PURGE:
+ break;
+ case UNBIND:
+ break;
+ }
+
+ }
+
+ public boolean authorise(Permission permission, Object... parameters)
+ {
+
+ switch (permission)
+ {
+ case ACCESS:
+ return true; // This is here for completeness but the SimpleXML ACLManager never calls it.
+ // The existence of this user specific PP can be validated in the map SimpleXML maintains.
+ case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
+
+// QueueDeclareBody body = (QueueDeclareBody) parameters[0];
+
+ Exchange exchange = (Exchange) parameters[1];
+
+ if (exchange.getName().equals("<<default>>"))
+ {
+ // Binding to <<default>> can not be programmed via ACLs due to '<','>' unable to be used in the XML
+ System.err.println("Binding on exchange <<default>> not alowed via ACLs");
+ }
+
+ AMQQueue bind_queueName = (AMQQueue) parameters[2];
+ AMQShortString routingKey = (AMQShortString) parameters[3];
+
+ //Get all Create Rights for this user
+ Map bindCreateRights = (Map) _permissions.get(Permission.CREATE);
+
+ //Look up the Queue Creation Rights
+ Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues
+ Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
+
+ // Check and see if we have a queue white list to check
+ if (bind_create_queues_queues != null)
+ {
+ //There a white list for queues
+ Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
+
+ if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
+ {
+ return true;
+ }
+
+ // Check to see if we have a white list of routingkeys to check
+ Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+
+ // if keys is null then any rkey is allowed on this exchange
+ if (rkeys == null)
+ {
+ // There is no routingkey white list
+ return true;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = rkeys.keySet().iterator();
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+ if (rkey.endsWith("*"))
+ {
+ matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
+ }
+ else
+ {
+ matched = routingKey.equals(rkey);
+ }
+ }
+
+
+ return matched;
+ }
+
+
+ }
+ else
+ {
+ //There a is no white list for queues
+
+ // So can allow all queues to be bound
+ // but we should first check and see if we have a temp queue and validate that we are allowed
+ // to bind temp queues.
+
+ //Check to see if we have a temporary queue
+ if (bind_queueName.isAutoDelete())
+ {
+ // Check and see if we have an exchange white list.
+ Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+ // If the exchange exists then we must check to see if temporary queues are allowed here
+ if (bind_exchanges != null)
+ {
+ // Check to see if the requested exchange is allowed.
+ Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
+
+ return (Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY);
+ }
+
+ //no white list so all allowed, drop through to return true below.
+ }
+
+ // not a temporary queue and no white list so all allowed.
+ return true;
+ }
+
+ case CREATE:// Paramters : QueueDeclareBody || ExchangeDeclareBody
+
+ Map createRights = (Map) _permissions.get(permission);
+
+ // If there are no create rights then deny request
+ if (createRights == null)
+ {
+ return false;
+ }
+
+ if (parameters.length == 1)
+ {
+ if (parameters[0] instanceof QueueDeclareBody)
+ {
+ QueueDeclareBody body = (QueueDeclareBody) parameters[0];
+
+ //Look up the Queue Creation Rights
+ Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+ //Lookup the list of queues allowed to be created
+ Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+
+ AMQShortString queueName = body.getQueue();
+
+
+ if (body.getAutoDelete())// we have a temporary queue
+ {
+ return (Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY);
+ }
+ else
+ {
+ // If there is a white list then check
+ return create_queues_queues == null || create_queues_queues.containsKey(queueName);
+ }
+
+ }
+ else if (parameters[0] instanceof ExchangeDeclareBody)
+ {
+ ExchangeDeclareBody body = (ExchangeDeclareBody) parameters[0];
+
+ AMQShortString exchangeName = body.getExchange();
+
+ Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY);
+
+ // If the exchange list is doesn't exist then all is allowed else check the valid exchanges
+ return create_exchanges == null || create_exchanges.containsKey(exchangeName);
+ }
+ }
+ break;
+ case CONSUME: // Parameters : AMQQueue
+
+ if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
+ {
+ AMQQueue queue = ((AMQQueue) parameters[0]);
+ Map queuePermissions = (Map) _permissions.get(permission);
+
+ List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
+
+ Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
+ Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
+
+ // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
+ if (temporayQueues)
+ {
+ if (queue.isAutoDelete())
+ // This will allow consumption from any temporary queue including ones not owned by this user.
+ // Of course the exclusivity will not be broken.
+ {
+ // if not limited to ownQueuesOnly then ok else check queue Owner.
+ return !ownQueuesOnly || queue.getOwner().equals(_user);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ // if queues are white listed then ensure it is ok
+ if (queues != null)
+ {
+ // if no queues are listed then ALL are ok othereise it must be specified.
+ if (ownQueuesOnly)
+ {
+ if (queue.getOwner().equals(_user))
+ {
+ return queues.size() == 0 || queues.contains(queue.getName());
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ // If we are
+ return queues.size() == 0 || queues.contains(queue.getName());
+ }
+ }
+
+ // Can't authenticate without the right parameters
+ return false;
+ case DELETE:
+ break;
+
+ case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
+ Map publishRights = (Map) _permissions.get(permission);
+
+ if (publishRights == null)
+ {
+ return false;
+ }
+
+ Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+ // Having no exchanges listed gives full publish rights to all exchanges
+ if (exchanges == null)
+ {
+ return true;
+ }
+ // Otherwise exchange must be listed in the white list
+
+ // If the map doesn't have the exchange then it isn't allowed
+ if (!exchanges.containsKey(parameters[0]))
+ {
+ return false;
+ }
+ else
+ {
+
+ // Get valid routing keys
+ HashSet routingKeys = (HashSet) exchanges.get(parameters[0]);
+
+ // Having no routingKeys in the map then all are allowed.
+ if (routingKeys == null)
+ {
+ return true;
+ }
+ else
+ {
+ // We have routingKeys so a match must be found to allowed binding
+ Iterator keys = routingKeys.iterator();
+
+
+ AMQShortString publishRKey = (AMQShortString)parameters[1];
+
+ boolean matched = false;
+ while (keys.hasNext() && !matched)
+ {
+ AMQShortString rkey = (AMQShortString) keys.next();
+
+ if (rkey.endsWith("*"))
+ {
+ matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
+ }
+ else
+ {
+ matched = publishRKey.equals(rkey);
+ }
+ }
+ return matched;
+ }
+ }
+ case PURGE:
+ break;
+ case UNBIND:
+ break;
+
+ }
+
+ return false;
+ }
+}
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
------------------------------------------------------------------------------
svn:executable = *
Copied: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java (from r613139, incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java?p2=incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java&p1=incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java&r1=613139&r2=630854&rev=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java Mon Feb 25 05:54:46 2008
@@ -18,7 +18,7 @@
*
*
*/
-package org.apache.qpid.server.security.access;
+package org.apache.qpid.server.security.access.management;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
@@ -26,6 +26,7 @@
import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.access.management.UserManagement;
import org.apache.log4j.Logger;
import org.apache.commons.configuration.ConfigurationException;
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java (from r613139, incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java?p2=incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java&p1=incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java&r1=613139&r2=630854&rev=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java Mon Feb 25 05:54:46 2008
@@ -18,7 +18,7 @@
*
*
*/
-package org.apache.qpid.server.security.access;
+package org.apache.qpid.server.security.access.management;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanOperationParameter;
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java?rev=630854&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java Mon Feb 25 05:54:46 2008
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access.plugins;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.commons.configuration.Configuration;
+
+public class AllowAll implements ACLPlugin
+{
+ public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters)
+ {
+ if (ACLManager.getLogger().isInfoEnabled())
+ {
+ ACLManager.getLogger().info("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ + " on " + body.getClass().getSimpleName()
+ + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters)));
+ }
+
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+
+ public static String accessablesToString(Object[] accessObject)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ for (Object access : accessObject)
+ {
+ sb.append(access.getClass().getSimpleName() + ":" + access.toString() + ", ");
+ }
+
+ return sb.delete(sb.length() - 2, sb.length()).toString();
+ }
+
+ public String getPluginName()
+ {
+ return "AllowAll";
+ }
+
+ public void setConfiguaration(Configuration config)
+ {
+ //no-op
+ }
+
+}
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java?rev=630854&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java Mon Feb 25 05:54:46 2008
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access.plugins;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.commons.configuration.Configuration;
+
+public class DenyAll implements ACLPlugin
+{
+ public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException
+ {
+
+ if (ACLManager.getLogger().isInfoEnabled())
+ {
+ }
+ ACLManager.getLogger().info("Denying user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ + " on " + body.getClass().getSimpleName()
+ + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters)));
+
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "DenyAll Plugin");
+ }
+
+ public String getPluginName()
+ {
+ return "DenyAll";
+ }
+
+ public void setConfiguaration(Configuration config)
+ {
+ //no-op
+ }
+}
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java?rev=630854&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java Mon Feb 25 05:54:46 2008
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.security.access.plugins;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicPublishBody;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.Permission;
+import org.apache.qpid.server.security.access.PrincipalPermissions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This uses the default
+ */
+public class SimpleXML implements ACLPlugin
+{
+ private static final Logger _logger = ACLManager.getLogger();
+
+ private Map<String, PrincipalPermissions> _users;
+
+ public SimpleXML()
+ {
+ _users = new ConcurrentHashMap<String, PrincipalPermissions>();
+ }
+
+ public void setConfiguaration(Configuration config)
+ {
+ _logger.info("SimpleXML Configuration");
+
+ processConfig(config);
+ }
+
+ private void processConfig(Configuration config)
+ {
+ processPublish(config);
+
+ processConsume(config);
+
+ processCreate(config);
+ }
+
+ /**
+ * Publish format takes
+ * Exchange + Routing Key Pairs
+ *
+ * @param config XML Configuration
+ */
+ private void processPublish(Configuration config)
+ {
+ Configuration publishConfig = config.subset("security.access_control_list.publish");
+
+ //Process users that have full publish permission
+ String[] users = publishConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.PUBLISH, user);
+ _logger.info("PUBLISH:GRANTED:USER:" + user + " for all destinations");
+ }
+
+ // Process exchange limited users
+ int exchangeCount = 0;
+ Configuration exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+
+ while (!exchangeConfig.isEmpty())
+ {
+ //Get Exchange Name
+ AMQShortString exchangeName = new AMQShortString(exchangeConfig.getString("name"));
+
+ //Get Routing Keys
+ int keyCount = 0;
+ Configuration routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")");
+
+ while (!routingkeyConfig.isEmpty())
+ {
+ //Get RoutingKey Value
+ AMQShortString routingKeyValue = new AMQShortString(routingkeyConfig.getString("value"));
+
+ //Apply Exchange + RoutingKey permissions to Users
+ users = routingkeyConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.PUBLISH, user, exchangeName, routingKeyValue);
+ _logger.info("PUBLISH:GRANTED:USER:" + user + " on Exchange '" + exchangeName + "' for key '" + routingKeyValue + "'");
+ }
+
+ //Apply permissions to Groups
+
+ // Check for more configs
+ keyCount++;
+ routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")");
+ }
+
+ //Apply Exchange wide permissions to Users
+ users = exchangeConfig.getStringArray("exchange(" + exchangeCount + ").users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.PUBLISH, user, exchangeName);
+ _logger.info("PUBLISH:GRANTED:USER:" + user + " on Exchange:" + exchangeName);
+ }
+
+ //Apply permissions to Groups
+ exchangeCount++;
+ exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+ }
+ }
+
+ private void grant(Permission permission, String user, Object... parameters)
+ {
+ PrincipalPermissions permissions = _users.get(user);
+
+ if (permissions == null)
+ {
+ permissions = new PrincipalPermissions(user);
+ }
+
+ _users.put(user, permissions);
+ permissions.grant(permission, parameters);
+ }
+
+ private void processConsume(Configuration config)
+ {
+ Configuration consumeConfig = config.subset("security.access_control_list.consume");
+
+ // Process queue limited users
+ int queueCount = 0;
+ Configuration queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")");
+
+ while (!queueConfig.isEmpty())
+ {
+ //Get queue Name
+ AMQShortString queueName = new AMQShortString(queueConfig.getString("name"));
+ // if there is no name then there may be a temporary element
+ boolean temporary = queueConfig.containsKey("temporary");
+ boolean ownQueues = queueConfig.containsKey("own_queues");
+
+ //Process permissions for this queue
+ String[] users = queueConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.CONSUME, user, queueName, temporary, ownQueues);
+ if (temporary)
+ {
+ if (ownQueues)
+ {
+ _logger.info("CONSUME:GRANTED:USER:" + user + " on temporary queues owned by user.");
+ }
+ else
+ {
+ _logger.info("CONSUME:GRANTED:USER:" + user + " on all temporary queues.");
+ }
+ }
+ else
+ {
+ _logger.info("CONSUME:GRANTED:USER:" + user + " on queue '" + queueName + "'");
+ }
+ }
+
+ //See if we have another config
+ queueCount++;
+ queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")");
+ }
+
+ // Process users that have full consume permission
+ String[] users = consumeConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.CONSUME, user);
+ _logger.info("CONSUME:GRANTED:USER:" + user + " from all queues.");
+ }
+ }
+
+ private void processCreate(Configuration config)
+ {
+ Configuration createConfig = config.subset("security.access_control_list.create");
+
+ // Process create permissions for queue creation
+ int queueCount = 0;
+ Configuration queueConfig = createConfig.subset("queues.queue(" + queueCount + ")");
+
+ while (!queueConfig.isEmpty())
+ {
+ //Get queue Name
+ AMQShortString queueName = new AMQShortString(queueConfig.getString("name"));
+
+ // if there is no name then there may be a temporary element
+ boolean temporary = queueConfig.containsKey("temporary");
+
+ int exchangeCount = 0;
+ Configuration exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+
+ while (!exchangeConfig.isEmpty())
+ {
+
+ AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name"));
+ AMQShortString routingKey = new AMQShortString(exchangeConfig.getString("routing_key"));
+
+ //Process permissions for this queue
+ String[] users = exchangeConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user, temporary,
+ (queueName.equals("") ? null : queueName),
+ (exchange.equals("") ? null : exchange),
+ (routingKey.equals("") ? null : routingKey));
+
+ _logger.info("CREATE :GRANTED:USER:" + user + " for "
+ + (queueName.equals("") ? "" : "queue '" + queueName + "' ")
+ + (exchange.equals("") ? "" : "exchange '" + exchange + "' ")
+ + (routingKey.equals("") ? "" : " rk '" + routingKey + "' ")
+ + (temporary ? " temporary:" + temporary : ""));
+ }
+
+ //See if we have another config
+ exchangeCount++;
+ exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+ }
+
+ // Process users that are not bound to an exchange
+ String[] users = queueConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user, temporary, queueName);
+ if (temporary)
+ {
+ _logger.info("CREATE :GRANTED:USER:" + user + " from temporary queues on any exchange.");
+ }
+ else
+ {
+ _logger.info("CREATE :GRANTED:USER:" + user + " from queue '" + queueName + "' on any exchange.");
+ }
+ }
+
+ //See if we have another config
+ queueCount++;
+ queueConfig = createConfig.subset("queues.queue(" + queueCount + ")");
+ }
+
+ // Process create permissions for exchange creation
+ int exchangeCount = 0;
+ Configuration exchangeConfig = createConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+
+ while (!exchangeConfig.isEmpty())
+ {
+ AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name"));
+ AMQShortString clazz = new AMQShortString(exchangeConfig.getString("class"));
+
+ //Process permissions for this queue
+ String[] users = exchangeConfig.getStringArray("users.user");
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user, exchange, clazz);
+ _logger.info("CREATE:GRANTED:USER:" + user + " for exchange '" + exchange + ":class:'" + clazz);
+ }
+
+ //See if we have another config
+ exchangeCount++;
+ exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")");
+ }
+
+ // Process users that have full create permission
+ String[] users = createConfig.getStringArray("users.user");
+
+ for (String user : users)
+ {
+ grant(Permission.CREATE, user);
+ _logger.info("CREATE:GRANTED:USER:" + user + " from all queues & exchanges.");
+ }
+
+
+ }
+
+ public String getPluginName()
+ {
+ return "Simple";
+ }
+
+ public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException
+ {
+ String error = "";
+
+ if (ACLManager.getLogger().isInfoEnabled())
+ {
+ ACLManager.getLogger().info("Simple Authorisation processing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+ + " on " + body.getClass().getSimpleName()
+ + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters)));
+ }
+
+ String username = session.getAuthorizedID().getName();
+
+ //Get the Users Permissions
+ PrincipalPermissions permissions = _users.get(username);
+
+ _logger.warn("Processing :" + permission + " for:" + username + ":" + permissions+":"+parameters.length);
+
+ if (permissions != null)
+ {
+ switch (permission)
+ {
+ case ACCESS:
+ _logger.warn("GRANTED:"+permission);
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ case BIND: // Body QueueDeclareBody - Parameters : Exchange, Queue, QueueName
+ // Body QueueBindBody - Paramters : Exchange, Queue, QueueName
+ if (parameters.length == 3)
+ {
+ // Parameters : Exchange, Queue, RoutingKey
+ if (permissions.authorise(Permission.BIND, body, parameters[0], parameters[1], parameters[2]))
+ {
+ _logger.warn("GRANTED:"+permission);
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+ }
+ break;
+ case CONSUME: // Parameters : none
+ if (parameters.length == 1 && permissions.authorise(Permission.CONSUME, parameters[0]))
+ {
+ _logger.warn("GRANTED:"+permission);
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+ break;
+ case CREATE: // Body : QueueDeclareBody | ExchangeDeclareBody - Parameters : none
+ if (permissions.authorise(Permission.CREATE, body))
+ {
+ _logger.warn("GRANTED:"+permission);
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+ break;
+ case PUBLISH: // Body : BasicPublishBody Parameters : exchange
+ if (parameters.length == 1 && parameters[0] instanceof Exchange)
+ {
+ if (permissions.authorise(Permission.PUBLISH, ((Exchange) parameters[0]).getName(),
+ ((BasicPublishBody) body).getRoutingKey()))
+ {
+ _logger.warn("GRANTED:"+permission);
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+ }
+ break;
+ case PURGE:
+ break;
+ case DELETE:
+ break;
+ case UNBIND:
+ break;
+ }
+ }
+
+ _logger.warn("Access Denied for :" + permission + " for:" + username + ":" + permissions);
+ //todo potential refactor this ConnectionException Out of here
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error);
+ }
+
+//todo use or lose
+// if (accessObject instanceof VirtualHost)
+// {
+// VirtualHostAccess[] hosts = lookupVirtualHost(user.getName());
+//
+// if (hosts != null)
+// {
+// for (VirtualHostAccess host : hosts)
+// {
+// if (accessObject.getAccessableName().equals(host.getVirtualHost()))
+// {
+// if (host.getAccessRights().allows(rights))
+// {
+// return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+// }
+// else
+// {
+// return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+// }
+// }
+// }
+// }
+// }
+// else if (accessObject instanceof AMQQueue)
+// {
+// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost());
+//
+// if (queues != null)
+// {
+// for (String queue : queues)
+// {
+// if (accessObject.getAccessableName().equals(queue))
+// {
+// return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+// }
+// }
+// }
+// }
+
+// return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+// }
+
+}
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java?rev=630854&r1=630853&r2=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java Mon Feb 25 05:54:46 2008
@@ -24,7 +24,7 @@
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
-import org.apache.qpid.server.security.access.AMQUserManagementMBean;
+import org.apache.qpid.server.security.access.management.AMQUserManagementMBean;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.EncoderException;
@@ -45,7 +45,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.security.Principal;
import java.security.NoSuchAlgorithmException;
-import java.security.MessageDigest;
/**
* Represents a user database where the account information is stored in a simple flat file.
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=630854&r1=630853&r2=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Mon Feb 25 05:54:46 2008
@@ -37,7 +37,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.AMQUserManagementMBean;
+import org.apache.qpid.server.security.access.management.AMQUserManagementMBean;
import org.apache.qpid.AMQException;
import javax.management.JMException;
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=630854&r1=630853&r2=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Mon Feb 25 05:54:46 2008
@@ -35,8 +35,8 @@
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AllowAll;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -48,7 +48,7 @@
private VirtualHostRegistry _virtualHostRegistry;
- private AccessManager _accessManager;
+ private ACLPlugin _accessManager;
private PrincipalDatabaseManager _databaseManager;
@@ -116,7 +116,7 @@
return _virtualHostRegistry;
}
- public AccessManager getAccessManager()
+ public ACLPlugin getAccessManager()
{
return _accessManager;
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=630854&r1=630853&r2=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Feb 25 05:54:46 2008
@@ -26,8 +26,8 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -69,7 +69,7 @@
private AuthenticationManager _authenticationManager;
- private AccessManager _accessManager;
+ private ACLPlugin _accessManager;
private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
@@ -168,7 +168,7 @@
_authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
- _accessManager = new AccessManagerImpl(name, hostConfig);
+ _accessManager = ACLManager.loadACLManager(name, hostConfig);
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
@@ -279,7 +279,7 @@
return _authenticationManager;
}
- public AccessManager getAccessManager()
+ public ACLPlugin getAccessManager()
{
return _accessManager;
}
Modified: incubator/qpid/branches/M2.1/java/systests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/pom.xml?rev=630854&r1=630853&r2=630854&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/pom.xml (original)
+++ incubator/qpid/branches/M2.1/java/systests/pom.xml Mon Feb 25 05:54:46 2008
@@ -6,9 +6,9 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,11 +62,11 @@
</dependency>
<!-- Test Dependencies -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.4.0</version>
- <scope>test</scope>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
+ <scope>test</scope>
</dependency>
</dependencies>
@@ -82,18 +82,26 @@
<includes>
<include>**/*Test.class</include>
</includes>
-
+
<systemProperties>
<property>
<name>example.plugin.target</name>
<value>${basedir}/${topDirectoryLocation}/plugins/target</value>
</property>
+ <property>
+ <name>QPID_EXAMPLE_HOME</name>
+ <value>${basedir}</value>
+ </property>
+ <property>
+ <name>QPID_HOME</name>
+ <value>${basedir}/${topDirectoryLocation}/broker</value>
+ </property>
</systemProperties>
-
+
<excludes>
<exclude>**/testcases/ImmediateMessageTest.class</exclude>
<exclude>**/testcases/MandatoryMessageTest.class</exclude>
- <exclude>**/testcases/RollbackTest.class</exclude>
+ <exclude>**/testcases/RollbackTest.class</exclude>
<exclude>**/testcases/TTLTest.class</exclude>
<exclude>**/testcases/FailoverTest.class</exclude>
</excludes>
@@ -112,23 +120,23 @@
<value>${log4j.configuration}</value>
</property>
</systemproperties>
-
+
<testrunner>org.apache.qpid.junit.extensions.TKTestRunner</testrunner>
-
+
<testrunneroptions>
<option>-X:decorators "org.apache.qpid.test.framework.qpid.InVMBrokerDecorator:org.apache.qpid.test.framework.qpid.AMQPFeatureDecorator"</option>
<!--<option>-d30S</option>-->
<option>-o ${basedir}/target/surefire-reports</option>
<option>--xml</option>
</testrunneroptions>
-
+
<testrunnerproperties>
<property>
<name>notApplicableAssertion</name>
<value>warn</value>
</property>
</testrunnerproperties>
-
+
<commands>
<AMQBrokerManagerMBeanTest>-n AMQBrokerManagerMBeanTest org.apache.qpid.server.AMQBrokerManagerMBeanTest </AMQBrokerManagerMBeanTest>
<TxAckTest>-n TxAckTest org.apache.qpid.server.ack.TxAckTest </TxAckTest>
@@ -153,16 +161,16 @@
<!--<Mandatory-Message-Test>-n Mandatory-Test -s[1] org.apache.qpid.test.testcases.MandatoryMessageTest</Mandatory-Message-Test>-->
<!--<Rollback-Test>-n Rollback-Test -s[1] org.apache.qpid.test.testcases.RollbackTest</Rollback-Test>-->
</commands>
-
+
</configuration>
- <executions>
+ <executions>
<execution>
<id>framework_tests</id>
- <phase>test</phase>
+ <phase>test</phase>
<goals>
<goal>tktest</goal>
- </goals>
+ </goals>
</execution>
</executions>
</plugin>
Added: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=630854&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java (added)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java Mon Feb 25 05:54:46 2008
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+
+package org.apache.qpid.server.security.acl;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import java.io.File;
+
+
+public class SimpleACLTest extends TestCase implements ConnectionListener
+{
+ private String BROKER = "vm://:1";//"tcp://localhost:5672";
+
+ public void setUp() throws Exception
+ {
+ // Initialise ACLs.
+ final String QpidExampleHome = System.getProperty("QPID_EXAMPLE_HOME");
+ final File defaultaclConfigFile = new File(QpidExampleHome, "etc/acl.config.xml");
+
+ if (!defaultaclConfigFile.exists() || System.getProperty("QPID_HOME") == null)
+ {
+ System.err.println("Configuration file not found:" + defaultaclConfigFile);
+ fail("Configuration file not found:" + defaultaclConfigFile);
+ }
+
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(defaultaclConfigFile);
+
+ ApplicationRegistry.initialise(config, 1);
+
+ TransportConnection.createVMBroker(1);
+ }
+
+ public void tearDown()
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public String createConnectionString(String username, String password, String broker)
+ {
+
+ return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + broker + "'";
+ }
+
+ public void testAccessAuthorized() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ Session sesh = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn.start();
+
+ //Do something to show connection is active.
+ sesh.rollback();
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Connection was not created due to:" + e.getMessage());
+ }
+ }
+
+ public void testAccessNoRights() throws URLSyntaxException, JMSException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("guest", "guest", BROKER));
+
+ //Attempt to do do things to test connection.
+ Session sesh = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+ sesh.rollback();
+
+ conn.close();
+ fail("Connection was created.");
+ }
+ catch (AMQException amqe)
+ {
+ if (amqe.getCause() instanceof Exception)
+ {
+ System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");
+ }
+ assertEquals("Linked Exception Incorrect", JMSException.class, amqe.getCause().getClass());
+ Exception linked = ((JMSException) amqe.getCause()).getLinkedException();
+ assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass());
+ assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) linked).getErrorCode().getCode());
+ }
+ }
+
+ public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createTemporaryQueue());
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testClientConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ //Prevent Failover
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createQueue("IllegalQueue"));
+ fail("Test failed as consumer was created.");
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+
+ assertNotNull("There was no liked exception", cause);
+ assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create Temporary Queue
+ ((AMQSession) sesh).declareQueue((AMQDestination) sesh.createTemporaryQueue(),
+ ((AMQSession) sesh).getProtocolHandler());
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testClientCreateNamedQueue() throws JMSException, URLSyntaxException, AMQException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create a Named Queue
+ ((AMQSession) sesh).declareQueue((AMQDestination) sesh.createQueue("IllegalQueue"),
+ ((AMQSession) sesh).getProtocolHandler());
+
+ fail("Test failed as Queue creation succeded.");
+ }
+ catch (AMQAuthenticationException amqe)
+ {
+ assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) amqe).getErrorCode().getCode());
+ }
+ }
+
+ public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session sesh = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn.start();
+
+ MessageProducer sender = sesh.createProducer(sesh.createQueue("example.RequestQueue"));
+
+ sender.send(sesh.createTextMessage("test"));
+
+ //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker.
+ sesh.commit();
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test publish failed:" + e);
+ }
+ }
+
+ public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ MessageProducer sender = ((AMQSession) sesh).createProducer(null);
+
+ Queue queue = sesh.createQueue("example.RequestQueue");
+
+ // Send a message that we will wait to be sent, this should give the broker time to process the msg
+ // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+ // queue existence.
+ ((org.apache.qpid.jms.MessageProducer) sender).send(queue, sesh.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test publish failed:" + e);
+ }
+ }
+
+ public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ MessageProducer sender = ((AMQSession) session).createProducer(null);
+
+ Queue queue = session.createQueue("Invalid");
+
+ // Send a message that we will wait to be sent, this should give the broker time to close the connection
+ // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+ // queue existence.
+ ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
+
+ // Test the connection with a valid consumer
+ session.createConsumer(session.createTemporaryQueue()).close();
+
+ //Connection should now be closed and will throw the exception caused by the above send
+ conn.close();
+
+ fail("Close is not expected to succeed.");
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+ assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createQueue("example.RequestQueue"));
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createQueue("Invalid"));
+
+ fail("Test failed as consumer was created.");
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+
+ assertNotNull("There was no liked exception", cause);
+ assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ //Prevent Failover
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sesh.createConsumer(sesh.createTemporaryQueue());
+ fail("Test failed as consumer was created.");
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+
+ assertNotNull("There was no liked exception", cause);
+ assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create Temporary Queue
+ ((AMQSession) sesh).declareQueue((AMQDestination) sesh.createQueue("example.RequestQueue"),
+ ((AMQSession) sesh).getProtocolHandler());
+
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ fail("Test failed due to:" + e.getMessage());
+ }
+ }
+
+ public void testServerCreateNamedQueueInValid() throws JMSException, URLSyntaxException, AMQException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ //Create a Named Queue
+ ((AMQSession) sesh).declareQueue((AMQDestination) sesh.createQueue("IllegalQueue"),
+ ((AMQSession) sesh).getProtocolHandler());
+
+ fail("Test failed as creation succeded.");
+ }
+ catch (AMQAuthenticationException amqe)
+ {
+ assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode());
+ }
+ }
+
+ public void testServerCreateTemporyQueueInvalid() throws JMSException, URLSyntaxException, AMQException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ ((AMQSession) sesh).declareQueue((AMQDestination) sesh.createTemporaryQueue(),
+ ((AMQSession) sesh).getProtocolHandler());
+
+ fail("Test failed as creation succeded.");
+ }
+ catch (AMQAuthenticationException amqe)
+ {
+ assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode());
+ }
+ }
+
+ /**
+ * This test uses both the cilent and sender to validate that the Server is able to publish to a temporary queue.
+ * The reason the client must be in volved is that the Serve is unable to create its own Temporary Queues.
+ *
+ * @throws AMQException
+ * @throws URLSyntaxException
+ * @throws JMSException
+ */
+ public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException
+ {
+ //Set up the Server
+ Connection serverConnection = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ ((AMQConnection) serverConnection).setConnectionListener(this);
+
+ Session serverSession = serverConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ Queue requestQueue = serverSession.createQueue("example.RequestQueue");
+
+ MessageConsumer server = serverSession.createConsumer(requestQueue);
+
+ serverConnection.start();
+
+ //Set up the consumer
+ Connection clientConnection = new AMQConnection(createConnectionString("client", "guest", BROKER));
+
+ //Send a test mesage
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue responseQueue = clientSession.createTemporaryQueue();
+
+ MessageConsumer clientResponse = clientSession.createConsumer(responseQueue);
+
+ clientConnection.start();
+
+ Message request = clientSession.createTextMessage("Request");
+
+ assertNotNull("Response Queue is null", responseQueue);
+
+ request.setJMSReplyTo(responseQueue);
+
+ clientSession.createProducer(requestQueue).send(request);
+
+ try
+ {
+ Message msg = null;
+
+ msg = server.receive(2000);
+
+ while (msg != null && !((TextMessage) msg).getText().equals("Request"))
+ {
+ msg = server.receive(2000);
+ }
+
+ assertNotNull("Message not received", msg);
+
+ assertNotNull("Reply-To is Null", msg.getJMSReplyTo());
+
+ MessageProducer sender = serverSession.createProducer(msg.getJMSReplyTo());
+
+ sender.send(serverSession.createTextMessage("Response"));
+
+ //Send the message using a transaction as this will allow us to retrieve any errors that occur on the broker.
+ serverSession.commit();
+
+ serverConnection.close();
+
+ //Ensure Response is received.
+ Message clientResponseMsg = clientResponse.receive(2000);
+ assertNotNull("Client did not receive response message,", clientResponseMsg);
+ assertEquals("Incorrect message received", "Response", ((TextMessage) clientResponseMsg).getText());
+
+ }
+ catch (Exception e)
+ {
+ fail("Test publish failed:" + e);
+ }
+ }
+
+ public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException
+ {
+ try
+ {
+ Connection conn = new AMQConnection(createConnectionString("server", "guest", BROKER));
+
+ ((AMQConnection) conn).setConnectionListener(this);
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ MessageProducer sender = ((AMQSession) session).createProducer(null);
+
+ Queue queue = session.createQueue("Invalid");
+
+ // Send a message that we will wait to be sent, this should give the broker time to close the connection
+ // before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
+ // queue existence.
+ ((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
+ DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
+
+ // Test the connection with a valid consumer
+ session.createConsumer(session.createQueue("example.RequestQueue")).close();
+
+ //Connection should now be closed and will throw the exception caused by the above send
+ conn.close();
+
+ fail("Close is not expected to succeed.");
+ }
+ catch (JMSException e)
+ {
+ Throwable cause = e.getLinkedException();
+ assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
+ assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+ }
+ }
+
+ // Connection Listener Interface - Used here to block failover
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Prevent failover.
+ return false;
+ }
+
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ public void failoverComplete()
+ {
+ }
+}