You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [14/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,262 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages delivery of messages on behalf of a queue.
+ * <p/>
+ * A message handed to deliver() is either sent straight to the subscriber chosen by the
+ * {@link SubscriptionManager}, or - while the manager is in queueing mode - appended to an
+ * internal FIFO list which is drained later by a task submitted through processAsync().
+ * <p/>
+ * Every access to the internal list and to the queueing flag goes through a synchronized
+ * method of this instance (enqueue, startQueueing, hasQueuedMessages, peek, poll, ...).
+ * Choosing a subscriber and sending to it happen outside that lock.
+ * <p/>
+ * The remaining helpers report on or trim the queued messages: getQueueMessageCount() returns
+ * how many are queued, getMessages() returns a snapshot copy of them, removeAMessageFromTop()
+ * polls the head message and calls dequeue on it, and clearAllMessages() does the same for
+ * every queued message until the list is empty.
+ */
+class DeliveryManager
+{
+    private static final Logger _log = Logger.getLogger(DeliveryManager.class);
+
+    /**
+     * Holds any queued messages, in arrival order. Only touched from synchronized methods.
+     */
+    private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+    /**
+     * Ensures that only one asynchronous task is running for this manager at
+     * any time. Set via compareAndSet in processAsync() and cleared in the finally
+     * block of processQueue().
+     */
+    private final AtomicBoolean _processing = new AtomicBoolean();
+    /**
+     * The subscriptions on the queue to whom messages are delivered
+     */
+    private final SubscriptionManager _subscriptions;
+
+    /**
+     * An indication of the mode we are in. If this is true then messages are
+     * being queued up in _messages for asynchronous delivery. If it is false
+     * then messages can be delivered directly as they come in.
+     * <p/>
+     * Switched on by startQueueing() and switched off by hasQueuedMessages() once the
+     * list is found to be empty; both are synchronized.
+     */
+    private boolean _queueing;
+
+    /**
+     * A reference to the queue we are delivering messages for. We need this to be able
+     * to pass the code that handles acknowledgements a handle on the queue.
+     */
+    private final AMQQueue _queue;
+
+    DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+    {
+        _subscriptions = subscriptions;
+        _queue = queue;
+    }
+
+    private synchronized boolean enqueue(AMQMessage msg)
+    {
+        if (_queueing)
+        {
+            _messages.offer(msg);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private synchronized void startQueueing(AMQMessage msg)
+    {
+        _queueing = true;
+        enqueue(msg);
+    }
+
+    /**
+     * Determines whether there are queued messages. Sets _queueing to false if
+     * there are no queued messages. This needs to be atomic, so that the emptiness
+     * check and the mode switch cannot be interleaved with a concurrent enqueue().
+     * <p/>
+     * Note that this is not a pure query: processAsync() calls it as well, so that
+     * call can also switch the manager back to direct delivery.
+     *
+     * @return true if there are queued messages
+     */
+    private synchronized boolean hasQueuedMessages()
+    {
+        boolean empty = _messages.isEmpty();
+        if (empty)
+        {
+            _queueing = false;
+        }
+        return !empty;
+    }
+
+    public synchronized int getQueueMessageCount()
+    {
+        return _messages.size();
+    }
+
+    protected synchronized List<AMQMessage> getMessages()
+    {
+        return new ArrayList<AMQMessage>(_messages);
+    }
+
+    protected synchronized void removeAMessageFromTop() throws AMQException
+    {
+        AMQMessage msg = poll();
+        if (msg != null)
+        {
+            msg.dequeue(_queue);
+        }
+    }
+
+    protected synchronized void clearAllMessages() throws AMQException
+    {
+        AMQMessage msg = poll();
+        while (msg != null)
+        {
+            msg.dequeue(_queue);
+            msg = poll();
+        }
+    }
+
+    /**
+     * Only one thread should ever execute this method concurrently, but
+     * it can do so while other threads invoke deliver().
+     * <p/>
+     * Drains the queued messages for as long as there are messages left and a subscriber
+     * can be found for the head message. hasActiveSubscribers() is consulted once before
+     * the loop; afterwards the loop only stops early when nextSubscriber() returns null.
+     * A failed send is logged and the loop carries on with the next message. The
+     * _processing flag is always cleared on exit so that a later processAsync() can
+     * schedule a new run.
+     * <p/>
+     * NOTE(review): peek() and poll() are two separate synchronized calls, so if another
+     * thread removes messages in between (removeAMessageFromTop / clearAllMessages) the
+     * message that is sent may not be the one the subscriber was chosen for, or may be
+     * null. Also, a message whose send throws AMQException has already been polled and
+     * is not put back. Confirm whether callers rely on either behaviour.
+     */
+    private void processQueue()
+    {
+        try
+        {
+            boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+            while (hasQueuedMessages() && hasSubscribers)
+            {
+                Subscription next =  _subscriptions.nextSubscriber(peek());
+                //Subscribers are not accessed under our lock, so nextSubscriber() can still return null here
+                if (next != null)
+                {
+                    try
+                    {
+                        next.send(poll(), _queue);
+                    }
+                    catch (AMQException e)
+                    {
+                        _log.error("Unable to deliver message: " + e, e);
+                    }
+                }
+                else
+                {
+                    hasSubscribers = false;
+                }
+            }
+        }
+        finally
+        {
+            _processing.set(false);
+        }
+    }
+
+    private synchronized AMQMessage peek()
+    {
+        return _messages.peek();
+    }
+
+    private synchronized AMQMessage poll()
+    {
+        return _messages.poll();
+    }
+
+    /**
+     * Requests that the delivery manager start processing the queue asynchronously
+     * if there is work that can be done (i.e. there are messages queued up and
+     * subscribers that can receive them).
+     * <p/>
+     * This should be called when subscribers are added, but only after the consume-ok
+     * message has been returned as message delivery may start immediately. It should also
+     * be called after unsuspending a client.
+     * <p/>
+     * If a processing task is already running (the _processing flag is set) no new
+     * task is submitted.
+     *
+     * @param executor the executor on which the delivery should take place
+     */
+    void processAsync(Executor executor)
+    {
+        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+        {
+            //only submit a new task if we win the false -> true transition of the flag
+            if (_processing.compareAndSet(false, true))
+            {
+                executor.execute(new Runner());
+            }
+        }
+    }
+
+    /**
+     * Handles message delivery. The delivery manager is always in one of two modes;
+     * it is either queueing messages for asynchronous delivery or delivering
+     * directly.
+     * <p/>
+     * The message's reference count is incremented first, on every path. If the manager
+     * is queueing, the message is appended to the internal list. Otherwise the next
+     * subscriber is looked up and the message is sent to it directly; when there is no
+     * subscriber, an immediate message is rejected with the exception obtained from the
+     * message, and any other message switches the manager into queueing mode and is queued.
+     * <p/>
+     * NOTE(review): when the message has been enqueued and is flagged immediate, the
+     * exception is thrown if no subscriber is found, but the message is left in the
+     * internal list. See the todo in the body; confirm against the spec.
+     *
+     * @param name the name of the entity on whose behalf we are delivering the message
+     * @param msg  the message to deliver
+     * @throws NoConsumersException if there are no active subscribers to deliver
+     *                              the message to
+     * @throws AMQException if sending the message to the chosen subscriber fails
+     */
+    void deliver(String name, AMQMessage msg) throws AMQException
+    {
+        msg.incrementReference();
+        // first check whether we are queueing, and enqueue if we are
+        if (!enqueue(msg))
+        {
+            // not queueing so deliver message to 'next' subscriber
+            Subscription s =  _subscriptions.nextSubscriber(msg);
+            if (s == null)
+            {
+                if (msg.isImmediate())
+                {
+                    throw msg.getNoConsumersException(name);
+                }
+                else
+                {
+                    // no subscribers yet so enter 'queueing' mode and queue this message
+                    startQueueing(msg);
+                }
+            }
+            else
+            {
+                s.send(msg, _queue);
+            }
+        }
+
+        else
+        {
+            if (msg.isImmediate())
+            {
+                //todo check with spec to see if enqueueing for immediate client delivery is ok.
+                Subscription s = _subscriptions.nextSubscriber(msg);
+                if (s == null)
+                {
+                    throw msg.getNoConsumersException(name);
+                }
+            }
+        }
+    }
+
+    private class Runner implements Runnable
+    {
+        public void run()
+        {
+            processQueue();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.HashSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Remembers every binding made for a single queue so that, when the queue is
+ * deleted, it can be deregistered from each exchange it was bound to.
+ */
+class ExchangeBindings
+{
+    /**
+     * One binding of the tracked queue: an exchange together with the routing
+     * key used for the bind. Two bindings are equal when both parts are equal.
+     */
+    static class ExchangeBinding
+    {
+        private final Exchange exchange;
+        private final String routingKey;
+
+        ExchangeBinding(String routingKey, Exchange exchange)
+        {
+            this.exchange = exchange;
+            this.routingKey = routingKey;
+        }
+
+        /**
+         * Asks the exchange to deregister the given queue under this binding's routing key.
+         */
+        void unbind(AMQQueue queue) throws AMQException
+        {
+            exchange.deregisterQueue(routingKey, queue);
+        }
+
+        public Exchange getExchange()
+        {
+            return exchange;
+        }
+
+        public String getRoutingKey()
+        {
+            return routingKey;
+        }
+
+        /**
+         * Sum of the exchange's and the routing key's hash codes, consistent with equals().
+         */
+        public int hashCode()
+        {
+            final int exchangeHash = exchange.hashCode();
+            final int keyHash = routingKey.hashCode();
+            return exchangeHash + keyHash;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (o instanceof ExchangeBinding)
+            {
+                final ExchangeBinding other = (ExchangeBinding) o;
+                return exchange.equals(other.exchange) && routingKey.equals(other.routingKey);
+            }
+            return false;
+        }
+    }
+
+    private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
+    private final AMQQueue _queue;
+
+    ExchangeBindings(AMQQueue queue)
+    {
+        _queue = queue;
+    }
+
+    /**
+     * Records a new binding for the tracked queue. Duplicates are not filtered
+     * here; they are collapsed when deregister() runs.
+     *
+     * @param routingKey the routing key with which the tracked queue was bound
+     * @param exchange   the exchange the queue was bound to
+     */
+    void addBinding(String routingKey, Exchange exchange)
+    {
+        final ExchangeBinding binding = new ExchangeBinding(routingKey, exchange);
+        _bindings.add(binding);
+    }
+
+    /**
+     * Unbinds the tracked queue from every distinct (exchange, routing key) pair
+     * recorded so far.
+     *
+     * @throws AMQException if an exchange fails to deregister the queue
+     */
+    void deregister() throws AMQException
+    {
+        // Collapse repeated bindings first so that each distinct pair is unbound once.
+        final HashSet<ExchangeBinding> distinct = new HashSet<ExchangeBinding>(_bindings);
+        for (ExchangeBinding binding : distinct)
+        {
+            binding.unbind(_queue);
+        }
+    }
+
+    /**
+     * @return the list of recorded bindings itself (not a copy), duplicates included
+     */
+    List<ExchangeBinding> getExchangeBindings()
+    {
+        return _bindings;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of a queue.
+ * It offers read-only attributes (name, owner, counts), a few settable limits,
+ * and operations to view and delete queued messages.
+ * @author  Robert J. Greig
+ * @author  Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedQueue
+{
+    static final String TYPE = "Queue";
+
+    /**
+     * Returns the name of the ManagedQueue.
+     * @return the name of the ManagedQueue.
+     * @throws IOException
+     */
+    String getName() throws IOException;
+
+    /**
+     * Tells whether this ManagedQueue is durable or not.
+     * @return true if this ManagedQueue is a durable queue.
+     * @throws IOException
+     */
+    boolean isDurable() throws IOException;
+
+    /**
+     * Tells the owner of the ManagedQueue.
+     * @return the owner's name.
+     * @throws IOException
+     */
+    String getOwner() throws IOException;
+
+    /**
+     * Tells if the ManagedQueue is set to AutoDelete.
+     * @return  true if the ManagedQueue is set to AutoDelete.
+     * @throws IOException
+     */
+    boolean isAutoDelete() throws IOException;
+
+    /**
+     * Gets the total number of messages on the queue, which are yet to be
+     * delivered to the consumer(s).
+     * @return number of undelivered messages in the queue.
+     * @throws IOException
+     */
+    int getMessageCount() throws IOException;
+
+    /**
+     * Returns the maximum size of a message (in bytes) allowed to be accepted by the
+     * ManagedQueue. This is useful in setting notifications or taking
+     * appropriate action, if the size of the message received is more than
+     * the allowed size.
+     * @return the maximum size of a message allowed to be accepted by the
+     *         ManagedQueue.
+     * @throws IOException
+     */
+    long getMaximumMessageSize() throws IOException;
+
+    /**
+     * Sets the maximum size of the message (in bytes) that is allowed to be
+     * accepted by the Queue.
+     * @param bytes  maximum size of message.
+     * @throws IOException
+     */
+    void setMaximumMessageSize(long bytes) throws IOException;
+
+    /**
+     * Returns the total number of subscribers to the queue.
+     * @return the number of subscribers.
+     * @throws IOException
+     */
+    int getConsumerCount() throws IOException;
+
+    /**
+     * Returns the total number of active subscribers to the queue.
+     * @return the number of active subscribers
+     * @throws IOException
+     */
+    int getActiveConsumerCount() throws IOException;
+
+    /**
+     * Tells the total number of messages received by the queue since startup.
+     * @return total number of messages received.
+     * @throws IOException
+     */
+    long getReceivedMessageCount() throws IOException;
+
+    /**
+     * Tells the maximum number of messages that can be stored in the queue.
+     * This is useful in setting the notifications or taking required
+     * action if the number of messages exceeds this limit.
+     * @return maximum number of messages allowed to be stored in the queue.
+     * @throws IOException
+     */
+    long getMaximumMessageCount() throws IOException;
+
+    /**
+     * Sets the maximum number of messages allowed to be stored in the queue.
+     * @param value  the maximum number of messages allowed to be stored in the queue.
+     * @throws IOException
+     */
+    void setMaximumMessageCount(long value) throws IOException;
+
+    /**
+     * Tells the maximum size of all the messages combined together,
+     * that can be stored in the queue. This is useful for setting notifications
+     * or taking required action if the size of messages stored in the queue
+     * increases over this limit.
+     * @return maximum size of all the messages allowed for the queue.
+     * @throws IOException
+     */
+    long getQueueDepth() throws IOException;
+
+    /**
+     * Sets the maximum size of all the messages together, that can be stored
+     * in the queue.
+     * @param value the maximum combined size of all messages allowed for the queue
+     *              (NOTE(review): unit is not stated here - presumably bytes, confirm)
+     * @throws IOException
+     */
+    void setQueueDepth(long value) throws IOException;
+
+
+
+    //********** Operations *****************//
+
+
+    /**
+     * Returns a subset of all the messages stored in the queue. The messages
+     * are returned based on the given index numbers.
+     * @param fromIndex index of the first message of the requested range
+     *                  (NOTE(review): base and inclusiveness are not specified here - confirm
+     *                  against the implementation)
+     * @param toIndex index of the last message of the requested range (same caveat)
+     * @return the selected messages as tabular data
+     * @throws IOException
+     * @throws JMException
+     */
+    TabularData viewMessages(int fromIndex, int toIndex)
+        throws IOException, JMException;
+
+    /**
+     * Deletes the first message from the top of the queue.
+     * @throws IOException
+     * @throws JMException
+     */
+    void deleteMessageFromTop()
+         throws IOException, JMException;
+
+    /**
+     * Clears the queue by deleting all the messages from the queue.
+     * @throws IOException
+     * @throws JMException
+     */
+    void clearQueue()
+        throws IOException, JMException;
+
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.protocol.AMQConstant;
+
+import java.util.List;
+
+/**
+ * Signals that no consumers exist for a message at a given point in time.
+ * Used if a message has immediate=true and there are no consumers registered
+ * with the queue.
+ */
+public class NoConsumersException extends RequiredDeliveryException
+{
+    public NoConsumersException(String queue,
+                                BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
+    }
+
+    public int getReplyCode()
+    {
+        return AMQConstant.NO_CONSUMERS.getCode();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+
+/**
+ * A registry of {@link AMQQueue} instances that can be looked up by name.
+ */
+public interface QueueRegistry
+{
+    /**
+     * Adds the given queue to the registry.
+     *
+     * @param queue the queue to register
+     * @throws AMQException if the queue cannot be registered
+     */
+    void registerQueue(AMQQueue queue) throws AMQException;
+
+    /**
+     * Removes the queue registered under the given name.
+     *
+     * @param name the name of the queue to remove
+     * @throws AMQException if the queue cannot be unregistered
+     */
+    void unregisterQueue(String name) throws AMQException;
+
+    /**
+     * Looks up a queue by name.
+     *
+     * @param name the name of the queue
+     * @return the queue registered under that name (NOTE(review): the result for an
+     *         unknown name is up to the implementation and is not visible here)
+     */
+    AMQQueue getQueue(String name);
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * A consumer's subscription to a queue: the target to which a queue's
+ * messages are sent.
+ */
+public interface Subscription
+{
+    /**
+     * Sends the given message to this subscriber.
+     *
+     * @param msg   the message to send
+     * @param queue the queue the message is being delivered from
+     * @throws AMQException if the message cannot be sent
+     */
+    void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+
+    /**
+     * @return true if this subscription is currently suspended
+     */
+    boolean isSuspended();
+
+    /**
+     * Callback indicating that a queue has been deleted.
+     *
+     * @param queue the queue that was deleted
+     */
+    void queueDeleted(AMQQueue queue);
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
+ * factory primarily assists testing although in future more sophisticated subscribers may need a different
+ * subscription implementation.
+ *
+ * @see org.apache.qpid.server.queue.AMQQueue
+ */
+public interface SubscriptionFactory
+{
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+        throws AMQException;
+
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
+        throws AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,172 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Encapsulation of a subscription to a queue.
+ * <p/>
+ * Ties together the protocol session of a subscriber, the consumer tag that
+ * was given out by the broker and the channel id.
+ * <p/>
+ * The channel is resolved from the protocol session at construction time; construction
+ * fails with a NullPointerException if the session has no channel for the given id.
+ * <p/>
+ * send() encodes a deliver frame for the message and writes it to the protocol session.
+ * When acknowledgements are required the message is first recorded on the channel as
+ * unacknowledged and is requeued after the write; otherwise its reference count is
+ * decremented and it is dequeued straight after the write. A null message is not sent;
+ * an error is logged instead.
+ */
+public class SubscriptionImpl implements Subscription
+{
+    private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
+
+    public final AMQChannel channel;
+
+    public final AMQProtocolSession protocolSession;
+
+    public final String consumerTag;
+
+    private final Object sessionKey;
+
+    /**
+     * True if messages need to be acknowledged. Defaults to false when the
+     * three-argument constructor is used.
+     */
+    private final boolean _acks;
+
+    public static class Factory implements SubscriptionFactory
+    {
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+            throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+        }
+
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+            throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+        }
+    }
+
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks)
+        throws AMQException
+    {
+        AMQChannel channel = protocolSession.getChannel(channelId);
+        if (channel == null) {
+            throw new NullPointerException("channel not found in protocol session");
+        }
+
+        this.channel = channel;
+        this.protocolSession = protocolSession;
+        this.consumerTag = consumerTag;
+        sessionKey = protocolSession.getKey();
+        _acks = acks;
+    }
+
+    public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+                            String consumerTag)
+        throws AMQException
+    {
+        this(channel, protocolSession, consumerTag, false);
+    }
+
+    public boolean equals(Object o)
+    {
+        return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
+    }
+
+    /**
+     * Equality holds if the session matches and the channel and consumer tag are the same.
+     * The session keys and consumer tags are compared with equals(); the channel is
+     * compared by reference. hashCode() is derived from the session key alone.
+     */
+    private boolean equals(SubscriptionImpl psc)
+    {
+        return sessionKey.equals(psc.sessionKey)
+                && psc.channel == channel
+                && psc.consumerTag.equals(consumerTag);
+    }
+
+    public int hashCode()
+    {
+        return sessionKey.hashCode();
+    }
+
+    public String toString()
+    {
+        return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+    }
+
+    public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+    {
+        if (msg != null)
+        {
+            final long deliveryTag = channel.getNextDeliveryTag();
+            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+            AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+            if (_acks)
+            {
+                channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+            }
+            protocolSession.writeFrame(frame);
+            // if we do not need to wait for client acknowledgements we can decrement
+            // the reference count immediately and take the message off the queue
+            if (!_acks)
+            {
+                msg.decrementReference();
+                msg.dequeue(queue);
+            }
+            else
+            {
+                //move the msg to the back of the persistently recorded queue while
+                //waiting for ack
+                msg.requeue(queue);
+            }
+        }
+        else
+        {
+            _logger.error("Attempt to send Null message", new NullPointerException());
+        }
+    }
+
+    public boolean isSuspended()
+    {
+        return channel.isSuspended();
+    }
+
+    /**
+     * Callback indicating that a queue has been deleted. The notification is
+     * passed on to the channel.
+     * @param queue the queue that has been deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        channel.queueDeleted(queue);
+    }
+
+    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+    {
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
+                                                                deliveryTag, false, exchange,
+                                                                routingKey);
+        ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? (narrows the frame size to int)
+        deliverFrame.writePayload(buf);
+        buf.flip();
+        return buf;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * Abstraction of actor that will determine the subscriber to whom
+ * a message will be sent.
+ */
+public interface SubscriptionManager
+{
+    /**
+     * @return whether this manager currently has a subscriber it considers active.
+     * The SubscriptionSet implementation in this package treats a subscription as
+     * active when it is not suspended.
+     */
+    public boolean hasActiveSubscribers();
+
+    /**
+     * Chooses the subscriber that should receive a message.
+     *
+     * @param msg the message that is to be delivered
+     * @return the chosen subscription. The SubscriptionSet implementation in this
+     * package returns null when it has no unsuspended subscription to offer.
+     */
+    public Subscription nextSubscriber(AMQMessage msg);
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Holds a set of subscriptions for a queue and manages the round
+ * robin-ing of deliver etc.
+ */
+class SubscriptionSet implements WeightedSubscriptionManager
+{
+    private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
+
+    /**
+     * The subscriptions registered with this set.
+     */
+    private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
+
+    /**
+     * Index at which the next round robin scan starts.
+     */
+    private int _currentSubscriber;
+
+    /**
+     * Exposes the round robin position; intended for unit tests.
+     */
+    int getCurrentSubscriber()
+    {
+        return _currentSubscriber;
+    }
+
+    /**
+     * Registers a subscription with this set.
+     * @param subscription the subscription to add
+     */
+    public void addSubscriber(Subscription subscription)
+    {
+        _subscriptions.add(subscription);
+    }
+
+    /**
+     * Removes a subscription from this set. When the subscription is not present the
+     * current contents are dumped to the debug log.
+     * @param subscription the subscription to remove
+     * @return the subscription that was removed, or null if no match was found
+     */
+    public Subscription removeSubscriber(Subscription subscription)
+    {
+        // TODO: possibly need O(1) operation here.
+        if (_subscriptions.remove(subscription))
+        {
+            return subscription;
+        }
+        debugDumpSubscription(subscription);
+        return null;
+    }
+
+    /**
+     * Writes the missing subscription and every registered subscription to the debug
+     * log. Does nothing unless debug logging is enabled.
+     */
+    private void debugDumpSubscription(Subscription subscription)
+    {
+        if (!_log.isDebugEnabled())
+        {
+            return;
+        }
+        _log.debug("Subscription " + subscription + " not found. Dumping subscriptions:");
+        for (Subscription registered : _subscriptions)
+        {
+            _log.debug("Subscription: " + registered);
+        }
+        _log.debug("Subscription dump complete");
+    }
+
+    /**
+     * Returns the next subscription that is not suspended, or null if there is none.
+     *
+     * Performance note:
+     * The list may be scanned twice: once from the current round robin position to the
+     * end and, if that finds nothing, once more from the start. The worst case occurs
+     * when every subscription is suspended. No synchronisation is used, so subscriptions
+     * may be added and removed concurrently. If subscriptions were removed between calls
+     * and the stored position is now past the end of the list, the resulting
+     * IndexOutOfBoundsException also restarts the scan from the beginning.
+     */
+    public Subscription nextSubscriber(AMQMessage msg)
+    {
+        if (_subscriptions.isEmpty())
+        {
+            return null;
+        }
+
+        try
+        {
+            final Subscription firstPass = scanFromCurrentPosition();
+            if (firstPass != null)
+            {
+                return firstPass;
+            }
+            // reached the end of the list without a match: wrap around
+            _currentSubscriber = 0;
+            return scanFromCurrentPosition();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            // the stored position is no longer valid for the list: start again
+            _currentSubscriber = 0;
+            return scanFromCurrentPosition();
+        }
+    }
+
+    /**
+     * Walks the list from the current round robin position, advancing the position for
+     * every subscription visited, and stops at the first one that is not suspended.
+     * @return that subscription, or null if the end of the list is reached first
+     */
+    private Subscription scanFromCurrentPosition()
+    {
+        for (ListIterator<Subscription> it = _subscriptions.listIterator(_currentSubscriber); it.hasNext();)
+        {
+            final Subscription candidate = it.next();
+            ++_currentSubscriber;
+            subscriberScanned();
+            if (!candidate.isSuspended())
+            {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Hook invoked once for each subscription visited during a scan.
+     * Overridden in test classes.
+     */
+    protected void subscriberScanned()
+    {
+    }
+
+    /**
+     * @return true if no subscriptions are registered
+     */
+    public boolean isEmpty()
+    {
+        return _subscriptions.isEmpty();
+    }
+
+    /**
+     * @return true as soon as one registered subscription is found that is not suspended
+     */
+    public boolean hasActiveSubscribers()
+    {
+        for (Subscription registered : _subscriptions)
+        {
+            if (!registered.isSuspended())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return the number of registered subscriptions that are not suspended
+     */
+    public int getWeight()
+    {
+        int active = 0;
+        for (Subscription registered : _subscriptions)
+        {
+            if (registered.isSuspended())
+            {
+                continue;
+            }
+            active++;
+        }
+        return active;
+    }
+
+    /**
+     * Notification that a queue has been deleted. Every registered subscription is told,
+     * so that it can inform its channel, which in turn can update its list of
+     * unacknowledged messages.
+     * @param queue the queue that has been deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        for (Subscription registered : _subscriptions)
+        {
+            registered.queueDeleted(queue);
+        }
+    }
+
+    /**
+     * @return the number of registered subscriptions, suspended or not
+     */
+    int size()
+    {
+        return _subscriptions.size();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * A subscription manager that can also report a weight.
+ */
+public interface WeightedSubscriptionManager extends SubscriptionManager
+{
+    /**
+     * @return the weight of this manager. The SubscriptionSet implementation in this
+     * package returns the number of its subscriptions that are not suspended.
+     */
+    public int getWeight();        
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,108 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An abstract application registry that provides access to configuration information and handles the
+ * construction and caching of configurable objects.
+ *
+ * Subclasses should handle the construction of the "registered objects" such as the exchange registry.
+ *
+ */
+public abstract class ApplicationRegistry implements IApplicationRegistry
+{
+    private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
+
+    private static IApplicationRegistry _instance;
+
+    private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>();
+
+    protected final Configuration _configuration;
+
+    private static class ShutdownService implements Runnable
+    {
+        public void run()
+        {
+            _logger.info("Shutting down application registry...");
+            try
+            {
+                _instance.getMessageStore().close();
+            }
+            catch (Exception e)
+            {
+                _logger.error("Error shutting down message store: " + e, e);
+            }
+        }
+    }
+
+    public static void initialise(IApplicationRegistry instance) throws Exception
+    {
+        _instance = instance;
+        instance.initialise();
+        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
+    }
+
+    protected ApplicationRegistry(Configuration configuration)
+    {
+        _configuration = configuration;
+    }
+
+    public static IApplicationRegistry getInstance()
+    {
+        if (_instance == null)
+        {
+            throw new RuntimeException("Application registry not initialised");
+        }
+        else
+        {
+            return _instance;
+        }
+    }
+
+    public Configuration getConfiguration()
+    {
+        return _configuration;
+    }
+
+    public <T> T getConfiguredObject(Class<T> instanceType)
+    {
+        T instance = (T) _configuredObjects.get(instanceType);
+        if (instance == null)
+        {
+            try
+            {
+                instance = instanceType.newInstance();
+            }
+            catch (Exception e)
+            {
+                _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+                throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+            }
+            Configurator.configure(instance);
+            _configuredObjects.put(instanceType, instance);
+        }
+        return instance;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,155 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.JMXManagedObjectRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.ManagementConfiguration;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SASLAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+
+import java.io.File;
+
+/**
+ * Application registry whose configuration is read from an XML file, with system
+ * properties layered in front of it.
+ */
+public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
+{
+    private QueueRegistry _queueRegistry;
+
+    private ExchangeRegistry _exchangeRegistry;
+
+    private ExchangeFactory _exchangeFactory;
+
+    private ManagedObjectRegistry _managedObjectRegistry;
+
+    private AuthenticationManager _authenticationManager;
+
+    private MessageStore _messageStore;
+
+    /**
+     * @param configurationURL the XML configuration file to load
+     * @throws ConfigurationException if the file cannot be loaded
+     */
+    public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
+    {
+        super(config(configurationURL));
+    }
+
+    // Our configuration class needs to make the interpolate method
+    // public so it can be called below from the config method.
+    private static class MyConfiguration extends CompositeConfiguration {
+        public String interpolate(String obj) {
+            return super.interpolate(obj);
+        }
+    }
+
+    /**
+     * Builds the composite configuration: system properties first, then the XML file.
+     */
+    private static final Configuration config(File url) throws ConfigurationException {
+        // We have to override the interpolate methods so that
+        // interpolation takes place across the entirety of the
+        // composite configuration. Without doing this each
+        // configuration object only interpolates variables defined
+        // inside itself.
+        final MyConfiguration conf = new MyConfiguration();
+        conf.addConfiguration(new SystemConfiguration() {
+            protected String interpolate(String o) {
+                return conf.interpolate(o);
+            }
+        });
+        conf.addConfiguration(new XMLConfiguration(url) {
+            protected String interpolate(String o) {
+                return conf.interpolate(o);
+            }
+        });
+        return conf;
+    }
+
+    /**
+     * Creates the registered objects. The message store is created last because it is
+     * configured with the queue registry.
+     */
+    public void initialise() throws Exception
+    {
+        initialiseManagedObjectRegistry();
+        _queueRegistry = new DefaultQueueRegistry();
+        _exchangeFactory = new DefaultExchangeFactory();
+        _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+        _authenticationManager = new SASLAuthenticationManager();
+        initialiseMessageStore();
+    }
+
+    /**
+     * Chooses the JMX or the no-op managed object registry according to the
+     * <code>enabled</code> flag of the management configuration.
+     */
+    private void initialiseManagedObjectRegistry()
+    {
+        ManagementConfiguration config = getConfiguredObject(ManagementConfiguration.class);
+        if (config.enabled)
+        {
+            _managedObjectRegistry = new JMXManagedObjectRegistry();
+        }
+        else
+        {
+            _managedObjectRegistry = new NoopManagedObjectRegistry();
+        }
+    }
+
+    /**
+     * Instantiates the class named by the <code>store.class</code> configuration
+     * property and configures it as the message store.
+     *
+     * @throws ConfigurationException if <code>store.class</code> is not configured
+     * @throws Exception if the class cannot be loaded or instantiated, does not
+     * implement MessageStore, or fails to configure itself
+     */
+    private void initialiseMessageStore() throws Exception
+    {
+        String messageStoreClass = _configuration.getString("store.class");
+        if (messageStoreClass == null)
+        {
+            // fail with a meaningful message instead of a NullPointerException from Class.forName
+            throw new ConfigurationException("No message store configured: property store.class is not set");
+        }
+        Class<?> clazz = Class.forName(messageStoreClass);
+
+        // check the type before instantiating so that an unrelated class is never constructed
+        if (!MessageStore.class.isAssignableFrom(clazz))
+        {
+            throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+                                " does not.");
+        }
+        _messageStore = (MessageStore) clazz.newInstance();
+        _messageStore.configure(getQueueRegistry(), "store", _configuration);
+    }
+
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+
+    public ExchangeFactory getExchangeFactory()
+    {
+        return _exchangeFactory;
+    }
+
+    public ManagedObjectRegistry getManagedObjectRegistry()
+    {
+        return _managedObjectRegistry;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _messageStore;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.commons.configuration.Configuration;
+
+public interface IApplicationRegistry
+{
+    /**
+     * Initialise the application registry. All initialisation must be done in this method so that any components
+     * that need access to the application registry itself for initialisation are able to use it. Attempting to
+     * initialise in the constructor will lead to failures since the registry reference will not have been set.
+     *
+     * @throws Exception if initialisation fails
+     */
+    void initialise() throws Exception;    
+
+    /**
+     * This gets access to a "configured object". A configured object has fields populated from the configuration
+     * object (Commons Configuration) automatically, where it has the appropriate attributes defined on fields.
+     * Application registry implementations can choose the refresh strategy or caching approach.
+     * @param instanceType the type of object you want initialised. This must be unique - i.e. you can only
+     * have a single object of this type in the system.
+     * @return the configured object
+     */
+    <T> T getConfiguredObject(Class<T> instanceType);
+
+    /**
+     * Get the low level configuration. For use cases where the configured object approach is not required
+     * you can get the complete configuration information.
+     * @return a Commons Configuration instance
+     */
+    Configuration getConfiguration();
+
+    /**
+     * @return the queue registry held by this application registry
+     */
+    QueueRegistry getQueueRegistry();
+
+    /**
+     * @return the exchange registry held by this application registry
+     */
+    ExchangeRegistry getExchangeRegistry();
+
+    /**
+     * @return the exchange factory held by this application registry
+     */
+    ExchangeFactory getExchangeFactory();
+
+    /**
+     * @return the managed object registry held by this application registry
+     */
+    ManagedObjectRegistry getManagedObjectRegistry();
+
+    /**
+     * @return the authentication manager held by this application registry
+     */
+    AuthenticationManager getAuthenticationManager();
+
+    /**
+     * @return the message store held by this application registry
+     */
+    MessageStore getMessageStore();
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * Entry point for SASL based authentication: lists the available mechanisms, creates
+ * SASL servers and evaluates client responses.
+ */
+public interface AuthenticationManager
+{
+    /**
+     * @return the available mechanisms as a single string.
+     * NOTE(review): presumably a space separated list of SASL mechanism names - confirm
+     * against the implementation.
+     */
+    String getMechanisms();
+
+    /**
+     * Creates a SASL server for the requested mechanism.
+     *
+     * @param mechanism the name of the mechanism to use
+     * @param localFQDN NOTE(review): presumably the fully qualified domain name of this
+     * server - confirm against the implementation
+     * @return the SASL server
+     * @throws SaslException if the SASL server cannot be created
+     */
+    SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+
+    /**
+     * Evaluates a response against the given SASL server.
+     *
+     * @param server the SASL server handling the exchange
+     * @param response the response bytes to evaluate
+     * @return the outcome of the evaluation
+     */
+    AuthenticationResult authenticate(SaslServer server, byte[] response);
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.commons.configuration.Configuration;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslServerFactory;
+import java.util.Map;
+
+public interface AuthenticationProviderInitialiser
+{
+    /**
+     * @return the mechanism's name. This will be used in the list of mechanisms advertised to the
+     * client.
+     */
+    String getMechanismName();
+
+    /**
+     * Initialise the authentication provider.
+     * @param baseConfigPath the path in the config file that points to any config options for this provider. Each
+     * provider can have its own set of configuration options
+     * @param configuration the Apache Commons Configuration instance used to configure this provider
+     * @param principalDatabases the set of principal databases that are available
+     * @throws Exception if the provider cannot be initialised
+     */
+    void initialise(String baseConfigPath, Configuration configuration,
+                    Map<String, PrincipalDatabase> principalDatabases) throws Exception;
+
+    /**
+     * @return the callback handler that should be used to process authentication requests for this mechanism. This will
+     * be called after initialise and will be stored by the authentication manager. The callback handler <b>must</b> be
+     * fully threadsafe.
+     */
+    CallbackHandler getCallbackHandler();
+
+    /**
+     * Get the properties that must be passed in to the Sasl.createSaslServer method.
+     * @return the properties, which may be null
+     */
+    Map<String, ?> getProperties();
+
+    /**
+     * Get the class that is the server factory. This is used for the JCA registration.
+     * @return null if no JCA registration is required, otherwise return the class
+     * that will be used in JCA registration
+     */
+    Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration();
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+/**
+ * Outcome of one step of an authentication exchange: a status plus, optionally, the
+ * challenge bytes that go with it.
+ */
+public class AuthenticationResult
+{
+    /**
+     * The possible states of an authentication exchange.
+     */
+    public enum AuthenticationStatus
+    {
+        SUCCESS, CONTINUE, ERROR
+    }
+
+    public AuthenticationStatus status;
+    public byte[] challenge;
+
+    /**
+     * Creates a result carrying both a challenge and a status.
+     * @param challenge the challenge bytes; stored as given, not copied
+     * @param status the status of the exchange
+     */
+    public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
+    {
+        this.challenge = challenge;
+        this.status = status;
+    }
+
+    /**
+     * Creates a result with a status only; the challenge is left null.
+     * @param status the status of the exchange
+     */
+    public AuthenticationResult(AuthenticationStatus status)
+    {
+        this(null, status);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslServerFactory;
+
+/**
+ * Initialiser for the CRAM-MD5 mechanism, built on the username/password initialiser.
+ */
+public class CRAMMD5Initialiser extends UsernamePasswordInitialiser
+{
+    /** The mechanism name reported by this initialiser. */
+    private static final String MECHANISM_NAME = "CRAM-MD5";
+
+    /**
+     * @return the fixed mechanism name, "CRAM-MD5"
+     */
+    public String getMechanismName()
+    {
+        return MECHANISM_NAME;
+    }
+
+    /**
+     * @return always null. The CRAM-MD5 provider is registered as part of the JDK, so
+     * there is no factory class that we need to register ourselves.
+     */
+    public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
+    {
+        return null;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslServerFactory;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+/**
+ * A JCA security provider that publishes SASL server factory classes keyed by mechanism name.
+ * Constructing an instance also installs it through {@link Security#addProvider}.
+ */
+public final class JCAProvider extends Provider
+{
+    private static final String PROVIDER_NAME = "AMQSASLProvider";
+
+    private static final double PROVIDER_VERSION = 1.0;
+
+    private static final String PROVIDER_INFO =
+            "A JCA provider that registers all AMQ SASL providers that want to be registered";
+
+    /** Prefix of the provider property under which each factory class name is stored. */
+    private static final String FACTORY_KEY_PREFIX = "SaslServerFactory.";
+
+    /**
+     * Creates the provider, stores one property per map entry (the prefix "SaslServerFactory."
+     * followed by the mechanism name, mapped to the factory class name) and then adds the
+     * provider to the security provider list.
+     *
+     * @param providerMap maps a SASL mechanism name to the factory class for that mechanism
+     */
+    public JCAProvider(Map<String, Class<? extends SaslServerFactory>> providerMap)
+    {
+        super(PROVIDER_NAME, PROVIDER_VERSION, PROVIDER_INFO);
+        for (Map.Entry<String, Class<? extends SaslServerFactory>> mechanismEntry : providerMap.entrySet())
+        {
+            String propertyKey = FACTORY_KEY_PREFIX + mechanismEntry.getKey();
+            String factoryClassName = mechanismEntry.getValue().getName();
+            put(propertyKey, factoryClassName);
+        }
+        Security.addProvider(this);
+    }
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AccountNotFoundException;
+import java.security.Principal;
+import java.io.*;
+import java.util.regex.Pattern;
+
+/**
+ * Represents a user database where the account information is stored in a simple flat file.
+ *
+ * The file is expected to be in the form:
+ * username:password
+ * username1:password1
+ * ...
+ * usernamen:passwordn
+ *
+ * where a line break separates each username/password pair. The username ends at the first
+ * colon on the line and everything after that colon is the password, so a password may itself
+ * contain colons. Lines that have no colon, or nothing after the first colon, are ignored.
+ * Passwords are assumed to be in plain text.
+ *
+ */
+public class PasswordFilePrincipalDatabase implements PrincipalDatabase
+{
+    private static final Logger _logger = Logger.getLogger(PasswordFilePrincipalDatabase.class);
+
+    /** Separator between the username and the password on each line. */
+    private static final Pattern _regexp = Pattern.compile(":");
+
+    private File _passwordFile;
+
+    public PasswordFilePrincipalDatabase()
+    {
+    }
+
+    /**
+     * Sets the file the accounts are read from. The file is only adopted once it has been
+     * checked, so a failed call leaves any previously configured file in place.
+     *
+     * @param passwordFile path of the password file
+     * @throws FileNotFoundException if the file does not exist or cannot be read
+     */
+    public void setPasswordFile(String passwordFile) throws FileNotFoundException
+    {
+        File f = new File(passwordFile);
+        _logger.info("PasswordFilePrincipalDatabase using file " + f.getAbsolutePath());
+        if (!f.exists())
+        {
+            throw new FileNotFoundException("Cannot find password file " + f);
+        }
+        if (!f.canRead())
+        {
+            throw new FileNotFoundException("Cannot read password file " + f +
+                                            ". Check permissions.");
+        }
+        _passwordFile = f;
+    }
+
+    /**
+     * Looks up the password of the given principal in the password file and stores it in the
+     * callback.
+     *
+     * @param principal the principal whose password is wanted; must not be null
+     * @param callback the callback that receives the password; must not be null
+     * @throws IOException if the password file could not be read
+     * @throws AccountNotFoundException if no password file has been set or the file has no entry
+     * for the principal
+     * @throws IllegalArgumentException if principal or callback is null
+     */
+    public void setPassword(Principal principal, PasswordCallback callback) throws IOException,
+            AccountNotFoundException
+    {
+        if (_passwordFile == null)
+        {
+            throw new AccountNotFoundException("Unable to locate principal since no password file was specified during initialisation");
+        }
+        if (principal == null)
+        {
+            throw new IllegalArgumentException("principal must not be null");
+        }
+        if (callback == null)
+        {
+            // fail before touching the file rather than with an NPE after the lookup
+            throw new IllegalArgumentException("callback must not be null");
+        }
+        char[] pwd = lookupPassword(principal.getName());
+        if (pwd != null)
+        {
+            callback.setPassword(pwd);
+        }
+        else
+        {
+            throw new AccountNotFoundException("No account found for principal " + principal);
+        }
+    }
+
+    /**
+     * Looks up the password for a specified user in the password file.
+     * Note this code is <b>not</b> secure since it creates strings of passwords. It should be modified
+     * to create only char arrays which get nulled out.
+     * @param name the username to search for
+     * @return the password of the first matching entry, or null if there is no such entry
+     * @throws IOException if the password file could not be opened or read
+     */
+    private char[] lookupPassword(String name) throws IOException
+    {
+        BufferedReader reader = null;
+        try
+        {
+            reader = new BufferedReader(new FileReader(_passwordFile));
+            String line;
+
+            while ((line = reader.readLine()) != null)
+            {
+                // Split on the first colon only: an unlimited split would cut a password that
+                // contains a colon short at its second colon.
+                String[] result = _regexp.split(line, 2);
+                if (result.length < 2 || result[1].length() == 0)
+                {
+                    // no separator, or nothing after it: not a usable entry
+                    continue;
+                }
+
+                if (name.equals(result[0]))
+                {
+                    return result[1].toCharArray();
+                }
+            }
+            return null;
+        }
+        finally
+        {
+            if (reader != null)
+            {
+                reader.close();
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AccountNotFoundException;
+import java.security.Principal;
+import java.io.IOException;
+
+/**
+ * Represents a "user database" which is really a way of storing principals (i.e. usernames) and
+ * passwords.
+ *
+ * An implementation is asked for a principal's password through
+ * {@link #setPassword(Principal, PasswordCallback)} and hands the password over by populating
+ * the supplied callback; nothing is returned to the caller directly.
+ */
+public interface PrincipalDatabase
+{
+    /**
+     * Set the password for a given principal in the specified callback. This is used for certain
+     * SASL providers. The user database implementation should look up the password in any way it
+     * chooses and set it in the callback by calling its setPassword method.
+     * @param principal the principal whose password is being requested
+     * @param callback the password callback that wants to receive the password
+     * @throws AccountNotFoundException if the account for specified principal could not be found
+     * @throws IOException if there was an error looking up the principal
+     */
+    void setPassword(Principal principal, PasswordCallback callback)
+            throws IOException, AccountNotFoundException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,224 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.configuration.PropertyUtils;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.security.Security;
+
+/**
+ * An {@link AuthenticationManager} that authenticates through SASL. The principal databases and
+ * the SASL mechanisms are read from the configuration obtained from the
+ * {@link ApplicationRegistry} when the manager is constructed.
+ */
+public class SASLAuthenticationManager implements AuthenticationManager
+{
+    private static final Logger _log = Logger.getLogger(SASLAuthenticationManager.class);
+
+    /**
+     * The list of mechanisms, in the order in which they are configured (i.e. preferred order),
+     * separated by single spaces. Null until the first mechanism has been initialised.
+     */
+    private String _mechanisms;
+
+    /**
+     * Maps from the mechanism to the callback handler to use for handling those requests
+     */
+    private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+
+    /**
+     * Maps from the mechanism to the properties used to initialise the server. See the method
+     * Sasl.createSaslServer for details of the use of these properties. This map is populated during initialisation
+     * of each provider.
+     */
+    private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+
+    /**
+     * Creates the manager, initialising first the configured principal databases and then the
+     * configured SASL mechanisms.
+     *
+     * @throws Exception if any principal database or mechanism cannot be initialised
+     */
+    public SASLAuthenticationManager() throws Exception
+    {
+        _log.info("Initialising SASL authentication manager");
+        Map<String, PrincipalDatabase> databases = initialisePrincipalDatabases();
+        initialiseAuthenticationMechanisms(databases);
+    }
+
+    /**
+     * Instantiates and configures every principal database listed in the configuration.
+     *
+     * @return the configured databases, keyed by their configured name
+     * @throws Exception if a database has no class, cannot be instantiated or configured, does
+     * not implement PrincipalDatabase, or has an empty or duplicate name
+     */
+    private Map<String, PrincipalDatabase> initialisePrincipalDatabases() throws Exception
+    {
+        Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+        List<String> databaseNames = config.getList("security.principal-databases.principal-database.name");
+        List<String> databaseClasses = config.getList("security.principal-databases.principal-database.class");
+        if (databaseClasses.size() < databaseNames.size())
+        {
+            // the two lists are indexed in parallel below; report the misconfiguration clearly
+            // instead of failing with an IndexOutOfBoundsException
+            throw new Exception("Every principal database must specify a class: found " +
+                                databaseNames.size() + " name(s) but only " + databaseClasses.size() +
+                                " class(es)");
+        }
+        Map<String, PrincipalDatabase> databases = new HashMap<String, PrincipalDatabase>();
+        for (int i = 0; i < databaseNames.size(); i++)
+        {
+            Object o;
+            try
+            {
+                o = Class.forName(databaseClasses.get(i)).newInstance();
+            }
+            catch (Exception e)
+            {
+                throw new Exception("Error initialising principal database: " + e, e);
+            }
+
+            if (!(o instanceof PrincipalDatabase))
+            {
+                throw new Exception("Principal databases must implement the PrincipalDatabase interface");
+            }
+            PrincipalDatabase database = (PrincipalDatabase) o;
+
+            initialisePrincipalDatabase(database, config, i);
+
+            String name = databaseNames.get(i);
+            if (name == null || name.length() == 0)
+            {
+                throw new Exception("Principal database names must have length greater than or equal to one character");
+            }
+            if (databases.get(name) != null)
+            {
+                throw new Exception("Duplicate principal database name provided");
+            }
+            _log.info("Initialised principal database " + name + " successfully");
+            databases.put(name, database);
+        }
+        return databases;
+    }
+
+    /**
+     * Applies the configured attributes of one principal database by calling, for each attribute,
+     * the public setter named "set" followed by the attribute name with its first character
+     * upper-cased, passing the attribute value (after property replacement) as a String.
+     *
+     * @param principalDatabase the database to configure
+     * @param config the configuration to read the attributes from
+     * @param index the position of the database among the configured principal databases
+     * @throws Exception if an attribute has no name or no value, if no matching setter exists, or
+     * if the setter fails
+     */
+    private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index)
+            throws Exception
+    {
+        String baseName = "security.principal-databases.principal-database(" + index + ").attributes.attribute.";
+        List<String> argumentNames = config.getList(baseName + "name");
+        List<String> argumentValues = config.getList(baseName + "value");
+        for (int i = 0; i < argumentNames.size(); i++)
+        {
+            String argName = argumentNames.get(i);
+            if (argName == null || argName.length() == 0)
+            {
+                throw new Exception("Argument names must have length >= 1 character");
+            }
+            if (i >= argumentValues.size())
+            {
+                throw new Exception("No value specified for argument " + argName +
+                                    " of principal database " + principalDatabase.getClass());
+            }
+            if (Character.isLowerCase(argName.charAt(0)))
+            {
+                argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+            }
+            String methodName = "set" + argName;
+            Method method;
+            try
+            {
+                method = principalDatabase.getClass().getMethod(methodName, String.class);
+            }
+            catch (NoSuchMethodException e)
+            {
+                // getMethod never returns null; a missing setter is signalled by this exception
+                throw new Exception("No method " + methodName + " found in class " + principalDatabase.getClass() +
+                                    " hence unable to configure principal database. The method must be public and " +
+                                    "have a single String argument with a void return type", e);
+            }
+            method.invoke(principalDatabase, PropertyUtils.replaceProperties(argumentValues.get(i)));
+        }
+    }
+
+    /**
+     * Initialises every SASL mechanism listed in the configuration and, if any of them supplied
+     * a server factory class, registers those factories through a {@link JCAProvider}.
+     *
+     * @param databases the principal databases available to the mechanisms, keyed by name
+     * @throws Exception if a mechanism initialiser cannot be created or initialised
+     */
+    private void initialiseAuthenticationMechanisms(Map<String, PrincipalDatabase> databases) throws Exception
+    {
+        Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+        List<String> mechanisms = config.getList("security.sasl.mechanisms.mechanism.initialiser.class");
+
+        // Maps from the mechanism to the server factory class that has to be registered for it.
+        // Only mechanisms whose initialiser returns a factory class end up in this map.
+        Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+
+        for (int i = 0; i < mechanisms.size(); i++)
+        {
+            String baseName = "security.sasl.mechanisms.mechanism(" + i + ").initialiser";
+            String clazz = config.getString(baseName + ".class");
+            initialiseAuthenticationMechanism(baseName, clazz, databases, config, providerMap);
+        }
+        if (providerMap.size() > 0)
+        {
+            Security.addProvider(new JCAProvider(providerMap));
+        }
+    }
+
+    /**
+     * Creates and runs a single mechanism initialiser, then records the mechanism name, its
+     * callback handler and its server creation properties.
+     *
+     * @param baseName the configuration key prefix of the initialiser
+     * @param clazz the name of the initialiser class
+     * @param databases the principal databases available to the initialiser, keyed by name
+     * @param configuration the configuration handed to the initialiser
+     * @param providerMap receives the server factory class of the mechanism, if it supplies one
+     * @throws Exception if the class cannot be instantiated, is not an
+     * AuthenticationProviderInitialiser, or fails to initialise
+     */
+    private void initialiseAuthenticationMechanism(String baseName, String clazz,
+                                                   Map<String, PrincipalDatabase> databases,
+                                                   Configuration configuration,
+                                                   Map<String, Class<? extends SaslServerFactory>> providerMap)
+            throws Exception
+    {
+        Class<?> initialiserClazz = Class.forName(clazz);
+        Object o = initialiserClazz.newInstance();
+        if (!(o instanceof AuthenticationProviderInitialiser))
+        {
+            throw new Exception("The class " + clazz + " must be an instance of " +
+                                AuthenticationProviderInitialiser.class);
+        }
+        AuthenticationProviderInitialiser initialiser = (AuthenticationProviderInitialiser) o;
+        initialiser.initialise(baseName, configuration, databases);
+        String mechanism = initialiser.getMechanismName();
+        if (_mechanisms == null)
+        {
+            _mechanisms = mechanism;
+        }
+        else
+        {
+            // simple append should be fine since the number of mechanisms is small and this is a one time initialisation
+            _mechanisms = _mechanisms + " " + mechanism;
+        }
+        _callbackHandlerMap.put(mechanism, initialiser.getCallbackHandler());
+        _serverCreationProperties.put(mechanism, initialiser.getProperties());
+        Class<? extends SaslServerFactory> factory = initialiser.getServerFactoryClassForJCARegistration();
+        if (factory != null)
+        {
+            providerMap.put(mechanism, factory);
+        }
+        _log.info("Initialised " + mechanism + " SASL provider successfully");
+    }
+
+    /**
+     * @return the names of the initialised mechanisms separated by single spaces, in the order
+     * in which they were configured, or null if none were initialised
+     */
+    public String getMechanisms()
+    {
+        return _mechanisms;
+    }
+
+    /**
+     * Creates a SASL server for the given mechanism using the properties and callback handler
+     * recorded for it during initialisation.
+     *
+     * @param mechanism the name of the SASL mechanism
+     * @param localFQDN the server name passed on to Sasl.createSaslServer
+     * @return whatever Sasl.createSaslServer returns for these arguments
+     * @throws SaslException if Sasl.createSaslServer fails
+     */
+    public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+    {
+        return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+                                     _callbackHandlerMap.get(mechanism));
+    }
+
+    /**
+     * Feeds a client response to the SASL server and reports the outcome.
+     *
+     * @param server the SASL server handling the exchange
+     * @param response the client's response; null is treated as an empty response
+     * @return SUCCESS with the final challenge if the exchange is complete, CONTINUE with the
+     * next challenge if it is not, or ERROR if the server rejected the response
+     */
+    public AuthenticationResult authenticate(SaslServer server, byte[] response)
+    {
+        try
+        {
+            // Process response from the client
+            byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
+
+            if (server.isComplete())
+            {
+                return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.SUCCESS);
+            }
+            else
+            {
+                return new AuthenticationResult(challenge, AuthenticationResult.AuthenticationStatus.CONTINUE);
+            }
+        }
+        catch (SaslException e)
+        {
+            // the result carries no detail, so record the reason here rather than dropping it
+            _log.warn("SASL authentication failed: " + e);
+            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java
------------------------------------------------------------------------------
    svn:eol-style = native