You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [23/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Oct 21 01:19:00 2011
@@ -52,17 +52,6 @@ public interface QueueEntry extends Comp
}
public abstract State getState();
-
- /**
- * Returns true if state is either DEQUEUED or DELETED.
- *
- * @return true if state is either DEQUEUED or DELETED.
- */
- public boolean isDispensed()
- {
- State currentState = getState();
- return currentState == State.DEQUEUED || currentState == State.DELETED;
- }
}
@@ -202,7 +191,11 @@ public interface QueueEntry extends Comp
void reject();
- boolean isRejectedBy(long subscriptionId);
+ void reject(Subscription subscription);
+
+ boolean isRejectedBy(Subscription subscription);
+
+ void requeue(Subscription subscription);
void dequeue();
@@ -216,18 +209,4 @@ public interface QueueEntry extends Comp
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
-
- /**
- * Returns true if entry is in DEQUEUED state, otherwise returns false.
- *
- * @return true if entry is in DEQUEUED state, otherwise returns false
- */
- boolean isDequeued();
-
- /**
- * Returns true if entry is either DEQUED or DELETED state.
- *
- * @return true if entry is either DEQUED or DELETED state
- */
- boolean isDispensed();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Oct 21 01:19:00 2011
@@ -51,7 +51,7 @@ public class QueueEntryImpl implements Q
private MessageReference _message;
- private Set<Long> _rejectedBy = null;
+ private Set<Subscription> _rejectedBy = null;
private volatile EntryState _state = AVAILABLE_STATE;
@@ -325,16 +325,19 @@ public class QueueEntryImpl implements Q
public void reject()
{
- Subscription subscription = getDeliveredSubscription();
+ reject(getDeliveredSubscription());
+ }
+ public void reject(Subscription subscription)
+ {
if (subscription != null)
{
if (_rejectedBy == null)
{
- _rejectedBy = new HashSet<Long>();
+ _rejectedBy = new HashSet<Subscription>();
}
- _rejectedBy.add(subscription.getSubscriptionID());
+ _rejectedBy.add(subscription);
}
else
{
@@ -342,12 +345,12 @@ public class QueueEntryImpl implements Q
}
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(Subscription subscription)
{
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
- return _rejectedBy.contains(subscriptionId);
+ return _rejectedBy.contains(subscription);
}
else // This messasge hasn't been rejected yet.
{
@@ -355,6 +358,15 @@ public class QueueEntryImpl implements Q
}
}
+ public void requeue(Subscription subscription)
+ {
+ getQueue().requeue(this, subscription);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+ }
+
public void dequeue()
{
EntryState state = _state;
@@ -496,7 +508,7 @@ public class QueueEntryImpl implements Q
{
QueueEntryImpl next = nextNode();
- while(next != null && next.isDispensed() )
+ while(next != null && next.isDeleted())
{
final QueueEntryImpl newNext = next.nextNode();
@@ -544,14 +556,4 @@ public class QueueEntryImpl implements Q
return _queueEntryList;
}
- public boolean isDequeued()
- {
- return _state == DEQUEUED_STATE;
- }
-
- public boolean isDispensed()
- {
- return _state.isDispensed();
- }
-
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Oct 21 01:19:00 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
/** null means shared */
private final AMQShortString _owner;
- private AuthorizationHolder _authorizationHolder;
+ private PrincipalHolder _prinicpalHolder;
private boolean _exclusive = false;
private AMQSessionModel _exclusiveOwner;
@@ -102,7 +102,9 @@ public class SimpleAMQQueue implements A
protected final QueueEntryList _entries;
- protected final SubscriptionList _subscriptionList = new SubscriptionList();
+ protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+ private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
private volatile Subscription _exclusiveSubscriber;
@@ -371,14 +373,14 @@ public class SimpleAMQQueue implements A
return _owner;
}
- public AuthorizationHolder getAuthorizationHolder()
+ public PrincipalHolder getPrincipalHolder()
{
- return _authorizationHolder;
+ return _prinicpalHolder;
}
- public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
+ public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
{
- _authorizationHolder = authorizationHolder;
+ _prinicpalHolder = prinicpalHolder;
}
@@ -600,25 +602,25 @@ public class SimpleAMQQueue implements A
iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _subscriptionList.getHead().getNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_lastSubscriptionNode.compareAndSet(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
- nextNode = node.findNext();
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _subscriptionList.getHead().getNext();
}
}
}
@@ -627,7 +629,7 @@ public class SimpleAMQQueue implements A
// this catches the case where we *just* miss an update
int loops = 2;
- while (entry.isAvailable() && loops != 0)
+ while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
{
if (nextNode == null)
{
@@ -640,13 +642,13 @@ public class SimpleAMQQueue implements A
Subscription sub = nextNode.getSubscription();
deliverToSubscription(sub, entry);
}
- nextNode = nextNode.findNext();
+ nextNode = nextNode.getNext();
}
}
- if (entry.isAvailable())
+ if (!(entry.isAcquired() || entry.isDeleted()))
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -803,6 +805,24 @@ public class SimpleAMQQueue implements A
}
+ public void requeue(QueueEntryImpl entry, Subscription subscription)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+ }
+
public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
@@ -940,7 +960,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDispensed())
+ if (node != null && !node.isDeleted())
{
entryList.add(node);
}
@@ -1044,7 +1064,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDispensed() && filter.accept(node))
+ if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
}
@@ -1238,6 +1258,7 @@ public class SimpleAMQQueue implements A
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
+ && !node.isDeleted()
&& node.acquire())
{
dequeueEntry(node);
@@ -1267,7 +1288,7 @@ public class SimpleAMQQueue implements A
while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
dequeueEntry(node);
noDeletes = false;
@@ -1297,7 +1318,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
dequeueEntry(node, txn);
if(++count == request)
@@ -1564,7 +1585,7 @@ public class SimpleAMQQueue implements A
public void deliverAsync()
{
- QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1583,6 +1604,52 @@ public class SimpleAMQQueue implements A
_asyncDelivery.execute(flusher);
}
+
+ private class Runner implements ReadWriteRunnable
+ {
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_logActor);
+
+ processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+ }
+
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1651,7 +1718,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && node.isAvailable())
+ if (node != null && !(node.isAcquired() || node.isDeleted()))
{
if (sub.hasInterest(node))
{
@@ -1712,7 +1779,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
{
if (expired)
{
@@ -1741,40 +1808,14 @@ public class SimpleAMQQueue implements A
}
- /**
- * Used by queue Runners to asynchronously deliver messages to consumers.
- *
- * A queue Runner is started whenever a state change occurs, e.g when a new
- * message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
- * capable of accepting/delivering all messages then these messages would
- * otherwise remain on the queue.
- *
- * processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
- * then processQueue should stop to prevent spinning.
- *
- * Since processQueue is runs in a fixed size Executor, it should not run
- * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
- * incoming messages may not be able to be scheduled in the thread pool
- * because all threads are working on clearing down large queues). To solve
- * this problem, after an arbitrary number of message deliveries the
- * processQueue job stops iterating, resubmits itself to the executor, and
- * ends the current instance
- *
- * @param runner the Runner to schedule
- * @throws AMQException
- */
- public void processQueue(QueueRunner runner) throws AMQException
+ private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- boolean lastLoop = false;
- int iterations = MAX_ASYNC_DELIVERIES;
+ int extraLoops = 1;
+ long iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1791,14 +1832,12 @@ public class SimpleAMQQueue implements A
if (previousStateChangeCount != stateChangeCount)
{
- //further asynchronous delivery is required since the
- //previous loop. keep going if iteration slicing allows.
- lastLoop = false;
+ extraLoops = 1;
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ boolean done;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1808,25 +1847,30 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
try
{
- //attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub);
- if (subscriptionDone)
+
+ done = attemptDelivery(sub);
+
+ if (done)
{
- //close autoClose subscriptions if we are not currently intent on continuing
- if (lastLoop && sub.isAutoClose())
+ if (extraLoops == 0)
{
- unregisterSubscription(sub);
+ deliveryIncomplete = false;
+ if (sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
- sub.confirmAutoClose();
+ sub.confirmAutoClose();
+ }
+ }
+ else
+ {
+ extraLoops--;
}
}
else
{
- //this subscription can accept additional deliveries, so we must
- //keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
- lastLoop = false;
iterations--;
+ extraLoops = 1;
}
}
finally
@@ -1834,34 +1878,10 @@ public class SimpleAMQQueue implements A
sub.releaseSendLock();
}
}
-
- if(allSubscriptionsDone && lastLoop)
- {
- //We have done an extra loop already and there are again
- //again no further delivery attempts possible, only
- //keep going if state change demands it.
- deliveryIncomplete = false;
- }
- else if(allSubscriptionsDone)
- {
- //All subscriptions reported being done, but we have to do
- //an extra loop if the iterations are not exhausted and
- //there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
- lastLoop = true;
- }
- else
- {
- //some subscriptions can still accept more messages,
- //keep going if iteration count allows.
- lastLoop = false;
- deliveryIncomplete = true;
- }
-
_asynchronousRunner.set(null);
}
- // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
+ // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
@@ -1881,8 +1901,8 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- // Only process nodes that are not currently deleted and not dequeued
- if (!node.isDispensed())
+ // Only process nodes that are not currently deleted
+ if (!node.isDeleted())
{
// If the node has exired then aquire it
if (node.expired() && node.acquire())
@@ -2222,9 +2242,4 @@ public class SimpleAMQQueue implements A
}
}
}
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Fri Oct 21 01:19:00 2011
@@ -1,5 +1,6 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -155,7 +156,7 @@ public class SimpleQueueEntryList implem
if(!atTail())
{
QueueEntryImpl nextNode = _lastNode.nextNode();
- while(nextNode.isDispensed() && nextNode.nextNode() != null)
+ while(nextNode.isDeleted() && nextNode.nextNode() != null)
{
nextNode = nextNode.nextNode();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Oct 21 01:19:00 2011
@@ -21,14 +21,9 @@
package org.apache.qpid.server.registry;
import java.net.InetSocketAddress;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -46,27 +41,23 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.AbstractRootMessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
+import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
-import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.osgi.framework.BundleContext;
-
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -78,10 +69,12 @@ public abstract class ApplicationRegistr
{
protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
+ private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
protected final ServerConfiguration _configuration;
+ public static final int DEFAULT_INSTANCE = 1;
+
protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
protected ManagedObjectRegistry _managedObjectRegistry;
@@ -92,6 +85,8 @@ public abstract class ApplicationRegistr
protected SecurityManager _securityManager;
+ protected PrincipalDatabaseManager _databaseManager;
+
protected PluginManager _pluginManager;
protected ConfigurationManager _configurationManager;
@@ -107,12 +102,8 @@ public abstract class ApplicationRegistr
private BrokerConfig _broker;
private ConfigStore _configStore;
-
- private Timer _reportingTimer;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private BundleContext _bundleContext;
+ protected String _registryName;
static
{
@@ -123,54 +114,53 @@ public abstract class ApplicationRegistr
{
public void run()
{
- remove();
+ removeAll();
}
}
public static void initialise(IApplicationRegistry instance) throws Exception
{
- if(instance == null)
- {
- throw new IllegalArgumentException("ApplicationRegistry instance must not be null");
- }
+ initialise(instance, DEFAULT_INSTANCE);
+ }
- if(!_instance.compareAndSet(null, instance))
+ @SuppressWarnings("finally")
+ public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception
+ {
+ if (instance != null)
{
- throw new IllegalStateException("An ApplicationRegistry is already initialised");
- }
-
- _logger.info("Initialising Application Registry(" + instance + ")");
-
-
- final ConfigStore store = ConfigStore.newInstance();
- store.setRoot(new SystemConfigImpl(store));
- instance.setConfigStore(store);
+ _logger.info("Initialising Application Registry(" + instance + "):" + instanceID);
+ _instanceMap.put(instanceID, instance);
- BrokerConfig broker = new BrokerConfigAdapter(instance);
+ final ConfigStore store = ConfigStore.newInstance();
+ store.setRoot(new SystemConfigImpl(store));
+ instance.setConfigStore(store);
- SystemConfig system = (SystemConfig) store.getRoot();
- system.addBroker(broker);
- instance.setBroker(broker);
+ BrokerConfig broker = new BrokerConfigAdapter(instance);
- try
- {
- instance.initialise();
- }
- catch (Exception e)
- {
- _instance.set(null);
+ SystemConfig system = (SystemConfig) store.getRoot();
+ system.addBroker(broker);
+ instance.setBroker(broker);
- //remove the Broker instance, then re-throw
try
{
- system.removeBroker(broker);
+ instance.initialise(instanceID);
}
- catch(Throwable t)
+ catch (Exception e)
{
- //ignore
+ _instanceMap.remove(instanceID);
+ try
+ {
+ system.removeBroker(broker);
+ }
+ finally
+ {
+ throw e;
+ }
}
-
- throw e;
+ }
+ else
+ {
+ remove(instanceID);
}
}
@@ -186,19 +176,35 @@ public abstract class ApplicationRegistr
public static boolean isConfigured()
{
- return _instance.get() != null;
+ return isConfigured(DEFAULT_INSTANCE);
+ }
+
+ public static boolean isConfigured(int instanceID)
+ {
+ return _instanceMap.containsKey(instanceID);
}
+ /** Method to cleanly shutdown the default registry running in this JVM */
public static void remove()
{
- IApplicationRegistry instance = _instance.getAndSet(null);
+ remove(DEFAULT_INSTANCE);
+ }
+
+ /**
+ * Method to cleanly shutdown specified registry running in this JVM
+ *
+ * @param instanceID the instance to shutdown
+ */
+ public static void remove(int instanceID)
+ {
try
{
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
if (instance != null)
{
if (_logger.isInfoEnabled())
{
- _logger.info("Shutting down ApplicationRegistry(" + instance + ")");
+ _logger.info("Shutting down ApplicationRegistry(" + instanceID + "):" + instance);
}
instance.close();
instance.getBroker().getSystem().removeBroker(instance.getBroker());
@@ -206,19 +212,27 @@ public abstract class ApplicationRegistr
}
catch (Exception e)
{
- _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e);
+ _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e);
+ }
+ finally
+ {
+ _instanceMap.remove(instanceID);
}
}
- protected ApplicationRegistry(ServerConfiguration configuration)
+ /** Method to cleanly shutdown all registries currently running in this JVM */
+ public static void removeAll()
{
- this(configuration, null);
+ Object[] keys = _instanceMap.keySet().toArray();
+ for (Object k : keys)
+ {
+ remove((Integer) k);
+ }
}
- protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext)
+ protected ApplicationRegistry(ServerConfiguration configuration)
{
_configuration = configuration;
- _bundleContext = bundleContext;
}
public void configure() throws ConfigurationException
@@ -227,7 +241,7 @@ public abstract class ApplicationRegistr
try
{
- _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext);
+ _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory());
}
catch (Exception e)
{
@@ -237,10 +251,11 @@ public abstract class ApplicationRegistr
_configuration.initialise();
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
//Create the RootLogger to be used during broker operation
_rootMessageLogger = new Log4jMessageLogger(_configuration);
+ _registryName = String.valueOf(instanceID);
//Create the composite (log4j+SystemOut MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
@@ -262,7 +277,11 @@ public abstract class ApplicationRegistr
_securityManager = new SecurityManager(_configuration, _pluginManager);
- _authenticationManager = createAuthenticationManager();
+ createDatabaseManager(_configuration);
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+
+ _databaseManager.initialiseManagement(_configuration);
_managedObjectRegistry.start();
}
@@ -275,8 +294,6 @@ public abstract class ApplicationRegistr
try
{
initialiseVirtualHosts();
- initialiseStatistics();
- initialiseStatisticsReporting();
}
finally
{
@@ -285,51 +302,9 @@ public abstract class ApplicationRegistr
}
}
- /**
- * Iterates across all discovered authentication manager factories, offering the security configuration to each.
- * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
- *
- * It is an error to configure more than one authentication manager, or to configure none.
- *
- * @return authentication manager
- * @throws ConfigurationException
- */
- protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
+ protected void createDatabaseManager(ServerConfiguration configuration) throws Exception
{
- final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
- final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
-
- if (factories.size() == 0)
- {
- throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" +
- "manager plugin has been placed in the plugins directory.");
- }
-
- AuthenticationManager authMgr = null;
-
- for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
- {
- final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
- final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
- if (tmp != null)
- {
- if (authMgr != null)
- {
- throw new ConfigurationException("Cannot configure more than one authentication manager."
- + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
- + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
- + " from the classpath.");
- }
- authMgr = tmp;
- }
- }
-
- if (authMgr == null)
- {
- throw new ConfigurationException("No authentication managers configured within the configure file.");
- }
-
- return authMgr;
+ _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
}
protected void initialiseVirtualHosts() throws Exception
@@ -345,88 +320,26 @@ public abstract class ApplicationRegistr
{
_managedObjectRegistry = new NoopManagedObjectRegistry();
}
-
- public void initialiseStatisticsReporting()
- {
- long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
- final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
- final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
- final boolean reset = _configuration.isStatisticsReportResetEnabled();
-
- /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
- if (report > 0L && (broker || virtualhost))
- {
- _reportingTimer = new Timer("Statistics-Reporting", true);
-
- class StatisticsReportingTask extends TimerTask
- {
- private final int DELIVERED = 0;
- private final int RECEIVED = 1;
-
- public void run()
- {
- CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
- public String getLogMessage()
- {
- return "[" + Thread.currentThread().getName() + "] ";
- }
- });
-
- if (broker)
- {
- CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
- }
-
- if (virtualhost)
- {
- for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
- {
- String name = vhost.getName();
- StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
- StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
- StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
- StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
-
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
- }
- }
-
- if (reset)
- {
- resetStatistics();
- }
- CurrentActor.remove();
- }
- }
-
- _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
- report / 2,
- report);
- }
+ public static IApplicationRegistry getInstance()
+ {
+ return getInstance(DEFAULT_INSTANCE);
}
- /**
- * Get the ApplicationRegistry
- * @return the IApplicationRegistry instance
- * @throws IllegalStateException if no registry instance has been initialised.
- */
- public static IApplicationRegistry getInstance() throws IllegalStateException
+ public static IApplicationRegistry getInstance(int instanceID)
{
- IApplicationRegistry iApplicationRegistry = _instance.get();
- if (iApplicationRegistry == null)
- {
- throw new IllegalStateException("No ApplicationRegistry has been initialised");
- }
- else
+ synchronized (IApplicationRegistry.class)
{
- return iApplicationRegistry;
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
+
+ if (instance == null)
+ {
+ throw new IllegalStateException("Application Registry (" + instanceID + ") not created");
+ }
+ else
+ {
+ return instance;
+ }
}
}
@@ -456,12 +369,6 @@ public abstract class ApplicationRegistr
{
_logger.info("Shutting down ApplicationRegistry:" + this);
}
-
- //Stop Statistics Reporting
- if (_reportingTimer != null)
- {
- _reportingTimer.cancel();
- }
//Stop incoming connections
unbind();
@@ -469,6 +376,10 @@ public abstract class ApplicationRegistr
//Shutdown virtualhosts
close(_virtualHostRegistry);
+// close(_accessManager);
+//
+// close(_databaseManager);
+
close(_authenticationManager);
close(_managedObjectRegistry);
@@ -490,7 +401,7 @@ public abstract class ApplicationRegistr
try
{
- acceptor.getNetworkTransport().close();
+ acceptor.getNetworkDriver().close();
}
catch (Throwable e)
{
@@ -530,6 +441,11 @@ public abstract class ApplicationRegistr
return _managedObjectRegistry;
}
+ public PrincipalDatabaseManager getDatabaseManager()
+ {
+ return _databaseManager;
+ }
+
public AuthenticationManager getAuthenticationManager()
{
return _authenticationManager;
@@ -577,81 +493,9 @@ public abstract class ApplicationRegistr
public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
{
- VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
+ VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
_virtualHostRegistry.registerVirtualHost(virtualHost);
getBroker().addVirtualHost(virtualHost);
return virtualHost;
}
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
- {
- vhost.resetStatistics();
- }
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- getConfiguration().isStatisticsGenerationBrokerEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered");
- _dataDelivered = new StatisticsCounter("bytes-delivered");
- _messagesReceived = new StatisticsCounter("messages-received");
- _dataReceived = new StatisticsCounter("bytes-received");
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Fri Oct 21 01:19:00 2011
@@ -71,7 +71,7 @@ public class BrokerConfigAdapter impleme
public Integer getWorkerThreads()
{
- return _instance.getConfiguration().getConnectorProcessors();
+ return _instance.getConfiguration().getProcessors();
}
public Integer getMaxConnections()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri Oct 21 01:19:00 2011
@@ -29,18 +29,12 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.JMXManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.osgi.framework.BundleContext;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
- this(configurationURL, null);
- }
-
- public ConfigurationFileApplicationRegistry(File configurationURL, BundleContext bundleContext) throws ConfigurationException
- {
- super(new ServerConfiguration(configurationURL), bundleContext);
+ super(new ServerConfiguration(configurationURL));
}
@Override
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Fri Oct 21 01:19:00 2011
@@ -33,20 +33,21 @@ import org.apache.qpid.server.logging.Ro
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public interface IApplicationRegistry extends StatisticsGatherer
+public interface IApplicationRegistry
{
/**
* Initialise the application registry. All initialisation must be done in this method so that any components
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
+ * @param instanceID the instanceID that we can use to identify this AR.
*/
- void initialise() throws Exception;
+ void initialise(int instanceID) throws Exception;
/**
* Shutdown this Registry
@@ -62,6 +63,8 @@ public interface IApplicationRegistry ex
ManagedObjectRegistry getManagedObjectRegistry();
+ PrincipalDatabaseManager getDatabaseManager();
+
AuthenticationManager getAuthenticationManager();
VirtualHostRegistry getVirtualHostRegistry();
@@ -94,6 +97,4 @@ public interface IApplicationRegistry ex
ConfigStore getConfigStore();
void setConfigStore(ConfigStore store);
-
- void initialiseStatisticsReporting();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Fri Oct 21 01:19:00 2011
@@ -18,19 +18,8 @@
*/
package org.apache.qpid.server.security;
-import static org.apache.qpid.server.security.access.ObjectType.EXCHANGE;
-import static org.apache.qpid.server.security.access.ObjectType.METHOD;
-import static org.apache.qpid.server.security.access.ObjectType.OBJECT;
-import static org.apache.qpid.server.security.access.ObjectType.QUEUE;
-import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST;
-import static org.apache.qpid.server.security.access.Operation.ACCESS;
-import static org.apache.qpid.server.security.access.Operation.BIND;
-import static org.apache.qpid.server.security.access.Operation.CONSUME;
-import static org.apache.qpid.server.security.access.Operation.CREATE;
-import static org.apache.qpid.server.security.access.Operation.DELETE;
-import static org.apache.qpid.server.security.access.Operation.PUBLISH;
-import static org.apache.qpid.server.security.access.Operation.PURGE;
-import static org.apache.qpid.server.security.access.Operation.UNBIND;
+import static org.apache.qpid.server.security.access.ObjectType.*;
+import static org.apache.qpid.server.security.access.Operation.*;
import java.net.SocketAddress;
import java.security.Principal;
@@ -40,8 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import javax.security.auth.Subject;
-
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -50,9 +37,11 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
/**
* The security manager contains references to all loaded {@link SecurityPlugin}s and delegates security decisions to them based
@@ -66,7 +55,7 @@ public class SecurityManager
private static final Logger _logger = Logger.getLogger(SecurityManager.class);
/** Container for the {@link Principal} that is using to this thread. */
- private static final ThreadLocal<Subject> _subject = new ThreadLocal<Subject>();
+ private static final ThreadLocal<Principal> _principal = new ThreadLocal<Principal>();
private PluginManager _pluginManager;
private Map<String, SecurityPluginFactory> _pluginFactories = new HashMap<String, SecurityPluginFactory>();
@@ -137,14 +126,19 @@ public class SecurityManager
configureHostPlugins(configuration);
}
- public static Subject getThreadSubject()
+ public static Principal getThreadPrincipal()
+ {
+ return _principal.get();
+ }
+
+ public static void setThreadPrincipal(Principal principal)
{
- return _subject.get();
+ _principal.set(principal);
}
- public static void setThreadSubject(final Subject subject)
+ public static void setThreadPrincipal(String authId)
{
- _subject.set(subject);
+ setThreadPrincipal(new UsernamePrincipal(authId));
}
public void configureHostPlugins(ConfigurationPlugin hostConfig) throws ConfigurationException
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Fri Oct 21 01:19:00 2011
@@ -149,9 +149,9 @@ public class ObjectProperties extends Ha
{
put(Property.OWNER, queue.getOwner());
}
- else if (queue.getAuthorizationHolder() != null)
+ else if (queue.getPrincipalHolder() != null)
{
- put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
+ put(Property.OWNER, queue.getPrincipalHolder().getPrincipal().getName());
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Fri Oct 21 01:19:00 2011
@@ -20,93 +20,42 @@
*/
package org.apache.qpid.server.security.auth;
-import javax.security.auth.Subject;
-
-/**
- * Encapsulates the result of an attempt to authenticate.
- * <p>
- * The authentication status describes the overall outcome.
- * <p>
- * <ol>
- * <li>If authentication status is SUCCESS, the subject will be populated.
- * </li>
- * <li>If authentication status is CONTINUE, the authentication has failed because the user
- * supplied incorrect credentials (etc). If the authentication requires it, the next challenge
- * is made available.
- * </li>
- * <li>If authentication status is ERROR , the authentication decision could not be made due
- * to a failure (such as an external system), the {@link AuthenticationResult#getCause()}
- * will provide the underlying exception.
- * </li>
- * </ol>
- *
- */
public class AuthenticationResult
{
public enum AuthenticationStatus
{
- /** Authentication successful */
- SUCCESS,
- /** Authentication not successful due to credentials problem etc */
- CONTINUE,
- /** Problem prevented the authentication from being made e.g. failure of an external system */
- ERROR
+ SUCCESS, CONTINUE, ERROR
}
- public final AuthenticationStatus _status;
- public final byte[] _challenge;
- private final Exception _cause;
- private final Subject _subject;
+ public AuthenticationStatus status;
+ public byte[] challenge;
+
+ private Exception cause;
- public AuthenticationResult(final AuthenticationStatus status)
+ public AuthenticationResult(AuthenticationStatus status)
{
this(null, status, null);
}
- public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status)
+ public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
{
this(challenge, status, null);
}
- public AuthenticationResult(final AuthenticationStatus error, final Exception cause)
+ public AuthenticationResult(AuthenticationStatus error, Exception cause)
{
this(null, error, cause);
}
- public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status, final Exception cause)
- {
- this._status = status;
- this._challenge = challenge;
- this._cause = cause;
- this._subject = null;
- }
-
- public AuthenticationResult(final Subject subject)
+ public AuthenticationResult(byte[] challenge, AuthenticationStatus status, Exception cause)
{
- this._status = AuthenticationStatus.SUCCESS;
- this._challenge = null;
- this._cause = null;
- this._subject = subject;
+ this.status = status;
+ this.challenge = challenge;
+ this.cause = cause;
}
public Exception getCause()
{
- return _cause;
- }
-
- public AuthenticationStatus getStatus()
- {
- return _status;
+ return cause;
}
-
- public byte[] getChallenge()
- {
- return _challenge;
- }
-
- public Subject getSubject()
- {
- return _subject;
- }
-
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,19 @@
*/
package org.apache.qpid.server.security.auth.management;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import java.security.Principal;
+import java.util.Enumeration;
import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
import javax.management.JMException;
import javax.management.openmbean.CompositeData;
@@ -34,13 +44,17 @@ import javax.management.openmbean.Simple
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+import javax.management.remote.JMXPrincipal;
+import javax.security.auth.Subject;
import javax.security.auth.login.AccountNotFoundException;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.management.common.mbeans.UserManagement;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
@@ -51,18 +65,22 @@ public class AMQUserManagementMBean exte
private static final Logger _logger = Logger.getLogger(AMQUserManagementMBean.class);
private PrincipalDatabase _principalDatabase;
+ private Properties _accessRights;
+ private File _accessFile;
+
+ private ReentrantLock _accessRightsUpdate = new ReentrantLock();
// Setup for the TabularType
- private static final TabularType _userlistDataType; // Datatype for representing User Lists
- private static final CompositeType _userDataType; // Composite type for representing User
+ static TabularType _userlistDataType; // Datatype for representing User Lists
+ static CompositeType _userDataType; // Composite type for representing User
static
{
OpenType[] userItemTypes = new OpenType[4]; // User item types.
userItemTypes[0] = SimpleType.STRING; // For Username
- userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read - No longer in use
- userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write - No longer in use
- userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin - No longer is use
+ userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read
+ userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write
+ userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin
try
{
@@ -74,11 +92,12 @@ public class AMQUserManagementMBean exte
}
catch (OpenDataException e)
{
- _logger.error("Tabular data setup for viewing users incorrect.", e);
- throw new ExceptionInInitializerError("Tabular data setup for viewing users incorrect");
+ _logger.error("Tabular data setup for viewing users incorrect.");
+ _userlistDataType = null;
}
}
+
public AMQUserManagementMBean() throws JMException
{
super(UserManagement.class, UserManagement.TYPE);
@@ -91,23 +110,121 @@ public class AMQUserManagementMBean exte
public boolean setPassword(String username, String password)
{
+ return setPassword(username, password.toCharArray());
+ }
+
+ public boolean setPassword(String username, char[] password)
+ {
try
{
//delegate password changes to the Principal Database
- return _principalDatabase.updatePassword(new UsernamePrincipal(username), password.toCharArray());
+ return _principalDatabase.updatePassword(new UsernamePrincipal(username), password);
}
catch (AccountNotFoundException e)
{
- _logger.warn("Attempt to set password of non-existent user'" + username + "'");
+ _logger.warn("Attempt to set password of non-existant user'" + username + "'");
return false;
}
}
- public boolean createUser(String username, String password)
+ public boolean setRights(String username, boolean read, boolean write, boolean admin)
{
- if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password.toCharArray()))
+
+ Object oldRights = null;
+ if ((oldRights =_accessRights.get(username)) == null)
{
- return true;
+ // If the user doesn't exist in the access rights file check that they at least have an account.
+ if (_principalDatabase.getUser(username) == null)
+ {
+ return false;
+ }
+ }
+
+ try
+ {
+ _accessRightsUpdate.lock();
+
+ // Update the access rights
+ if (admin)
+ {
+ _accessRights.put(username, MBeanInvocationHandlerImpl.ADMIN);
+ }
+ else
+ {
+ if (read | write)
+ {
+ if (read)
+ {
+ _accessRights.put(username, MBeanInvocationHandlerImpl.READONLY);
+ }
+ if (write)
+ {
+ _accessRights.put(username, MBeanInvocationHandlerImpl.READWRITE);
+ }
+ }
+ else
+ {
+ _accessRights.remove(username);
+ }
+ }
+
+ //save the rights file
+ try
+ {
+ saveAccessFile();
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Problem occured saving '" + _accessFile + "', the access right changes will not be preserved: " + e);
+
+ //the rights file was not successfully saved, restore user rights to previous value
+ _logger.warn("Reverting attempted rights update for user'" + username + "'");
+ if (oldRights != null)
+ {
+ _accessRights.put(username, oldRights);
+ }
+ else
+ {
+ _accessRights.remove(username);
+ }
+
+ return false;
+ }
+ }
+ finally
+ {
+ _accessRightsUpdate.unlock();
+ }
+
+ return true;
+ }
+
+ public boolean createUser(String username, String password, boolean read, boolean write, boolean admin)
+ {
+ return createUser(username, password.toCharArray(), read, write, admin);
+ }
+
+ public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin)
+ {
+ if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password))
+ {
+ if (!setRights(username, read, write, admin))
+ {
+ //unable to set rights for user, remove account
+ try
+ {
+ _principalDatabase.deletePrincipal(new UsernamePrincipal(username));
+ }
+ catch (AccountNotFoundException e)
+ {
+ //ignore
+ }
+ return false;
+ }
+ else
+ {
+ return true;
+ }
}
return false;
@@ -117,7 +234,29 @@ public class AMQUserManagementMBean exte
{
try
{
- _principalDatabase.deletePrincipal(new UsernamePrincipal(username));
+ if (_principalDatabase.deletePrincipal(new UsernamePrincipal(username)))
+ {
+ try
+ {
+ _accessRightsUpdate.lock();
+
+ _accessRights.remove(username);
+
+ try
+ {
+ saveAccessFile();
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Problem occured saving '" + _accessFile + "', the access right changes will not be preserved: " + e);
+ return false;
+ }
+ }
+ finally
+ {
+ _accessRightsUpdate.unlock();
+ }
+ }
}
catch (AccountNotFoundException e)
{
@@ -130,23 +269,38 @@ public class AMQUserManagementMBean exte
public boolean reloadData()
{
- try
- {
- _principalDatabase.reload();
- }
- catch (IOException e)
- {
- _logger.warn("Reload failed due to:", e);
- return false;
- }
- // Reload successful
- return true;
+ try
+ {
+ loadAccessFile();
+ _principalDatabase.reload();
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.warn("Reload failed due to:" + e);
+ return false;
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Reload failed due to:" + e);
+ return false;
+ }
+ // Reload successful
+ return true;
}
- @MBeanOperation(name = "viewUsers", description = "All users that are currently available to the system.")
+ @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.")
public TabularData viewUsers()
{
+ // Table of users
+ // Username(string), Access rights Read,Write,Admin(bool,bool,bool)
+
+ if (_userlistDataType == null)
+ {
+ _logger.warn("TabluarData not setup correctly");
+ return null;
+ }
+
List<Principal> users = _principalDatabase.getUsers();
TabularDataSupport userList = new TabularDataSupport(_userlistDataType);
@@ -157,15 +311,29 @@ public class AMQUserManagementMBean exte
for (Principal user : users)
{
// Create header attributes list
- // Read,Write,Admin items are depcreated and we return always false.
- Object[] itemData = {user.getName(), false, false, false};
+
+ String rights = (String) _accessRights.get(user.getName());
+
+ Boolean read = false;
+ Boolean write = false;
+ Boolean admin = false;
+
+ if (rights != null)
+ {
+ read = rights.equals(MBeanInvocationHandlerImpl.READONLY)
+ || rights.equals(MBeanInvocationHandlerImpl.READWRITE);
+ write = rights.equals(MBeanInvocationHandlerImpl.READWRITE);
+ admin = rights.equals(MBeanInvocationHandlerImpl.ADMIN);
+ }
+
+ Object[] itemData = {user.getName(), read, write, admin};
CompositeData messageData = new CompositeDataSupport(_userDataType, COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData);
userList.put(messageData);
}
}
catch (OpenDataException e)
{
- _logger.warn("Unable to create user list due to :", e);
+ _logger.warn("Unable to create user list due to :" + e);
return null;
}
@@ -183,4 +351,187 @@ public class AMQUserManagementMBean exte
{
_principalDatabase = database;
}
+
+ /**
+ * setAccessFile
+ *
+ * @param accessFile the file to use for updating.
+ *
+ * @throws java.io.IOException If the file cannot be accessed
+ * @throws org.apache.commons.configuration.ConfigurationException
+ * if checks on the file fail.
+ */
+ public void setAccessFile(String accessFile) throws IOException, ConfigurationException
+ {
+ if (accessFile != null)
+ {
+ _accessFile = new File(accessFile);
+ if (!_accessFile.exists())
+ {
+ throw new ConfigurationException("'" + _accessFile + "' does not exist");
+ }
+
+ if (!_accessFile.canRead())
+ {
+ throw new ConfigurationException("Cannot read '" + _accessFile + "'.");
+ }
+
+ if (!_accessFile.canWrite())
+ {
+ _logger.warn("Unable to write to access rights file '" + _accessFile + "', changes will not be preserved.");
+ }
+
+ loadAccessFile();
+ }
+ else
+ {
+ _logger.warn("Access rights file specified is null. Access rights not changed.");
+ }
+ }
+
+ private void loadAccessFile() throws IOException, ConfigurationException
+ {
+ if(_accessFile == null)
+ {
+ _logger.error("No jmx access rights file has been specified.");
+ return;
+ }
+
+ if(_accessFile.exists())
+ {
+ try
+ {
+ _accessRightsUpdate.lock();
+
+ Properties accessRights = new Properties();
+ FileInputStream inStream = new FileInputStream(_accessFile);
+ try
+ {
+ accessRights.load(inStream);
+ }
+ finally
+ {
+ inStream.close();
+ }
+
+ checkAccessRights(accessRights);
+ setAccessRights(accessRights);
+ }
+ finally
+ {
+ _accessRightsUpdate.unlock();
+ }
+ }
+ else
+ {
+ _logger.error("Specified jmxaccess rights file '" + _accessFile + "' does not exist.");
+ }
+ }
+
+ private void checkAccessRights(Properties accessRights)
+ {
+ Enumeration values = accessRights.propertyNames();
+
+ while (values.hasMoreElements())
+ {
+ String user = (String) values.nextElement();
+
+ if (_principalDatabase.getUser(user) == null)
+ {
+ _logger.warn("Access rights contains user '" + user + "' but there is no authentication data for that user");
+ }
+ }
+ }
+
+ private void saveAccessFile() throws IOException
+ {
+ try
+ {
+ _accessRightsUpdate.lock();
+
+ // Create temporary file
+ Random r = new Random();
+ File tmp;
+ do
+ {
+ tmp = new File(_accessFile.getPath() + r.nextInt() + ".tmp");
+ }
+ while(tmp.exists());
+
+ tmp.deleteOnExit();
+
+ FileOutputStream output = new FileOutputStream(tmp);
+ _accessRights.store(output, "Generated by AMQUserManagementMBean Console : Last edited by user:" + getCurrentJMXUser());
+ output.close();
+
+ // Swap temp file to main rights file.
+ File old = new File(_accessFile.getAbsoluteFile() + ".old");
+ if (old.exists())
+ {
+ old.delete();
+ }
+
+ if(!_accessFile.renameTo(old))
+ {
+ //unable to rename the existing file to the backup name
+ _logger.error("Could not backup the existing management rights file");
+ throw new IOException("Could not backup the existing management rights file");
+ }
+
+ if(!tmp.renameTo(_accessFile))
+ {
+ //failed to rename the new file to the required filename
+
+ if(!old.renameTo(_accessFile))
+ {
+ //unable to return the backup to required filename
+ _logger.error("Could not rename the new management rights file into place, and unable to restore original file");
+ throw new IOException("Could not rename the new management rights file into place, and unable to restore original file");
+ }
+
+ _logger.error("Could not rename the new management rights file into place");
+ throw new IOException("Could not rename the new management rights file into place");
+ }
+ }
+ finally
+ {
+ _accessRightsUpdate.unlock();
+ }
+
+ }
+
+ private String getCurrentJMXUser()
+ {
+ AccessControlContext acc = AccessController.getContext();
+
+ Subject subject = Subject.getSubject(acc);
+ if (subject == null)
+ {
+ return "Unknown user, authentication Subject was null";
+ }
+
+ // Retrieve JMXPrincipal from Subject
+ Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
+ if (principals == null || principals.isEmpty())
+ {
+ return "Unknown user principals were null";
+ }
+
+ Principal principal = principals.iterator().next();
+ return principal.getName();
+ }
+
+ /**
+ * user=read user=write user=readwrite user=admin
+ *
+ * @param accessRights The properties list of access rights to process
+ */
+ private void setAccessRights(Properties accessRights)
+ {
+ _logger.debug("Setting Access Rights:" + accessRights);
+ _accessRights = accessRights;
+
+ // TODO check where this is used
+ // MBeanInvocationHandlerImpl.setAccessRights(_accessRights);
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Fri Oct 21 01:19:00 2011
@@ -20,73 +20,17 @@
*/
package org.apache.qpid.server.security.auth.manager;
-import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-/**
- * Implementations of the AuthenticationManager are responsible for determining
- * the authenticity of a user's credentials.
- *
- * If the authentication is successful, the manager is responsible for producing a populated
- * {@link Subject} containing the user's identity and zero or more principals representing
- * groups to which the user belongs.
- * <p>
- * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
- * the manager. The {@link #close()} method must reverse this registration.
- *
- */
-public interface AuthenticationManager extends Closeable, Plugin
+public interface AuthenticationManager extends Closeable
{
- /** The name for the required SASL Server mechanisms */
- public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
-
- /**
- * Initialise the authentication plugin.
- *
- */
- void initialise();
-
- /**
- * Gets the SASL mechanisms known to this manager.
- *
- * @return SASL mechanism names, space separated.
- */
String getMechanisms();
- /**
- * Creates a SASL server for the specified mechanism name for the given
- * fully qualified domain name.
- *
- * @param mechanism mechanism name
- * @param localFQDN domain name
- *
- * @return SASL server
- * @throws SaslException
- */
SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
- /**
- * Authenticates a user using SASL negotiation.
- *
- * @param server SASL server
- * @param response SASL response to process
- *
- * @return authentication result
- */
AuthenticationResult authenticate(SaslServer server, byte[] response);
-
- /**
- * Authenticates a user using their username and password.
- *
- * @param username username
- * @param password password
- *
- * @return authentication result
- */
- AuthenticationResult authenticate(String username, String password);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org