You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [14/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,262 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages delivery of messages on behalf of a queue.
+ * <p/>
+ * The manager is always in one of two modes: it either hands each incoming
+ * message straight to a subscriber, or (once a message could not be handed
+ * over) it queues messages in {@link #_messages} and drains them on an
+ * executor thread via {@link #processAsync(Executor)}.
+ * <p/>
+ * All access to {@link #_messages} and {@link #_queueing} goes through
+ * methods synchronized on this instance.
+ */
+class DeliveryManager
+{
+    private static final Logger _log = Logger.getLogger(DeliveryManager.class);
+
+    /**
+     * Holds any queued messages
+     */
+    private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+    /**
+     * Ensures that only one asynchronous task is running for this manager at
+     * any time.
+     */
+    private final AtomicBoolean _processing = new AtomicBoolean();
+    /**
+     * The subscriptions on the queue to whom messages are delivered
+     */
+    private final SubscriptionManager _subscriptions;
+
+    /**
+     * An indication of the mode we are in. If this is true then messages are
+     * being queued up in _messages for asynchronous delivery. If it is false
+     * then messages can be delivered directly as they come in.
+     */
+    private boolean _queueing;
+
+    /**
+     * A reference to the queue we are delivering messages for. We need this to be able
+     * to pass the code that handles acknowledgements a handle on the queue.
+     */
+    private final AMQQueue _queue;
+
+    DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+    {
+        _subscriptions = subscriptions;
+        _queue = queue;
+    }
+
+    /**
+     * Appends the message to the internal queue, but only while in queueing mode.
+     *
+     * @param msg the message to queue
+     * @return true if the message was queued, false if the manager is in
+     *         direct-delivery mode and the message was left untouched
+     */
+    private synchronized boolean enqueue(AMQMessage msg)
+    {
+        if (_queueing)
+        {
+            _messages.offer(msg);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    /**
+     * Switches to queueing mode and queues the given message in one atomic step.
+     *
+     * @param msg the first message to queue
+     */
+    private synchronized void startQueueing(AMQMessage msg)
+    {
+        _queueing = true;
+        enqueue(msg);
+    }
+
+    /**
+     * Determines whether there are queued messages. Sets _queueing to false if
+     * there are no queued messages. This needs to be atomic.
+     *
+     * @return true if there are queued messages
+     */
+    private synchronized boolean hasQueuedMessages()
+    {
+        boolean empty = _messages.isEmpty();
+        if (empty)
+        {
+            _queueing = false;
+        }
+        return !empty;
+    }
+
+    /**
+     * @return the number of messages currently held for asynchronous delivery
+     */
+    public synchronized int getQueueMessageCount()
+    {
+        return _messages.size();
+    }
+
+    /**
+     * @return a snapshot copy of the queued messages, in queue order; changes
+     *         to the returned list do not affect this manager
+     */
+    protected synchronized List<AMQMessage> getMessages()
+    {
+        return new ArrayList<AMQMessage>(_messages);
+    }
+
+    /**
+     * Removes the message at the head of the queue, if there is one, and
+     * dequeues it from the owning queue.
+     *
+     * @throws AMQException if dequeuing the message fails
+     */
+    protected synchronized void removeAMessageFromTop() throws AMQException
+    {
+        AMQMessage msg = poll();
+        if (msg != null)
+        {
+            msg.dequeue(_queue);
+        }
+    }
+
+    /**
+     * Removes every queued message, dequeuing each from the owning queue.
+     *
+     * @throws AMQException if dequeuing a message fails; messages still
+     *                      queued at that point are left in place
+     */
+    protected synchronized void clearAllMessages() throws AMQException
+    {
+        AMQMessage msg = poll();
+        while (msg != null)
+        {
+            msg.dequeue(_queue);
+            msg = poll();
+        }
+    }
+
+    /**
+     * Only one thread should ever execute this method concurrently, but
+     * it can do so while other threads invoke deliver() or the management
+     * operations that remove messages.
+     * <p/>
+     * A subscriber is chosen for the message at the head of the queue, and
+     * the message is only taken off the queue if it is still the head at that
+     * point. This prevents sending a different message (or null) to the
+     * chosen subscriber when the head is removed concurrently.
+     */
+    private void processQueue()
+    {
+        try
+        {
+            boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+            while (hasQueuedMessages() && hasSubscribers)
+            {
+                AMQMessage head = peek();
+                if (head == null)
+                {
+                    // The queue was emptied after hasQueuedMessages() returned;
+                    // go round again so the mode flag is re-evaluated.
+                    continue;
+                }
+
+                Subscription next = _subscriptions.nextSubscriber(head);
+                //We don't synchronize access to subscribers so need to re-check
+                if (next == null)
+                {
+                    hasSubscribers = false;
+                }
+                else if (pollIfHead(head))
+                {
+                    try
+                    {
+                        next.send(head, _queue);
+                    }
+                    catch (AMQException e)
+                    {
+                        // NOTE(review): the message has already been taken off the
+                        // queue here, so a failed send drops it (as it did before).
+                        _log.error("Unable to deliver message: " + e, e);
+                    }
+                }
+                // else: the head changed underneath us; loop and look at the new head
+            }
+        }
+        finally
+        {
+            _processing.set(false);
+        }
+    }
+
+    private synchronized AMQMessage peek()
+    {
+        return _messages.peek();
+    }
+
+    private synchronized AMQMessage poll()
+    {
+        return _messages.poll();
+    }
+
+    /**
+     * Atomically removes the head of the queue if, and only if, it is the
+     * given message instance.
+     *
+     * @param expected the message previously observed at the head
+     * @return true if that message was removed, false if the head has changed
+     */
+    private synchronized boolean pollIfHead(AMQMessage expected)
+    {
+        if (expected != null && _messages.peek() == expected)
+        {
+            _messages.poll();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Takes a message back out of the queue, wherever it currently sits.
+     *
+     * @param msg the message to remove
+     * @return true if the message was still queued and has been removed
+     */
+    private synchronized boolean withdraw(AMQMessage msg)
+    {
+        return _messages.remove(msg);
+    }
+
+    /**
+     * Requests that the delivery manager start processing the queue asynchronously
+     * if there is work that can be done (i.e. there are messages queued up and
+     * subscribers that can receive them).
+     * <p/>
+     * This should be called when subscribers are added, but only after the consume-ok
+     * message has been returned as message delivery may start immediately. It should also
+     * be called after unsuspending a client.
+     * <p/>
+     *
+     * @param executor the executor on which the delivery should take place
+     */
+    void processAsync(Executor executor)
+    {
+        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+        {
+            //are we already running? if so, don't re-run
+            if (_processing.compareAndSet(false, true))
+            {
+                executor.execute(new Runner());
+            }
+        }
+    }
+
+    /**
+     * Handles message delivery. The delivery manager is always in one of two modes;
+     * it is either queueing messages for asynchronous delivery or delivering
+     * directly.
+     * <p/>
+     * An immediate message that cannot be handed to a subscriber is rejected
+     * with a NoConsumersException and is never left on the queue.
+     *
+     * @param name the name of the entity on whose behalf we are delivering the message
+     * @param msg  the message to deliver
+     * @throws NoConsumersException if the message is immediate and there are no
+     *                              active subscribers to deliver it to
+     * @throws AMQException         if sending the message to a subscriber fails
+     */
+    void deliver(String name, AMQMessage msg) throws AMQException
+    {
+        // NOTE(review): the reference taken here is not released when the message
+        // is rejected below; confirm whether the caller compensates for that.
+        msg.incrementReference();
+        // first check whether we are queueing, and enqueue if we are
+        if (!enqueue(msg))
+        {
+            // not queueing so deliver message to 'next' subscriber
+            Subscription s = _subscriptions.nextSubscriber(msg);
+            if (s != null)
+            {
+                s.send(msg, _queue);
+            }
+            else if (msg.isImmediate())
+            {
+                throw msg.getNoConsumersException(name);
+            }
+            else
+            {
+                // no subscribers yet so enter 'queueing' mode and queue this message
+                startQueueing(msg);
+            }
+        }
+        else if (msg.isImmediate())
+        {
+            //todo check with spec to see if enqueing for immediate client delivery is ok.
+            Subscription s = _subscriptions.nextSubscriber(msg);
+            // Only reject if the message could be taken back off the queue. If it is
+            // no longer there, another thread has already consumed it, so telling
+            // the publisher it was undeliverable would be wrong.
+            if (s == null && withdraw(msg))
+            {
+                throw msg.getNoConsumersException(name);
+            }
+        }
+    }
+
+    /**
+     * Task submitted to the executor by processAsync(); drains the queue.
+     */
+    private class Runner implements Runnable
+    {
+        public void run()
+        {
+            processQueue();
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.HashSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * When a queue is deleted, it should be deregistered from any
+ * exchange it has been bound to. This class assists in this task,
+ * by keeping track of all bindings for a given queue.
+ */
+class ExchangeBindings
+{
+ static class ExchangeBinding
+ {
+ private final Exchange exchange;
+ private final String routingKey;
+
+ ExchangeBinding(String routingKey, Exchange exchange)
+ {
+ this.routingKey = routingKey;
+ this.exchange = exchange;
+ }
+
+ void unbind(AMQQueue queue) throws AMQException
+ {
+ exchange.deregisterQueue(routingKey, queue);
+ }
+
+ public Exchange getExchange()
+ {
+ return exchange;
+ }
+
+ public String getRoutingKey()
+ {
+ return routingKey;
+ }
+
+ public int hashCode()
+ {
+ return exchange.hashCode() + routingKey.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ExchangeBinding)) return false;
+ ExchangeBinding eb = (ExchangeBinding) o;
+ return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+ }
+ }
+
+ private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
+ private final AMQQueue _queue;
+
+ ExchangeBindings(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ /**
+ * Adds the specified binding to those being tracked.
+ * @param routingKey the routing key with which the queue whose bindings
+ * are being tracked by the instance has been bound to the exchange
+ * @param exchange the exchange bound to
+ */
+ void addBinding(String routingKey, Exchange exchange)
+ {
+ _bindings.add(new ExchangeBinding(routingKey, exchange));
+ }
+
+ /**
+ * Deregisters this queue from any exchange it has been bound to
+ */
+ void deregister() throws AMQException
+ {
+ //remove duplicates at this point
+ HashSet<ExchangeBinding> copy = new HashSet<ExchangeBinding>(_bindings);
+ for (ExchangeBinding b : copy)
+ {
+ b.unbind(_queue);
+ }
+ }
+
+ List<ExchangeBinding> getExchangeBindings()
+ {
+ return _bindings;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ExchangeBindings.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of a queue.
+ * Every method declares IOException; the operations additionally declare
+ * JMException.
+ * @author Robert J. Greig
+ * @author Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedQueue
+{
+    /** Type name used for managed queues. */
+    static final String TYPE = "Queue";
+
+    /**
+     * Returns the name of the ManagedQueue.
+     * @return the name of the managed queue.
+     * @throws IOException
+     */
+    String getName() throws IOException;
+
+    /**
+     * Tells whether this ManagedQueue is durable or not.
+     * @return true if this ManagedQueue is a durable queue.
+     * @throws IOException
+     */
+    boolean isDurable() throws IOException;
+
+    /**
+     * Tells the owner of the ManagedQueue.
+     * @return the owner's name.
+     * @throws IOException
+     */
+    String getOwner() throws IOException;
+
+    /**
+     * Tells if the ManagedQueue is set to AutoDelete.
+     * @return true if the ManagedQueue is set to AutoDelete.
+     * @throws IOException
+     */
+    boolean isAutoDelete() throws IOException;
+
+    /**
+     * Gets the total number of messages on the queue which are yet to be
+     * delivered to the consumer(s).
+     * @return number of undelivered messages in the queue.
+     * @throws IOException
+     */
+    int getMessageCount() throws IOException;
+
+    /**
+     * Returns the maximum size of a message (in bytes) allowed to be accepted by the
+     * ManagedQueue. This is useful in setting notifications or taking
+     * appropriate action, if the size of the message received is more than
+     * the allowed size.
+     * @return the maximum size of a message allowed to be accepted by the
+     * ManagedQueue.
+     * @throws IOException
+     */
+    long getMaximumMessageSize() throws IOException;
+
+    /**
+     * Sets the maximum size of the message (in bytes) that is allowed to be
+     * accepted by the queue.
+     * @param bytes maximum size of message.
+     * @throws IOException
+     */
+    void setMaximumMessageSize(long bytes) throws IOException;
+
+    /**
+     * Returns the total number of subscribers to the queue.
+     * @return the number of subscribers.
+     * @throws IOException
+     */
+    int getConsumerCount() throws IOException;
+
+    /**
+     * Returns the total number of active subscribers to the queue.
+     * @return the number of active subscribers
+     * @throws IOException
+     */
+    int getActiveConsumerCount() throws IOException;
+
+    /**
+     * Tells the total number of messages received by the queue since startup.
+     * @return total number of messages received.
+     * @throws IOException
+     */
+    long getReceivedMessageCount() throws IOException;
+
+    /**
+     * Tells the maximum number of messages that can be stored in the queue.
+     * This is useful in setting the notifications or taking required
+     * action if the number of messages exceeds this limit.
+     * @return maximum number of messages allowed to be stored in the queue.
+     * @throws IOException
+     */
+    long getMaximumMessageCount() throws IOException;
+
+    /**
+     * Sets the maximum number of messages allowed to be stored in the queue.
+     * @param value the maximum number of messages allowed to be stored in the queue.
+     * @throws IOException
+     */
+    void setMaximumMessageCount(long value) throws IOException;
+
+    /**
+     * Tells the maximum size of all the messages combined together,
+     * that can be stored in the queue. This is useful for setting notifications
+     * or taking required action if the size of messages stored in the queue
+     * increases over this limit.
+     * @return maximum size of all the messages allowed for the queue.
+     * @throws IOException
+     */
+    long getQueueDepth() throws IOException;
+
+    /**
+     * Sets the maximum size of all the messages together, that can be stored
+     * in the queue.
+     * @param value the maximum combined size of the messages; presumably in
+     *              bytes, like the maximum message size - TODO confirm
+     * @throws IOException
+     */
+    void setQueueDepth(long value) throws IOException;
+
+
+
+    //********** Operations *****************//
+
+
+    /**
+     * Returns a subset of all the messages stored in the queue. The messages
+     * are returned based on the given index numbers.
+     * @param fromIndex index of the first message to return (whether indices
+     *                  are zero- or one-based is not specified here - TODO confirm)
+     * @param toIndex index of the last message to return (whether this bound
+     *                is inclusive is not specified here - TODO confirm)
+     * @return tabular data describing the selected messages
+     * @throws IOException
+     * @throws JMException
+     */
+    TabularData viewMessages(int fromIndex, int toIndex)
+        throws IOException, JMException;
+
+    /**
+     * Deletes the first message from the top of the queue.
+     * @throws IOException
+     * @throws JMException
+     */
+    void deleteMessageFromTop()
+        throws IOException, JMException;
+
+    /**
+     * Clears the queue by deleting all the messages from the queue.
+     * @throws IOException
+     * @throws JMException
+     */
+    void clearQueue()
+        throws IOException, JMException;
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.protocol.AMQConstant;
+
+import java.util.List;
+
+/**
+ * Raised when a message cannot be handed to any consumer at the moment it is
+ * delivered. This applies to messages published with immediate=true to a
+ * queue that has no consumer available to take them.
+ */
+public class NoConsumersException extends RequiredDeliveryException
+{
+    /**
+     * @param queue             name of the queue the message could not be delivered to;
+     *                          it is embedded in the exception message
+     * @param publishBody       the publish frame of the undeliverable message
+     * @param contentHeaderBody the content header of the undeliverable message
+     * @param contentBodies     the content bodies of the undeliverable message
+     */
+    public NoConsumersException(String queue,
+                                BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super(describe(queue), publishBody, contentHeaderBody, contentBodies);
+    }
+
+    /**
+     * @return the numeric code of {@code AMQConstant.NO_CONSUMERS}
+     */
+    public int getReplyCode()
+    {
+        return AMQConstant.NO_CONSUMERS.getCode();
+    }
+
+    /**
+     * Builds the human-readable exception message for the given queue name.
+     */
+    private static String describe(String queue)
+    {
+        return "Immediate delivery to " + queue + " is not possible.";
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+
+/**
+ * A registry of queues that can be looked up by name.
+ * NOTE(review): the exact semantics (duplicate names, unknown names, thread
+ * safety) are defined by the implementations, which are not visible here.
+ */
+public interface QueueRegistry
+{
+    /**
+     * Adds a queue to the registry.
+     *
+     * @param queue the queue to register
+     * @throws AMQException presumably if the queue cannot be registered - confirm
+     *                      against the implementation
+     */
+    void registerQueue(AMQQueue queue) throws AMQException;
+
+    /**
+     * Removes a queue from the registry.
+     *
+     * @param name the name of the queue to unregister
+     * @throws AMQException presumably if the queue cannot be unregistered - confirm
+     *                      against the implementation
+     */
+    void unregisterQueue(String name) throws AMQException;
+
+    /**
+     * Looks up a queue by name.
+     *
+     * @param name the name of the queue
+     * @return the matching queue; presumably null when no queue has that
+     *         name - TODO confirm
+     */
+    AMQQueue getQueue(String name);
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/QueueRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * A consumer's subscription to a queue: the target that messages from the
+ * queue are sent to.
+ */
+public interface Subscription
+{
+    /**
+     * Sends a message from the given queue to this subscriber.
+     *
+     * @param msg   the message to send
+     * @param queue the queue the message is being delivered from
+     * @throws AMQException if the message cannot be sent
+     */
+    void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+
+    /**
+     * @return true if this subscriber is currently suspended
+     */
+    boolean isSuspended();
+
+    /**
+     * Callback indicating that a queue has been deleted.
+     *
+     * @param queue the queue that was deleted
+     */
+    void queueDeleted(AMQQueue queue);
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/Subscription.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
+ * factory primarily assists testing although in future more sophisticated subscribers may need a different
+ * subscription implementation.
+ *
+ * @see org.apache.qpid.server.queue.AMQQueue
+ */
+public interface SubscriptionFactory
+{
+    /**
+     * Creates a subscription for a consumer on the given channel.
+     *
+     * @param channel         the id of the channel the consumer is on
+     * @param protocolSession the protocol session the channel belongs to
+     * @param consumerTag     the tag identifying the consumer
+     * @param acks            true if delivered messages need to be acknowledged
+     * @return the new subscription
+     * @throws AMQException if the subscription cannot be created
+     */
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+        throws AMQException;
+
+    /**
+     * Creates a subscription without stating whether acknowledgements are
+     * required; the choice is left to the implementation.
+     *
+     * @param channel         the id of the channel the consumer is on
+     * @param protocolSession the protocol session the channel belongs to
+     * @param consumerTag     the tag identifying the consumer
+     * @return the new subscription
+     * @throws AMQException if the subscription cannot be created
+     */
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
+        throws AMQException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,172 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+/**
+ * Encapsulation of a subscription to a queue.
+ * <p/>
+ * Ties together the protocol session of a subscriber, the consumer tag that
+ * was given out by the broker and the channel the consumer is on.
+ * <p/>
+ */
+public class SubscriptionImpl implements Subscription
+{
+    private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
+
+    /** The channel the consumer is on, resolved from the session at construction. */
+    public final AMQChannel channel;
+
+    /** The session that frames for this subscriber are written to. */
+    public final AMQProtocolSession protocolSession;
+
+    /** The tag identifying the consumer. */
+    public final String consumerTag;
+
+    /** The session's key, captured at construction; used for equality and hashing. */
+    private final Object sessionKey;
+
+    /**
+     * True if messages need to be acknowledged
+     */
+    private final boolean _acks;
+
+    /**
+     * Factory producing SubscriptionImpl instances by calling the matching constructor.
+     */
+    public static class Factory implements SubscriptionFactory
+    {
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
+            throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+        }
+
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+            throws AMQException
+        {
+            return new SubscriptionImpl(channel, protocolSession, consumerTag);
+        }
+    }
+
+    /**
+     * @param channelId       id of the channel, looked up in the protocol session
+     * @param protocolSession the subscriber's protocol session
+     * @param consumerTag     the tag identifying the consumer
+     * @param acks            true if delivered messages need to be acknowledged
+     * @throws NullPointerException if the session has no channel with that id
+     * @throws AMQException         declared for the session lookups made here
+     */
+    public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+                            String consumerTag, boolean acks)
+        throws AMQException
+    {
+        final AMQChannel resolved = protocolSession.getChannel(channelId);
+        if (resolved == null)
+        {
+            throw new NullPointerException("channel not found in protocol session");
+        }
+
+        this.channel = resolved;
+        this.protocolSession = protocolSession;
+        this.consumerTag = consumerTag;
+        this.sessionKey = protocolSession.getKey();
+        this._acks = acks;
+    }
+
+    /**
+     * Creates a subscription that does not require acknowledgements.
+     */
+    public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+                            String consumerTag)
+        throws AMQException
+    {
+        this(channel, protocolSession, consumerTag, false);
+    }
+
+    /**
+     * Equality holds if the session matches and the channel and consumer tag are the same.
+     * The channel is compared by identity.
+     */
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof SubscriptionImpl))
+        {
+            return false;
+        }
+        final SubscriptionImpl other = (SubscriptionImpl) o;
+        if (!sessionKey.equals(other.sessionKey))
+        {
+            return false;
+        }
+        if (other.channel != channel)
+        {
+            return false;
+        }
+        return other.consumerTag.equals(consumerTag);
+    }
+
+    /**
+     * Hashes on the session key only, which is consistent with equals().
+     */
+    public int hashCode()
+    {
+        return sessionKey.hashCode();
+    }
+
+    public String toString()
+    {
+        return "[channel=" + channel
+               + ", consumerTag=" + consumerTag
+               + ", session=" + protocolSession.getKey() + "]";
+    }
+
+    /**
+     * Writes the message to the subscriber's session as a deliver frame.
+     * <p/>
+     * With acknowledgements enabled the message is recorded as unacknowledged
+     * on the channel before the frame is written, and requeued afterwards.
+     * Without them the message's reference count is decremented and the
+     * message dequeued as soon as the frame has been written.
+     * A null message is logged as an error and otherwise ignored.
+     *
+     * @param msg   the message to send
+     * @param queue the queue the message is delivered from
+     * @throws AMQException if any of the message, channel or session calls fail
+     */
+    public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+    {
+        if (msg == null)
+        {
+            _logger.error("Attempt to send Null message", new NullPointerException());
+            return;
+        }
+
+        final long deliveryTag = channel.getNextDeliveryTag();
+        final String routingKey = msg.getRoutingKey();
+        final String exchangeName = msg.getExchangeName();
+        final ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, routingKey, exchangeName);
+        final AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+        if (_acks)
+        {
+            channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+            protocolSession.writeFrame(frame);
+            //move the msg to the back of the persistently recorded queue while
+            //waiting for ack
+            msg.requeue(queue);
+        }
+        else
+        {
+            protocolSession.writeFrame(frame);
+            // no client acknowledgement to wait for, so the reference count
+            // can be decremented immediately
+            msg.decrementReference();
+            msg.dequeue(queue);
+        }
+    }
+
+    /**
+     * @return whether the underlying channel reports itself as suspended
+     */
+    public boolean isSuspended()
+    {
+        return channel.isSuspended();
+    }
+
+    /**
+     * Callback indicating that a queue has been deleted; forwarded to the channel.
+     * @param queue the queue that was deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        channel.queueDeleted(queue);
+    }
+
+    /**
+     * Builds a basic.deliver frame for this consumer and encodes its payload
+     * into a freshly allocated buffer, flipped ready for reading.
+     */
+    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+    {
+        final AMQFrame frame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
+                                                               deliveryTag, false, exchange,
+                                                               routingKey);
+        final int size = (int) frame.getSize(); // XXX: Could cast be a problem?
+        final ByteBuffer encoded = ByteBuffer.allocate(size);
+        frame.writePayload(encoded);
+        encoded.flip();
+        return encoded;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * Abstraction of actor that will determine the subscriber to whom
+ * a message will be sent.
+ */
+public interface SubscriptionManager
+{
+    /**
+     * @return true if there is at least one subscriber able to receive messages
+     */
+    public boolean hasActiveSubscribers();
+
+    /**
+     * Chooses the subscriber the given message should be sent to.
+     *
+     * @param msg the message about to be delivered
+     * @return the chosen subscriber, or null if none is available; callers
+     *         must check for null
+     */
+    public Subscription nextSubscriber(AMQMessage msg);
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Holds the subscriptions registered against a queue and hands them out
+ * in round robin order for delivery.
+ */
+class SubscriptionSet implements WeightedSubscriptionManager
+{
+    private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
+
+    /**
+     * The registered subscribers.
+     */
+    private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
+
+    /**
+     * Position in the subscriber list at which the next round robin scan starts.
+     */
+    private int _currentSubscriber;
+
+    /**
+     * Exposes the round robin position to unit tests.
+     */
+    int getCurrentSubscriber()
+    {
+        return _currentSubscriber;
+    }
+
+    /**
+     * Registers a subscription with this set.
+     * @param subscription the subscription to add
+     */
+    public void addSubscriber(Subscription subscription)
+    {
+        _subscriptions.add(subscription);
+    }
+
+    /**
+     * Removes the given subscription from the set.
+     * @param subscription the subscription to remove
+     * @return the subscription that was removed, or null if no match was found
+     */
+    public Subscription removeSubscriber(Subscription subscription)
+    {
+        // TODO: possibly need O(1) operation here.
+        if (_subscriptions.remove(subscription))
+        {
+            return subscription;
+        }
+        debugDumpSubscription(subscription);
+        return null;
+    }
+
+    /**
+     * Logs, at debug level only, the subscription that could not be found
+     * followed by every subscription currently registered.
+     */
+    private void debugDumpSubscription(Subscription subscription)
+    {
+        if (!_log.isDebugEnabled())
+        {
+            return;
+        }
+        _log.debug("Subscription " + subscription + " not found. Dumping subscriptions:");
+        for (Subscription registered : _subscriptions)
+        {
+            _log.debug("Subscription: " + registered);
+        }
+        _log.debug("Subscription dump complete");
+    }
+
+    /**
+     * Returns the next unsuspended subscription, or null if there is none.
+     *
+     * Performance note:
+     * The scan starts at the current round robin position and, if it reaches the end of the
+     * list without a match, restarts from the beginning, so the items can be scanned twice.
+     * The worst case occurs when all subscriptions are suspended. The scan is done without
+     * synchronisation, and subscriptions may be added and removed concurrently. If removals
+     * leave the round robin position beyond the end of the list, the resulting
+     * IndexOutOfBoundsException also causes the scan to restart from the beginning.
+     *
+     * @param msg the message to be delivered; not consulted by this implementation
+     * @return the next unsuspended subscription, or null
+     */
+    public Subscription nextSubscriber(AMQMessage msg)
+    {
+        if (_subscriptions.isEmpty())
+        {
+            return null;
+        }
+
+        try
+        {
+            final Subscription candidate = nextSubscriber();
+            if (candidate != null)
+            {
+                return candidate;
+            }
+            // Ran off the end of the list: wrap around and scan from the start.
+            _currentSubscriber = 0;
+            return nextSubscriber();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            // The stored position is no longer valid for the list: start again from the beginning.
+            _currentSubscriber = 0;
+            return nextSubscriber();
+        }
+    }
+
+    /**
+     * Scans forward from the current round robin position, advancing the position and
+     * calling subscriberScanned() for every subscription visited.
+     * @return the first unsuspended subscription found, or null if the end of the list is reached
+     */
+    private Subscription nextSubscriber()
+    {
+        for (ListIterator<Subscription> it = _subscriptions.listIterator(_currentSubscriber); it.hasNext();)
+        {
+            final Subscription candidate = it.next();
+            ++_currentSubscriber;
+            subscriberScanned();
+            if (candidate.isSuspended())
+            {
+                continue;
+            }
+            return candidate;
+        }
+        return null;
+    }
+
+    /**
+     * Hook invoked once per subscription visited during a scan. Overridden in test classes.
+     */
+    protected void subscriberScanned()
+    {
+    }
+
+    /**
+     * @return true if no subscriptions are registered
+     */
+    public boolean isEmpty()
+    {
+        return _subscriptions.isEmpty();
+    }
+
+    /**
+     * @return true if at least one registered subscription is not suspended
+     */
+    public boolean hasActiveSubscribers()
+    {
+        for (Subscription candidate : _subscriptions)
+        {
+            if (candidate.isSuspended())
+            {
+                continue;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return the number of registered subscriptions that are not suspended
+     */
+    public int getWeight()
+    {
+        int active = 0;
+        for (Subscription candidate : _subscriptions)
+        {
+            active += candidate.isSuspended() ? 0 : 1;
+        }
+        return active;
+    }
+
+    /**
+     * Notification that a queue has been deleted. This is called so that each subscription can inform
+     * its channel, which in turn can update its list of unacknowledged messages.
+     * @param queue the queue that has been deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        for (Subscription registered : _subscriptions)
+        {
+            registered.queueDeleted(queue);
+        }
+    }
+
+    /**
+     * @return the number of registered subscriptions, suspended or not
+     */
+    int size()
+    {
+        return _subscriptions.size();
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * A SubscriptionManager that additionally reports a weight.
+ */
+public interface WeightedSubscriptionManager extends SubscriptionManager
+{
+ /**
+ * @return the weight of this manager. The SubscriptionSet implementation in
+ * this package reports the number of its subscriptions that are not suspended;
+ * how callers use the value is not visible here.
+ */
+ public int getWeight();
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/WeightedSubscriptionManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,108 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An abstract application registry that provides access to configuration information and handles the
+ * construction and caching of configurable objects.
+ *
+ * Subclasses should handle the construction of the "registered objects" such as the exchange registry.
+ *
+ * The single registry instance is installed with initialise(IApplicationRegistry) and retrieved with
+ * getInstance(). None of the state held here is synchronised.
+ */
+public abstract class ApplicationRegistry implements IApplicationRegistry
+{
+    private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
+
+    /** The installed registry; null until initialise(IApplicationRegistry) has been called. */
+    private static IApplicationRegistry _instance;
+
+    /** Cache of configured objects, keyed by their class. */
+    private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>();
+
+    /** The configuration this registry was constructed with. */
+    protected final Configuration _configuration;
+
+    /**
+     * Body of the JVM shutdown hook: closes the message store of the installed registry.
+     */
+    private static class ShutdownService implements Runnable
+    {
+        public void run()
+        {
+            _logger.info("Shutting down application registry...");
+            try
+            {
+                _instance.getMessageStore().close();
+            }
+            catch (Exception e)
+            {
+                // Best effort only: report the failure and let shutdown continue.
+                _logger.error("Error shutting down message store: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * Installs the given registry as the application registry, initialises it and registers a
+     * shutdown hook that closes its message store.
+     *
+     * The reference is stored before instance.initialise() is called so that components created
+     * during initialisation can already obtain the registry through getInstance().
+     *
+     * @param instance the registry to install
+     * @throws Exception if the registry fails to initialise; the reference stays set in that case
+     * and no shutdown hook is registered
+     */
+    public static void initialise(IApplicationRegistry instance) throws Exception
+    {
+        _instance = instance;
+        instance.initialise();
+        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
+    }
+
+    /**
+     * @param configuration the configuration exposed through getConfiguration()
+     */
+    protected ApplicationRegistry(Configuration configuration)
+    {
+        _configuration = configuration;
+    }
+
+    /**
+     * @return the installed application registry
+     * @throws RuntimeException if no registry has been installed yet
+     */
+    public static IApplicationRegistry getInstance()
+    {
+        if (_instance == null)
+        {
+            throw new RuntimeException("Application registry not initialised");
+        }
+        return _instance;
+    }
+
+    public Configuration getConfiguration()
+    {
+        return _configuration;
+    }
+
+    /**
+     * Returns the configured object of the given type, creating, configuring and caching it on
+     * first use. Later calls with the same type return the cached instance.
+     *
+     * @param instanceType the type of the configured object; it is instantiated through its
+     * default constructor
+     * @return the configured object
+     * @throws IllegalArgumentException if the type cannot be instantiated; the underlying
+     * exception is attached as the cause
+     */
+    public <T> T getConfiguredObject(Class<T> instanceType)
+    {
+        // Class.cast gives a checked conversion, avoiding an unchecked (T) cast.
+        T instance = instanceType.cast(_configuredObjects.get(instanceType));
+        if (instance == null)
+        {
+            try
+            {
+                instance = instanceType.newInstance();
+            }
+            catch (Exception e)
+            {
+                final String message = "Unable to instantiate configuration class " + instanceType +
+                                       " - ensure it has a public default constructor";
+                // Keep the original exception in both the log and the rethrown exception so the
+                // real reason (access problem, failing constructor, ...) is not lost.
+                _logger.error(message, e);
+                throw new IllegalArgumentException(message, e);
+            }
+            Configurator.configure(instance);
+            _configuredObjects.put(instanceType, instance);
+        }
+        return instance;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ApplicationRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,155 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.JMXManagedObjectRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.ManagementConfiguration;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SASLAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+
+import java.io.File;
+
+/**
+ * Application registry whose configuration is read from an XML file combined with the
+ * system properties, and whose registered objects are created in initialise().
+ */
+public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
+{
+    private QueueRegistry _queueRegistry;
+
+    private ExchangeRegistry _exchangeRegistry;
+
+    private ExchangeFactory _exchangeFactory;
+
+    private ManagedObjectRegistry _managedObjectRegistry;
+
+    private AuthenticationManager _authenticationManager;
+
+    private MessageStore _messageStore;
+
+    /**
+     * @param configurationURL the XML configuration file
+     * @throws ConfigurationException if the configuration cannot be loaded
+     */
+    public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
+    {
+        super(config(configurationURL));
+    }
+
+    // Our configuration class needs to make the interpolate method
+    // public so it can be called below from the config method.
+    private static class MyConfiguration extends CompositeConfiguration
+    {
+        public String interpolate(String obj)
+        {
+            return super.interpolate(obj);
+        }
+    }
+
+    /**
+     * Builds a composite of the system configuration and the XML configuration in the given file.
+     *
+     * @param url the XML configuration file
+     * @return the composite configuration
+     * @throws ConfigurationException if the XML configuration cannot be loaded
+     */
+    private static Configuration config(File url) throws ConfigurationException
+    {
+        // We have to override the interpolate methods so that
+        // interpolation takes place across the entirety of the
+        // composite configuration. Without doing this each
+        // configuration object only interpolates variables defined
+        // inside itself.
+        final MyConfiguration conf = new MyConfiguration();
+        conf.addConfiguration(new SystemConfiguration()
+        {
+            protected String interpolate(String o)
+            {
+                return conf.interpolate(o);
+            }
+        });
+        conf.addConfiguration(new XMLConfiguration(url)
+        {
+            protected String interpolate(String o)
+            {
+                return conf.interpolate(o);
+            }
+        });
+        return conf;
+    }
+
+    /**
+     * Creates the registered objects: the managed object registry first, then the queue registry,
+     * exchange factory, exchange registry and authentication manager, and finally the message
+     * store, which is configured with the queue registry created here.
+     */
+    public void initialise() throws Exception
+    {
+        initialiseManagedObjectRegistry();
+        _queueRegistry = new DefaultQueueRegistry();
+        _exchangeFactory = new DefaultExchangeFactory();
+        _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+        _authenticationManager = new SASLAuthenticationManager();
+        initialiseMessageStore();
+    }
+
+    /**
+     * Chooses the JMX or the no-op managed object registry according to the
+     * ManagementConfiguration "enabled" setting.
+     */
+    private void initialiseManagedObjectRegistry()
+    {
+        ManagementConfiguration config = getConfiguredObject(ManagementConfiguration.class);
+        if (config.enabled)
+        {
+            _managedObjectRegistry = new JMXManagedObjectRegistry();
+        }
+        else
+        {
+            _managedObjectRegistry = new NoopManagedObjectRegistry();
+        }
+    }
+
+    /**
+     * Instantiates the class named by the "store.class" configuration property and configures it
+     * as the message store.
+     *
+     * @throws ConfigurationException if the "store.class" property is not set
+     * @throws Exception if the class cannot be loaded or instantiated, does not implement
+     * MessageStore, or fails to configure itself
+     */
+    private void initialiseMessageStore() throws Exception
+    {
+        String messageStoreClass = _configuration.getString("store.class");
+        if (messageStoreClass == null)
+        {
+            // Without this check Class.forName(null) fails with a NullPointerException that
+            // gives no hint about which setting is missing.
+            throw new ConfigurationException("No message store configured: property store.class is not set");
+        }
+        Class<?> clazz = Class.forName(messageStoreClass);
+        Object o = clazz.newInstance();
+
+        if (!(o instanceof MessageStore))
+        {
+            throw new Exception("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+                                " does not.");
+        }
+        _messageStore = (MessageStore) o;
+        _messageStore.configure(getQueueRegistry(), "store", _configuration);
+    }
+
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+
+    public ExchangeFactory getExchangeFactory()
+    {
+        return _exchangeFactory;
+    }
+
+    public ManagedObjectRegistry getManagedObjectRegistry()
+    {
+        return _managedObjectRegistry;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _messageStore;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Central access point for the broker's configuration and its registered objects
+ * (queue and exchange registries, exchange factory, managed object registry,
+ * authentication manager and message store).
+ */
+public interface IApplicationRegistry
+{
+ /**
+ * Initialise the application registry. All initialisation must be done in this method so that any components
+ * that need access to the application registry itself for initialisation are able to use it. Attempting to
+ * initialise in the constructor will lead to failures since the registry reference will not have been set.
+ * @throws Exception if initialisation fails
+ */
+ void initialise() throws Exception;
+
+ /**
+ * This gets access to a "configured object". A configured object has fields populated from the configuration
+ * object (Commons Configuration) automatically, where it has the appropriate attributes defined on fields.
+ * Application registry implementations can choose the refresh strategy or caching approach.
+ * @param instanceType the type of object you want initialised. This must be unique - i.e. you can only
+ * have a single object of this type in the system.
+ * @return the configured object
+ */
+ <T> T getConfiguredObject(Class<T> instanceType);
+
+ /**
+ * Get the low level configuration. For use cases where the configured object approach is not required
+ * you can get the complete configuration information.
+ * @return a Commons Configuration instance
+ */
+ Configuration getConfiguration();
+
+ /**
+ * @return the queue registry held by this application registry
+ */
+ QueueRegistry getQueueRegistry();
+
+ /**
+ * @return the exchange registry held by this application registry
+ */
+ ExchangeRegistry getExchangeRegistry();
+
+ /**
+ * @return the exchange factory held by this application registry
+ */
+ ExchangeFactory getExchangeFactory();
+
+ /**
+ * @return the managed object registry held by this application registry
+ */
+ ManagedObjectRegistry getManagedObjectRegistry();
+
+ /**
+ * @return the authentication manager held by this application registry
+ */
+ AuthenticationManager getAuthenticationManager();
+
+ /**
+ * @return the message store held by this application registry. The shutdown hook
+ * registered by ApplicationRegistry calls close() on the store returned here.
+ */
+ MessageStore getMessageStore();
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/registry/IApplicationRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * Broker-side entry point for SASL authentication: reports the available
+ * mechanisms, creates SaslServer instances and evaluates responses against them.
+ */
+public interface AuthenticationManager
+{
+ /**
+ * @return the available SASL mechanisms as a single string. NOTE(review): the
+ * format (separator, ordering) is not visible here - confirm against the implementation.
+ */
+ String getMechanisms();
+
+ /**
+ * Creates a SaslServer for the given mechanism.
+ * @param mechanism the name of the SASL mechanism
+ * @param localFQDN presumably the fully qualified domain name of this server - TODO confirm
+ * @return the SaslServer to use for the authentication exchange
+ * @throws SaslException if the server cannot be created
+ */
+ SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+
+ /**
+ * Evaluates one response of an authentication exchange.
+ * @param server the SaslServer handling the exchange
+ * @param response the response bytes to evaluate (presumably as received from the client)
+ * @return a result whose status is SUCCESS, CONTINUE or ERROR and which may carry a challenge
+ */
+ AuthenticationResult authenticate(SaslServer server, byte[] response);
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.commons.configuration.Configuration;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslServerFactory;
+import java.util.Map;
+
+/**
+ * Describes one SASL mechanism to the authentication manager: its name, how to
+ * initialise it from configuration, and what is needed to create and register
+ * SASL servers for it.
+ */
+public interface AuthenticationProviderInitialiser
+{
+ /**
+ * @return the mechanism's name. This will be used in the list of mechanisms advertised to the
+ * client.
+ */
+ String getMechanismName();
+
+ /**
+ * Initialise the authentication provider.
+ * @param baseConfigPath the path in the config file that points to any config options for this provider. Each
+ * provider can have its own set of configuration options
+ * @param configuration the Apache Commons Configuration instance used to configure this provider
+ * @param principalDatabases the set of principal databases that are available
+ * @throws Exception if the provider cannot be initialised
+ */
+ void initialise(String baseConfigPath, Configuration configuration,
+ Map<String, PrincipalDatabase> principalDatabases) throws Exception;
+
+ /**
+ * @return the callback handler that should be used to process authentication requests for this mechanism. This will
+ * be called after initialise and will be stored by the authentication manager. The callback handler <b>must</b> be
+ * fully threadsafe.
+ */
+ CallbackHandler getCallbackHandler();
+
+ /**
+ * Get the properties that must be passed in to the Sasl.createSaslServer method.
+ * @return the properties, which may be null
+ */
+ Map<String, ?> getProperties();
+
+ /**
+ * Get the class that is the server factory. This is used for the JCA registration.
+ * @return null if no JCA registration is required, otherwise the class
+ * that will be used in JCA registration
+ */
+ Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration();
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationProviderInitialiser.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+/**
+ * Outcome of one authentication step: a status and, optionally, a challenge.
+ */
+public class AuthenticationResult
+{
+    /**
+     * The possible outcomes of an authentication step.
+     */
+    public enum AuthenticationStatus
+    {
+        SUCCESS,
+        CONTINUE,
+        ERROR
+    }
+
+    public AuthenticationStatus status;
+    public byte[] challenge;
+
+    /**
+     * Creates a result that carries only a status; the challenge is left null.
+     * @param status the outcome of the step
+     */
+    public AuthenticationResult(AuthenticationStatus status)
+    {
+        this(null, status);
+    }
+
+    /**
+     * Creates a result with both a challenge and a status.
+     * @param challenge the challenge bytes, stored by reference (not copied)
+     * @param status the outcome of the step
+     */
+    public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
+    {
+        this.challenge = challenge;
+        this.status = status;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/AuthenticationResult.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslServerFactory;
+
+/**
+ * Username/password initialiser for the CRAM-MD5 SASL mechanism.
+ */
+public class CRAMMD5Initialiser extends UsernamePasswordInitialiser
+{
+    /** The SASL mechanism name reported by getMechanismName(). */
+    private static final String CRAM_MD5_MECHANISM = "CRAM-MD5";
+
+    /**
+     * @return the mechanism name, "CRAM-MD5"
+     */
+    public String getMechanismName()
+    {
+        return CRAM_MD5_MECHANISM;
+    }
+
+    /**
+     * @return always null: the JDK already registers a CRAM-MD5 provider, so there is
+     * no factory class that we need to register ourselves
+     */
+    public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
+    {
+        return null;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/CRAMMD5Initialiser.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslServerFactory;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+/**
+ * JCA provider named "AMQSASLProvider" that publishes a SaslServerFactory
+ * class for each SASL mechanism it is given.
+ */
+public final class JCAProvider extends Provider
+{
+    /**
+     * Creates the provider, records one "SaslServerFactory.&lt;mechanism&gt;" property per
+     * map entry (the value being the factory class name) and then adds the provider
+     * to the JVM-wide Security provider list.
+     * @param providerMap SASL mechanism name mapped to the factory class serving it
+     */
+    public JCAProvider(Map<String, Class<? extends SaslServerFactory>> providerMap)
+    {
+        super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
+              "AMQ SASL providers that want to be registered");
+        for (Map.Entry<String, Class<? extends SaslServerFactory>> mechanism : providerMap.entrySet())
+        {
+            final String factoryClassName = mechanism.getValue().getName();
+            put("SaslServerFactory." + mechanism.getKey(), factoryClassName);
+        }
+        Security.addProvider(this);
+    }
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/JCAProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.log4j.Logger;
+
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AccountNotFoundException;
+import java.security.Principal;
+import java.io.*;
+import java.util.regex.Pattern;
+
+/**
+ * Represents a user database where the account information is stored in a simple flat file.
+ *
+ * The file is expected to be in the form:
+ * username:password
+ * username1:password1
+ * ...
+ * usernamen:passwordn
+ *
+ * where each username/password pair is on a line of its own. Only the first colon on a line acts
+ * as the separator, so a password may itself contain colons. Lines without a colon, or with
+ * nothing after the first colon, are ignored. Passwords are assumed to be in plain text.
+ *
+ */
+public class PasswordFilePrincipalDatabase implements PrincipalDatabase
+{
+    private static final Logger _logger = Logger.getLogger(PasswordFilePrincipalDatabase.class);
+
+    /** The password file; stays null until setPasswordFile has completed successfully. */
+    private File _passwordFile;
+
+    /** Separator between the username and the password on each line. */
+    private final Pattern _regexp = Pattern.compile(":");
+
+    public PasswordFilePrincipalDatabase()
+    {
+    }
+
+    /**
+     * Sets the file that accounts are looked up in. The file is checked before it is adopted, so
+     * a failed call leaves any previously configured file in place.
+     *
+     * @param passwordFile path of the password file
+     * @throws FileNotFoundException if the file does not exist or cannot be read
+     */
+    public void setPasswordFile(String passwordFile) throws FileNotFoundException
+    {
+        File f = new File(passwordFile);
+        _logger.info("PasswordFilePrincipalDatabase using file " + f.getAbsolutePath());
+        if (!f.exists())
+        {
+            throw new FileNotFoundException("Cannot find password file " + f);
+        }
+        if (!f.canRead())
+        {
+            throw new FileNotFoundException("Cannot read password file " + f +
+                                            ". Check permissions.");
+        }
+        // Only adopt the file once it is known to be usable.
+        _passwordFile = f;
+    }
+
+    /**
+     * Looks up the password of the given principal in the password file and hands it to the
+     * callback.
+     *
+     * @param principal the principal whose password is wanted
+     * @param callback the callback that receives the password
+     * @throws IOException if the password file could not be read
+     * @throws AccountNotFoundException if no password file has been configured or the file has no
+     * usable entry for the principal
+     * @throws IllegalArgumentException if principal or callback is null
+     */
+    public void setPassword(Principal principal, PasswordCallback callback) throws IOException,
+                                                                                   AccountNotFoundException
+    {
+        if (_passwordFile == null)
+        {
+            throw new AccountNotFoundException("Unable to locate principal since no password file was specified during initialisation");
+        }
+        if (principal == null)
+        {
+            throw new IllegalArgumentException("principal must not be null");
+        }
+        if (callback == null)
+        {
+            // Fail before touching the file rather than with a NullPointerException afterwards.
+            throw new IllegalArgumentException("callback must not be null");
+        }
+        char[] pwd = lookupPassword(principal.getName());
+        if (pwd == null)
+        {
+            throw new AccountNotFoundException("No account found for principal " + principal);
+        }
+        callback.setPassword(pwd);
+    }
+
+    /**
+     * Looks up the password for a specified user in the password file. The file is re-read on
+     * every call, using the platform's default character encoding.
+     * Note this code is <b>not</b> secure since it creates strings of passwords. It should be modified
+     * to create only char arrays which get nulled out.
+     *
+     * @param name the username to search for; a null name never matches
+     * @return the password of the first matching entry, or null if there is none
+     * @throws IOException if the password file could not be opened or read
+     */
+    private char[] lookupPassword(String name) throws IOException
+    {
+        BufferedReader reader = null;
+        try
+        {
+            reader = new BufferedReader(new FileReader(_passwordFile));
+            String line;
+
+            while ((line = reader.readLine()) != null)
+            {
+                // Limit of 2: split at the first colon only, so colons inside the password survive.
+                String[] result = _regexp.split(line, 2);
+                if (result.length < 2 || result[1].length() == 0)
+                {
+                    // No separator, or nothing after it: not a usable entry.
+                    continue;
+                }
+
+                // Compare from the file's side so that a null name simply does not match.
+                if (result[0].equals(name))
+                {
+                    return result[1].toCharArray();
+                }
+            }
+            return null;
+        }
+        finally
+        {
+            if (reader != null)
+            {
+                reader.close();
+            }
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PasswordFilePrincipalDatabase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AccountNotFoundException;
+import java.security.Principal;
+import java.io.IOException;
+
+/**
+ * Represents a "user database", which is really a way of storing principals (i.e. usernames) and
+ * their passwords.
+ */
+public interface PrincipalDatabase
+{
+ /**
+ * Supplies the password of the given principal to the specified callback. This is used for certain
+ * SASL providers. The user database implementation may look up the password in any way it
+ * chooses and must hand it over by calling the callback's setPassword method.
+ *
+ * @param principal the principal whose password is wanted
+ * @param callback the password callback that wants to receive the password
+ * @throws AccountNotFoundException if the account for the specified principal could not be found
+ * @throws IOException if there was an error looking up the principal
+ */
+ void setPassword(Principal principal, PasswordCallback callback)
+ throws IOException, AccountNotFoundException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/PrincipalDatabase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,224 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.configuration.PropertyUtils;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.security.Security;
+
+/**
+ * Authentication manager built on the javax.security.sasl API. On construction it reads the
+ * application configuration, instantiates and configures the listed principal databases, and then
+ * runs every configured SASL mechanism initialiser, remembering the callback handler and server
+ * properties that each mechanism supplies.
+ */
+public class SASLAuthenticationManager implements AuthenticationManager
+{
+    private static final Logger _log = Logger.getLogger(SASLAuthenticationManager.class);
+
+    /**
+     * The space separated list of mechanisms, in the order in which they are configured (i.e.
+     * preferred order). Remains null if no mechanism is configured.
+     */
+    private String _mechanisms;
+
+    /**
+     * Maps from the mechanism to the callback handler to use for handling those requests
+     */
+    private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+
+    /**
+     * Maps from the mechanism to the properties used to initialise the server. See the method
+     * Sasl.createSaslServer for details of the use of these properties. This map is populated during initialisation
+     * of each provider.
+     */
+    private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+
+    /**
+     * Builds the manager from the configuration held by the application registry.
+     *
+     * @throws Exception if a principal database or a mechanism initialiser cannot be created or
+     * configured
+     */
+    public SASLAuthenticationManager() throws Exception
+    {
+        _log.info("Initialising SASL authentication manager");
+        Map<String, PrincipalDatabase> databases = initialisePrincipalDatabases();
+        initialiseAuthenticationMechanisms(databases);
+    }
+
+    /**
+     * Instantiates and configures every principal database listed in the configuration.
+     *
+     * @return the configured databases, keyed by their configured name
+     * @throws Exception if a database class cannot be instantiated, does not implement
+     * PrincipalDatabase, cannot be configured, or has a missing, empty or duplicate name
+     */
+    private Map<String, PrincipalDatabase> initialisePrincipalDatabases() throws Exception
+    {
+        Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+        List<String> databaseNames = config.getList("security.principal-databases.principal-database.name");
+        List<String> databaseClasses = config.getList("security.principal-databases.principal-database.class");
+        if (databaseClasses.size() < databaseNames.size())
+        {
+            // Names and classes are paired by index; report the mismatch clearly instead of
+            // failing with an IndexOutOfBoundsException inside the loop.
+            throw new Exception("Found " + databaseNames.size() + " principal database name(s) but only " +
+                                databaseClasses.size() + " principal database class(es)");
+        }
+        Map<String, PrincipalDatabase> databases = new HashMap<String, PrincipalDatabase>();
+        for (int i = 0; i < databaseNames.size(); i++)
+        {
+            Object o;
+            try
+            {
+                o = Class.forName(databaseClasses.get(i)).newInstance();
+            }
+            catch (Exception e)
+            {
+                throw new Exception("Error initialising principal database: " + e, e);
+            }
+
+            if (!(o instanceof PrincipalDatabase))
+            {
+                throw new Exception("Principal databases must implement the PrincipalDatabase interface");
+            }
+            PrincipalDatabase database = (PrincipalDatabase) o;
+
+            initialisePrincipalDatabase(database, config, i);
+
+            String name = databaseNames.get(i);
+            if (name == null || name.length() == 0)
+            {
+                throw new Exception("Principal database names must have length greater than or equal to one character");
+            }
+            if (databases.containsKey(name))
+            {
+                throw new Exception("Duplicate principal database name provided");
+            }
+            _log.info("Initialised principal database " + name + " successfully");
+            databases.put(name, database);
+        }
+        return databases;
+    }
+
+    /**
+     * Applies the configured attributes of one principal database. For each attribute the name has
+     * its first character upper-cased and is prefixed with "set"; the public method of that name
+     * taking a single String is then invoked with the attribute value.
+     *
+     * @param principalDatabase the database to configure
+     * @param config the configuration to read the attributes from
+     * @param index the position of the database within the configured list
+     * @throws Exception if an attribute name is empty, has no value, has no matching setter, or the
+     * setter invocation fails
+     */
+    private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index)
+            throws Exception
+    {
+        String baseName = "security.principal-databases.principal-database(" + index + ").attributes.attribute.";
+        List<String> argumentNames = config.getList(baseName + "name");
+        List<String> argumentValues = config.getList(baseName + "value");
+        if (argumentValues.size() < argumentNames.size())
+        {
+            // Names and values are paired by index, so every name needs a value.
+            throw new Exception("Found " + argumentNames.size() + " attribute name(s) but only " +
+                                argumentValues.size() + " attribute value(s) for principal database " + index);
+        }
+        for (int i = 0; i < argumentNames.size(); i++)
+        {
+            String argName = argumentNames.get(i);
+            if (argName == null || argName.length() == 0)
+            {
+                throw new Exception("Argument names must have length >= 1 character");
+            }
+            if (Character.isLowerCase(argName.charAt(0)))
+            {
+                argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+            }
+            String methodName = "set" + argName;
+            Method method;
+            try
+            {
+                method = principalDatabase.getClass().getMethod(methodName, String.class);
+            }
+            catch (NoSuchMethodException e)
+            {
+                // getMethod never returns null; a missing setter is reported by this exception.
+                throw new Exception("No method " + methodName + " found in class " + principalDatabase.getClass() +
+                                    " hence unable to configure principal database. The method must be public and " +
+                                    "have a single String argument with a void return type", e);
+            }
+            method.invoke(principalDatabase, PropertyUtils.replaceProperties(argumentValues.get(i)));
+        }
+    }
+
+    /**
+     * Runs every configured mechanism initialiser and installs a JCAProvider for those mechanisms
+     * that supply a server factory class.
+     *
+     * @param databases the configured principal databases, keyed by name
+     * @throws Exception if an initialiser cannot be created or fails
+     */
+    private void initialiseAuthenticationMechanisms(Map<String, PrincipalDatabase> databases) throws Exception
+    {
+        Configuration config = ApplicationRegistry.getInstance().getConfiguration();
+        List<String> mechanisms = config.getList("security.sasl.mechanisms.mechanism.initialiser.class");
+
+        // Maps from the mechanism to the SaslServerFactory class that the initialiser wants to have
+        // registered with the security framework. Sorted by mechanism name (TreeMap).
+        Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+
+        for (int i = 0; i < mechanisms.size(); i++)
+        {
+            String baseName = "security.sasl.mechanisms.mechanism(" + i + ").initialiser";
+            String clazz = config.getString(baseName + ".class");
+            initialiseAuthenticationMechanism(baseName, clazz, databases, config, providerMap);
+        }
+        if (providerMap.size() > 0)
+        {
+            // NOTE(review): the JCAProvider constructor already passes itself to
+            // Security.addProvider, so this outer call looks redundant; it is kept because adding
+            // an already installed provider is documented to have no effect.
+            Security.addProvider(new JCAProvider(providerMap));
+        }
+    }
+
+    /**
+     * Creates and runs a single mechanism initialiser, then records the mechanism name, callback
+     * handler, server creation properties and (if supplied) server factory class it provides.
+     *
+     * @param baseName the configuration key prefix of this mechanism's initialiser
+     * @param clazz the name of the initialiser class
+     * @param databases the configured principal databases, keyed by name
+     * @param configuration the application configuration
+     * @param providerMap receives the mechanism's server factory class, if it supplies one
+     * @throws Exception if the class cannot be instantiated, is not an
+     * AuthenticationProviderInitialiser, or its initialisation fails
+     */
+    private void initialiseAuthenticationMechanism(String baseName, String clazz,
+                                                   Map<String, PrincipalDatabase> databases,
+                                                   Configuration configuration,
+                                                   Map<String, Class<? extends SaslServerFactory>> providerMap)
+            throws Exception
+    {
+        Class<?> initialiserClazz = Class.forName(clazz);
+        Object o = initialiserClazz.newInstance();
+        if (!(o instanceof AuthenticationProviderInitialiser))
+        {
+            throw new Exception("The class " + clazz + " must be an instance of " +
+                                AuthenticationProviderInitialiser.class);
+        }
+        AuthenticationProviderInitialiser initialiser = (AuthenticationProviderInitialiser) o;
+        initialiser.initialise(baseName, configuration, databases);
+        String mechanism = initialiser.getMechanismName();
+        if (_mechanisms == null)
+        {
+            _mechanisms = mechanism;
+        }
+        else
+        {
+            // simple append should be fine since the number of mechanisms is small and this is a one time initialisation
+            _mechanisms = _mechanisms + " " + mechanism;
+        }
+        _callbackHandlerMap.put(mechanism, initialiser.getCallbackHandler());
+        _serverCreationProperties.put(mechanism, initialiser.getProperties());
+        Class<? extends SaslServerFactory> factory = initialiser.getServerFactoryClassForJCARegistration();
+        if (factory != null)
+        {
+            providerMap.put(mechanism, factory);
+        }
+        _log.info("Initialised " + mechanism + " SASL provider successfully");
+    }
+
+    /**
+     * @return the space separated mechanism names in configured order, or null if none were
+     * configured
+     */
+    public String getMechanisms()
+    {
+        return _mechanisms;
+    }
+
+    /**
+     * Creates a SASL server for the given mechanism using protocol name "AMQP" and the properties
+     * and callback handler recorded for that mechanism during initialisation.
+     *
+     * @param mechanism the SASL mechanism name
+     * @param localFQDN the server name passed to Sasl.createSaslServer
+     * @return whatever Sasl.createSaslServer returns for these arguments
+     * @throws SaslException if the server cannot be created
+     */
+    public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+    {
+        return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
+                                     _callbackHandlerMap.get(mechanism));
+    }
+
+    /**
+     * Feeds a client response to the SASL server and reports the outcome.
+     *
+     * @param server the SASL server handling this exchange
+     * @param response the client's response; null is treated as an empty response
+     * @return SUCCESS with the final challenge once the exchange is complete, CONTINUE with the
+     * next challenge while it is not, or ERROR if the server rejected the response
+     */
+    public AuthenticationResult authenticate(SaslServer server, byte[] response)
+    {
+        try
+        {
+            // Process response from the client
+            byte[] challenge = server.evaluateResponse(response != null ? response : new byte[0]);
+
+            AuthenticationResult.AuthenticationStatus status = server.isComplete()
+                    ? AuthenticationResult.AuthenticationStatus.SUCCESS
+                    : AuthenticationResult.AuthenticationStatus.CONTINUE;
+            return new AuthenticationResult(challenge, status);
+        }
+        catch (SaslException e)
+        {
+            // Record why the exchange failed; the result itself carries only the status.
+            _log.warn("SASL authentication failed", e);
+            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java
------------------------------------------------------------------------------
svn:eol-style = native