You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [16/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Fri Aug  3 12:13:32 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.ServerMessage;
 
 public enum NotificationCheck
@@ -27,13 +28,16 @@ public enum NotificationCheck
 
     MESSAGE_COUNT_ALERT
     {
-        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
         {
             int msgCount;
             final long maximumMessageCount = queue.getMaximumMessageCount();
             if (maximumMessageCount!= 0 && (msgCount =  queue.getMessageCount()) >= maximumMessageCount)
             {
-                listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+                String notificationMsg = msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.";
+
+                logNotification(this, queue, notificationMsg);
+                listener.notifyClients(this, queue, notificationMsg);
                 return true;
             }
             return false;
@@ -41,7 +45,7 @@ public enum NotificationCheck
     },
     MESSAGE_SIZE_ALERT(true)
     {
-        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
         {
             final long maximumMessageSize = queue.getMaximumMessageSize();
             if(maximumMessageSize != 0)
@@ -50,10 +54,12 @@ public enum NotificationCheck
                 long messageSize;
                 messageSize = (msg == null) ? 0 : msg.getSize();
 
-
                 if (messageSize >= maximumMessageSize)
                 {
-                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
+                    String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
+                    
+                    logNotification(this, queue, notificationMsg);
+                    listener.notifyClients(this, queue, notificationMsg);
                     return true;
                 }
             }
@@ -63,7 +69,7 @@ public enum NotificationCheck
     },
     QUEUE_DEPTH_ALERT
     {
-        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
         {
             // Check for threshold queue depth in bytes
             final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -74,7 +80,10 @@ public enum NotificationCheck
 
                 if (queueDepth >= maximumQueueDepth)
                 {
-                    listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+                    String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
+                    
+                    logNotification(this, queue, notificationMsg);
+                    listener.notifyClients(this, queue, notificationMsg);
                     return true;
                 }
             }
@@ -84,7 +93,7 @@ public enum NotificationCheck
     },
     MESSAGE_AGE_ALERT
     {
-        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
         {
 
             final long maxMessageAge = queue.getMaximumMessageAge();
@@ -97,7 +106,10 @@ public enum NotificationCheck
                 if(firstArrivalTime < thresholdTime)
                 {
                     long oldestAge = currentTime - firstArrivalTime;
-                    listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+                    String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
+                    
+                    logNotification(this, queue, notificationMsg);
+                    listener.notifyClients(this, queue, notificationMsg);
 
                     return true;
                 }
@@ -109,6 +121,8 @@ public enum NotificationCheck
     }
     ;
 
+    private static final Logger LOGGER = Logger.getLogger(NotificationCheck.class);
+
     private final boolean _messageSpecific;
 
     NotificationCheck()
@@ -126,6 +140,11 @@ public enum NotificationCheck
         return _messageSpecific;
     }
 
-    abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
+    public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener);
 
+    // A bit of a hack: only for use until the logging listener is implemented.
+    private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
+    {
+        LOGGER.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Fri Aug  3 12:13:32 2012
@@ -37,7 +37,7 @@ public class PriorityQueueList implement
         _priorityOffset = 5-((priorities + 1)/2);
         for(int i = 0; i < priorities; i++)
         {
-            _priorityLists[i] = new PriorityQueueEntrySubList(queue);
+            _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
         }
     }
 
@@ -164,9 +164,12 @@ public class PriorityQueueList implement
 
     private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
     {
-        public PriorityQueueEntrySubList(AMQQueue queue)
+        private int _listPriority;
+
+        public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
         {
             super(queue);
+            _listPriority = listPriority;
         }
 
         @Override
@@ -174,6 +177,11 @@ public class PriorityQueueList implement
         {
             return new PriorityQueueEntryImpl(this, message);
         }
+
+        public int getListPriority()
+        {
+            return _listPriority;
+        }
     }
 
     private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
@@ -186,8 +194,9 @@ public class PriorityQueueList implement
         @Override
         public int compareTo(final QueueEntry o)
         {
-            byte thisPriority = getMessageHeader().getPriority();
-            byte otherPriority = o.getMessageHeader().getPriority();
+            PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)((PriorityQueueEntryImpl)o).getQueueEntryList();
+            int otherPriority = pqel.getListPriority();
+            int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
 
             if(thisPriority != otherPriority)
             {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Aug  3 12:13:32 2012
@@ -1,9 +1,3 @@
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
-
 /*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
@@ -24,6 +18,12 @@ import org.apache.qpid.server.subscripti
 * under the License.
 *
 */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
+
 public interface QueueEntry extends Comparable<QueueEntry>, Filterable
 {
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug  3 12:13:32 2012
@@ -233,7 +233,7 @@ public abstract class QueueEntryImpl imp
 
             if(state instanceof SubscriptionAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount();
+                getQueue().decrementUnackedMsgCount(this);
                 Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
                 if (subscription != null)
                 {
@@ -369,7 +369,7 @@ public abstract class QueueEntryImpl imp
             Subscription s = null;
             if (state instanceof SubscriptionAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount();
+                getQueue().decrementUnackedMsgCount(this);
                 s = ((SubscriptionAcquiredState) state).getSubscription();
                 s.onDequeue(this);
             }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Fri Aug  3 12:13:32 2012
@@ -1,21 +1,21 @@
-package org.apache.qpid.server.queue;
-
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
-* <p/>
+*
 * http://www.apache.org/licenses/LICENSE-2.0
-* <p/>
+*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+package org.apache.qpid.server.queue;
+
 public interface QueueEntryVisitor
 {
     boolean visit(QueueEntry entry);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Aug  3 12:13:32 2012
@@ -42,7 +42,15 @@ public interface QueueRegistry
 
     AMQQueue getQueue(String queue);
 
+    void addRegistryChangeListener(RegistryChangeListener listener);
+
     void stopAllAndUnregisterMBeans();
 
     AMQQueue getQueue(UUID queueId);
+
+    interface RegistryChangeListener
+    {
+        void queueRegistered(AMQQueue queue);
+        void queueUnregistered(AMQQueue queue);
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Aug  3 12:13:32 2012
@@ -19,8 +19,10 @@
 package org.apache.qpid.server.queue;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,12 +30,11 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.management.JMException;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.QueueActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -70,6 +70,7 @@ import org.apache.qpid.server.virtualhos
 public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
 {
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
     private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
     private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
     private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
@@ -77,11 +78,9 @@ public class SimpleAMQQueue implements A
     // TODO - should make this configurable at the vhost / broker level
     private static final int DEFAULT_MAX_GROUPS = 255;
 
-
     private final VirtualHost _virtualHost;
 
     private final AMQShortString _name;
-    private final String _resourceName;
 
     /** null means shared */
     private final AMQShortString _owner;
@@ -118,6 +117,7 @@ public class SimpleAMQQueue implements A
 
     private final AtomicLong _dequeueCount = new AtomicLong();
     private final AtomicLong _dequeueSize = new AtomicLong();
+    private final AtomicLong _enqueueCount = new AtomicLong();
     private final AtomicLong _enqueueSize = new AtomicLong();
     private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
     private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
@@ -130,6 +130,7 @@ public class SimpleAMQQueue implements A
     private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
     private final AtomicLong _unackedMsgCount = new AtomicLong(0);
     private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
+    private final AtomicLong _unackedMsgBytes = new AtomicLong();
 
     private final AtomicInteger _bindingCountHigh = new AtomicInteger();
 
@@ -173,7 +174,6 @@ public class SimpleAMQQueue implements A
     private LogSubject _logSubject;
     private LogActor _logActor;
 
-    private AMQQueueMBean _managedObject;
     private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
     private boolean _nolocal;
 
@@ -185,12 +185,19 @@ public class SimpleAMQQueue implements A
 
     //TODO : persist creation time
     private long _createTime = System.currentTimeMillis();
+    private UUID _qmfId;
     private ConfigurationPlugin _queueConfiguration;
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
     private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
     private final MessageGroupManager _messageGroupManager;
 
+    private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
+            new ArrayList<SubscriptionRegistrationListener>();
+
+    private AMQQueue.NotificationListener _notificationListener;
+    private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
     protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
     {
         this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
@@ -227,17 +234,16 @@ public class SimpleAMQQueue implements A
         }
 
         _name = name;
-        _resourceName = String.valueOf(name);
         _durable = durable;
         _owner = owner;
         _autoDelete = autoDelete;
         _exclusive = exclusive;
         _virtualHost = virtualHost;
         _entries = entryListFactory.createQueueEntryList(this);
-        _arguments = arguments;
+        _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
 
         _id = id;
-
+        _qmfId = getConfigStore().createId();
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
         _logSubject = new QueueLogSubject(this);
@@ -255,16 +261,6 @@ public class SimpleAMQQueue implements A
 
         getConfigStore().addConfiguredObject(this);
 
-        try
-        {
-            _managedObject = new AMQQueueMBean(this);
-            _managedObject.register();
-        }
-        catch (JMException e)
-        {
-            _logger.error("AMQQueue MBean creation has failed ", e);
-        }
-
         if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
         {
             if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
@@ -302,7 +298,22 @@ public class SimpleAMQQueue implements A
 
     public void execute(Runnable runnable)
     {
-        _asyncDelivery.execute(runnable);
+        try
+        {
+            _asyncDelivery.execute(runnable);
+        }
+        catch (RejectedExecutionException ree)
+        {
+            if (_stopped.get())
+            {
+                // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+            }
+            else
+            {
+                _logger.error("Unexpected rejected execution", ree);
+                throw ree;
+            }
+        }
     }
 
     public AMQShortString getNameShortString()
@@ -320,6 +331,12 @@ public class SimpleAMQQueue implements A
         return _id;
     }
 
+    @Override
+    public UUID getQMFId()
+    {
+        return _qmfId;
+    }
+
     public QueueConfigType getConfigType()
     {
         return QueueConfigType.getInstance();
@@ -339,15 +356,10 @@ public class SimpleAMQQueue implements A
     {
         return _exclusive;
     }
-    
-    public void setExclusive(boolean exclusive) throws AMQException
+
+    public void setExclusive(boolean exclusive)
     {
         _exclusive = exclusive;
-
-        if(isDurable())
-        {
-            getVirtualHost().getMessageStore().updateQueue(this);
-        }
     }
 
     public Exchange getAlternateExchange()
@@ -368,22 +380,10 @@ public class SimpleAMQQueue implements A
         _alternateExchange = exchange;
     }
 
-    public void setAlternateExchange(String exchangeName)
-    {
-        if(exchangeName == null || exchangeName.equals(""))
-        {
-            _alternateExchange = null;
-            return;
-        }
-
-        Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
-        if (exchange == null)
-        {
-            throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
-        }
-        setAlternateExchange(exchange);
-    }
-
+    /**
+     * Arguments used to create this queue.  The caller is assured
+     * that null will never be returned.
+     */
     public Map<String, Object> getArguments()
     {
         return _arguments;
@@ -430,8 +430,8 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied");
         }
-        
-        
+
+
         if (hasExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
@@ -463,15 +463,24 @@ public class SimpleAMQQueue implements A
             {
                 subscription.setNoLocal(_nolocal);
             }
+
+            synchronized (_subscriptionListeners)
+            {
+                for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+                {
+                    listener.subscriptionRegistered(this, subscription);
+                }
+            }
+
             _subscriptionList.add(subscription);
-            
+
             //Increment consumerCountHigh if necessary. (un)registerSubscription are both
             //synchronized methods so we don't need additional synchronization here
             if(_counsumerCountHigh.get() < getConsumerCount())
             {
                 _counsumerCountHigh.incrementAndGet();
             }
-            
+
             if (isDeleted())
             {
                 subscription.queueDeleted(this);
@@ -507,6 +516,14 @@ public class SimpleAMQQueue implements A
                 resetSubPointersForGroups(subscription, true);
             }
 
+            synchronized (_subscriptionListeners)
+            {
+                for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+                {
+                    listener.subscriptionUnregistered(this, subscription);
+                }
+            }
+
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0  )
@@ -526,6 +543,34 @@ public class SimpleAMQQueue implements A
 
     }
 
+    public Collection<Subscription> getConsumers()
+    {
+        List<Subscription> consumers = new ArrayList<Subscription>();
+        SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+        while(iter.advance())
+        {
+            consumers.add(iter.getNode().getSubscription());
+        }
+        return consumers;
+
+    }
+
+    public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    {
+        synchronized (_subscriptionListeners)
+        {
+            _subscriptionListeners.add(listener);
+        }
+    }
+
+    public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    {
+        synchronized (_subscriptionListeners)
+        {
+            _subscriptionListeners.remove(listener);
+        }
+    }
+
     public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
     {
         QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
@@ -576,10 +621,10 @@ public class SimpleAMQQueue implements A
                 break;
             }
         }
-        
+
         reconfigure();
     }
-    
+
     private void reconfigure()
     {
         //Reconfigure the queue for to reflect this new binding.
@@ -604,7 +649,7 @@ public class SimpleAMQQueue implements A
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
-        
+
         reconfigure();
     }
 
@@ -718,10 +763,7 @@ public class SimpleAMQQueue implements A
            }
         }
 
-        if(_managedObject != null)
-        {
-            _managedObject.checkForNotification(entry.getMessage());
-        }
+        checkForNotification(entry.getMessage());
 
         if(action != null)
         {
@@ -738,8 +780,8 @@ public class SimpleAMQQueue implements A
         {
             try
             {
-                if (!sub.isSuspended() 
-                    && subscriptionReadyAndHasInterest(sub, entry) 
+                if (!sub.isSuspended()
+                    && subscriptionReadyAndHasInterest(sub, entry)
                     && mightAssign(sub, entry)
                     && !sub.wouldSuspend(entry))
                 {
@@ -788,6 +830,7 @@ public class SimpleAMQQueue implements A
     {
         long size = message.getSize();
         getAtomicQueueSize().addAndGet(size);
+        _enqueueCount.incrementAndGet();
         _enqueueSize.addAndGet(size);
         if(message.isPersistent() && isDurable())
         {
@@ -796,19 +839,29 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    public long getTotalDequeueCount()
+    {
+        return _dequeueCount.get();
+    }
+
+    public long getTotalEnqueueCount()
+    {
+        return _enqueueCount.get();
+    }
+
     private void incrementQueueCount()
     {
         getAtomicQueueCount().incrementAndGet();
     }
-    
+
     private void incrementTxnEnqueueStats(final ServerMessage message)
     {
         _msgTxnEnqueues.incrementAndGet();
         _byteTxnEnqueues.addAndGet(message.getSize());
     }
-    
+
     private void incrementTxnDequeueStats(QueueEntry entry)
-    {      
+    {
         _msgTxnDequeues.incrementAndGet();
         _byteTxnDequeues.addAndGet(entry.getSize());
     }
@@ -819,7 +872,7 @@ public class SimpleAMQQueue implements A
         setLastSeenEntry(sub, entry);
 
         _deliveredMessages.incrementAndGet();
-        incrementUnackedMsgCount();
+        incrementUnackedMsgCount(entry);
 
         sub.send(entry, batch);
     }
@@ -833,12 +886,15 @@ public class SimpleAMQQueue implements A
     private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
     {
         QueueContext subContext = (QueueContext) sub.getQueueContext();
-        QueueEntry releasedEntry = subContext.getReleasedEntry();
-
-        QueueContext._lastSeenUpdater.set(subContext, entry);
-        if(releasedEntry == entry)
+        if (subContext != null)
         {
-           QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+            QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+            QueueContext._lastSeenUpdater.set(subContext, entry);
+            if(releasedEntry == entry)
+            {
+               QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+            }
         }
     }
 
@@ -887,7 +943,7 @@ public class SimpleAMQQueue implements A
         {
             _deliveredMessages.decrementAndGet();
         }
-        
+
         if(sub != null && sub.isSessionTransactional())
         {
             incrementTxnDequeueStats(entry);
@@ -940,11 +996,13 @@ public class SimpleAMQQueue implements A
         }
     }
 
+
+
     public int getConsumerCount()
     {
         return _subscriptionList.size();
     }
-    
+
     public int getConsumerCountHigh()
     {
         return _counsumerCountHigh.get();
@@ -1148,7 +1206,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void visit(final Visitor visitor)
+    public void visit(final QueueEntryVisitor visitor)
     {
         QueueEntryIterator queueListIterator = _entries.iterator();
 
@@ -1195,192 +1253,6 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void moveMessagesToAnotherQueue(final long fromMessageId,
-                                           final long toMessageId,
-                                           String destinationQueueName) throws IllegalArgumentException
-    {
-
-        final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
-
-        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
-        {
-
-            public boolean accept(QueueEntry entry)
-            {
-                final long messageId = entry.getMessage().getMessageNumber();
-                return (messageId >= fromMessageId)
-                       && (messageId <= toMessageId)
-                       && entry.acquire();
-            }
-
-            public boolean filterComplete()
-            {
-                return false;
-            }
-        });
-
-
-        final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
-        boolean shouldRollback = true;
-        try
-        {
-            // Move the messages in on the message store.
-            for (final QueueEntry entry : entries)
-            {
-                final ServerMessage message = entry.getMessage();
-                txn.enqueue(toQueue, message,
-                            new ServerTransaction.Action()
-                            {
-
-                                public void postCommit()
-                                {
-                                    try
-                                    {
-                                        toQueue.enqueue(message);
-                                    }
-                                    catch (AMQException e)
-                                    {
-                                        throw new RuntimeException(e);
-                                    }
-                                }
-
-                                public void onRollback()
-                                {
-                                    entry.release();
-                                }
-                            });
-                txn.dequeue(this, message,
-                            new ServerTransaction.Action()
-                            {
-
-                                public void postCommit()
-                                {
-                                    entry.discard();
-                                }
-
-                                public void onRollback()
-                                {
-
-                                }
-                            });
-            }
-            txn.commit();
-            shouldRollback = false;
-        }
-        finally
-        {
-            if (shouldRollback)
-            {
-                txn.rollback();
-            }
-        }
-
-    }
-
-    public void copyMessagesToAnotherQueue(final long fromMessageId,
-                                           final long toMessageId,
-                                           String destinationQueueName) throws IllegalArgumentException
-    {
-        final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
-
-        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
-        {
-
-            public boolean accept(QueueEntry entry)
-            {
-                final long messageId = entry.getMessage().getMessageNumber();
-                return ((messageId >= fromMessageId)
-                    && (messageId <= toMessageId));
-            }
-
-            public boolean filterComplete()
-            {
-                return false;
-            }
-        });
-
-        final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore());
-        boolean shouldRollback = true;
-        try
-        {
-            // Copy the messages in on the message store.
-            for (QueueEntry entry : entries)
-            {
-                final ServerMessage message = entry.getMessage();
-
-                txn.enqueue(toQueue, message, new ServerTransaction.Action()
-                {
-                    public void postCommit()
-                    {
-                        try
-                        {
-                            toQueue.enqueue(message);
-                        }
-                        catch (AMQException e)
-                        {
-                            throw new RuntimeException(e);
-                        }
-                    }
-
-                    public void onRollback()
-                    {
-                    }
-                });
-
-            }
-
-            txn.commit();
-            shouldRollback = false;
-        }
-        finally
-        {
-            if (shouldRollback)
-            {
-                txn.rollback();
-            }
-        }
-
-    }
-
-    private AMQQueue getValidatedDestinationQueue(String queueName)
-    {
-        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        if (toQueue == null)
-        {
-            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
-        }
-        else if (toQueue == this)
-        {
-            throw new IllegalArgumentException("The destination queue can't be the same as the source queue");
-        }
-        return toQueue;
-    }
-
-    public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
-    {
-
-        QueueEntryIterator queueListIterator = _entries.iterator();
-
-        while (queueListIterator.advance())
-        {
-            QueueEntry node = queueListIterator.getNode();
-
-            final ServerMessage message = node.getMessage();
-            if(message != null)
-            {
-                final long messageId = message.getMessageNumber();
-
-                if ((messageId >= fromMessageId)
-                    && (messageId <= toMessageId)
-                    && node.acquire())
-                {
-                    dequeueEntry(node);
-                }
-            }
-        }
-
-    }
-
     public void purge(final long request) throws AMQException
     {
         clear(request);
@@ -1393,6 +1265,7 @@ public class SimpleAMQQueue implements A
 
     // ------ Management functions
 
+    // TODO - now only used by the tests
     public void deleteMessageFromTop()
     {
         QueueEntryIterator queueListIterator = _entries.iterator();
@@ -1411,7 +1284,7 @@ public class SimpleAMQQueue implements A
     }
 
     public long clearQueue() throws AMQException
-    {         
+    {
         return clear(0l);
     }
 
@@ -1422,7 +1295,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: queue " + getName());
         }
-        
+
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
@@ -1489,7 +1362,7 @@ public class SimpleAMQQueue implements A
         {
             throw new AMQSecurityException("Permission denied: " + getName());
         }
-        
+
         if (!_deleted.getAndSet(true))
         {
 
@@ -1617,12 +1490,6 @@ public class SimpleAMQQueue implements A
 
             txn.commit();
 
-
-            if(_managedObject!=null)
-            {
-                _managedObject.unregister();
-            }
-
             for (Task task : _deleteTaskList)
             {
                 task.doTask(this);
@@ -2101,16 +1968,13 @@ public class SimpleAMQQueue implements A
                 }
                 else
                 {
-                    if (_managedObject != null)
+                    // There is a chance that the node could be deleted by
+                    // the time the check actually occurs. So verify we
+                    // can actually get the message to perform the check.
+                    ServerMessage msg = node.getMessage();
+                    if (msg != null)
                     {
-                        // There is a chance that the node could be deleted by
-                        // the time the check actually occurs. So verify we
-                        // can actually get the message to perform the check.
-                        ServerMessage msg = node.getMessage();
-                        if (msg != null)
-                        {
-                            _managedObject.checkForNotification(msg);
-                        }
+                        checkForNotification(msg);
                     }
                 }
             }
@@ -2235,11 +2099,6 @@ public class SimpleAMQQueue implements A
         return _notificationChecks;
     }
 
-    public ManagedObject getManagedObject()
-    {
-        return _managedObject;
-    }
-
     private final class QueueEntryListener implements QueueEntry.StateChangeListener
     {
 
@@ -2330,12 +2189,6 @@ public class SimpleAMQQueue implements A
         return _queueConfiguration;
     }
 
-    public String getResourceName()
-    {
-        return _resourceName;
-    }
-
-
     public ConfigStore getConfigStore()
     {
         return getVirtualHost().getConfigStore();
@@ -2355,22 +2208,22 @@ public class SimpleAMQQueue implements A
     {
         return _dequeueSize.get();
     }
-    
+
     public long getByteTxnEnqueues()
     {
         return _byteTxnEnqueues.get();
     }
-    
+
     public long getByteTxnDequeues()
     {
         return _byteTxnDequeues.get();
     }
-    
+
     public long getMsgTxnEnqueues()
     {
         return _msgTxnEnqueues.get();
     }
-    
+
     public long getMsgTxnDequeues()
     {
         return _msgTxnDequeues.get();
@@ -2407,21 +2260,28 @@ public class SimpleAMQQueue implements A
     {
         return _unackedMsgCountHigh.get();
     }
-    
+
     public long getUnackedMessageCount()
     {
         return _unackedMsgCount.get();
     }
-    
-    public void decrementUnackedMsgCount()
+
+    public long getUnackedMessageBytes()
+    {
+        return _unackedMsgBytes.get();
+    }
+
+    public void decrementUnackedMsgCount(QueueEntry queueEntry)
     {
         _unackedMsgCount.decrementAndGet();
+        _unackedMsgBytes.addAndGet(-queueEntry.getSize());
     }
-    
-    private void incrementUnackedMsgCount()
+
+    private void incrementUnackedMsgCount(QueueEntry entry)
     {
         long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-        
+        _unackedMsgBytes.addAndGet(entry.getSize());
+
         long unackedMsgCountHigh;
         while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
         {
@@ -2447,4 +2307,54 @@ public class SimpleAMQQueue implements A
         _maximumDeliveryCount = maximumDeliveryCount;
     }
 
+    /**
+     * Checks if there is any notification to send to the listeners
+     */
+    private void checkForNotification(ServerMessage<?> msg) throws AMQException
+    {
+        final Set<NotificationCheck> notificationChecks = getNotificationChecks();
+        final AMQQueue.NotificationListener listener = _notificationListener;
+
+        if(listener != null && !notificationChecks.isEmpty())
+        {
+            final long currentTime = System.currentTimeMillis();
+            final long thresholdTime = currentTime - getMinimumAlertRepeatGap();
+
+            for (NotificationCheck check : notificationChecks)
+            {
+                if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+                {
+                    if (check.notifyIfNecessary(msg, this, listener))
+                    {
+                        _lastNotificationTimes[check.ordinal()] = currentTime;
+                    }
+                }
+            }
+        }
+    }
+
+    public void setNotificationListener(AMQQueue.NotificationListener listener)
+    {
+        _notificationListener = listener;
+    }
+
+    @Override
+    public void setDescription(String description)
+    {
+        if (description == null)
+        {
+            _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION);
+        }
+        else
+        {
+            _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description);
+        }
+    }
+
+    @Override
+    public String getDescription()
+    {
+        return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION);
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Fri Aug  3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.queue;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.server.queue;
  * under the License.
  * 
  */
+package org.apache.qpid.server.queue;
 
 
 import org.apache.log4j.Logger;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Aug  3 12:13:32 2012
@@ -22,9 +22,9 @@ package org.apache.qpid.server.registry;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.*;
 import org.osgi.framework.BundleContext;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.qmf.QMFService;
@@ -35,24 +35,18 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.SystemConfig;
 import org.apache.qpid.server.configuration.SystemConfigImpl;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
-import org.apache.qpid.server.logging.Log4jMessageLogger;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
 import org.apache.qpid.server.logging.actors.AbstractActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.Plugin;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.adapter.BrokerAdapter;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerRegistry;
+import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -60,13 +54,8 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
+import java.net.SocketAddress;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 
@@ -78,19 +67,19 @@ import java.util.concurrent.atomic.Atomi
  */
 public abstract class ApplicationRegistry implements IApplicationRegistry
 {
+
     private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
 
     private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
 
     private final ServerConfiguration _configuration;
 
-    private final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
-
-    private ManagedObjectRegistry _managedObjectRegistry;
+    private final Map<InetSocketAddress, QpidAcceptor> _acceptors =
+            Collections.synchronizedMap(new HashMap<InetSocketAddress, QpidAcceptor>());
 
-    private AuthenticationManager _authenticationManager;
+    private IAuthenticationManagerRegistry _authenticationManagerRegistry;
 
-    private VirtualHostRegistry _virtualHostRegistry;
+    private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(this);
 
     private SecurityManager _securityManager;
 
@@ -106,39 +95,32 @@ public abstract class ApplicationRegistr
 
     private QMFService _qmfService;
 
-    private BrokerConfig _broker;
+    private BrokerConfig _brokerConfig;
+
+    private Broker _broker;
 
     private ConfigStore _configStore;
-    
+
     private Timer _reportingTimer;
-    private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private BundleContext _bundleContext;
 
-    protected static Logger get_logger()
-    {
-        return _logger;
-    }
+    private final List<PortBindingListener> _portBindingListeners = new ArrayList<PortBindingListener>();
 
-    protected Map<InetSocketAddress, QpidAcceptor> getAcceptors()
-    {
-        return _acceptors;
-    }
+    private int _httpManagementPort = -1, _httpsManagementPort = -1;
 
-    protected void setManagedObjectRegistry(ManagedObjectRegistry managedObjectRegistry)
-    {
-        _managedObjectRegistry = managedObjectRegistry;
-    }
+    private LogRecorder _logRecorder;
 
-    protected void setAuthenticationManager(AuthenticationManager authenticationManager)
-    {
-        _authenticationManager = authenticationManager;
-    }
+    private List<IAuthenticationManagerRegistry.RegistryChangeListener> _authManagerChangeListeners =
+            new ArrayList<IAuthenticationManagerRegistry.RegistryChangeListener>();
 
-    protected void setVirtualHostRegistry(VirtualHostRegistry virtualHostRegistry)
+    public Map<InetSocketAddress, QpidAcceptor> getAcceptors()
     {
-        _virtualHostRegistry = virtualHostRegistry;
+        synchronized (_acceptors)
+        {
+            return new HashMap<InetSocketAddress, QpidAcceptor>(_acceptors);
+        }
     }
 
     protected void setSecurityManager(SecurityManager securityManager)
@@ -205,11 +187,11 @@ public abstract class ApplicationRegistr
         store.setRoot(new SystemConfigImpl(store));
         instance.setConfigStore(store);
 
-        BrokerConfig broker = new BrokerConfigAdapter(instance);
+        final BrokerConfig brokerConfig = new BrokerConfigAdapter(instance);
 
-        SystemConfig system = store.getRoot();
-        system.addBroker(broker);
-        instance.setBroker(broker);
+        final SystemConfig system = store.getRoot();
+        system.addBroker(brokerConfig);
+        instance.setBrokerConfig(brokerConfig);
 
         try
         {
@@ -222,7 +204,7 @@ public abstract class ApplicationRegistr
             //remove the Broker instance, then re-throw
             try
             {
-                system.removeBroker(broker);
+                system.removeBroker(brokerConfig);
             }
             catch(Throwable t)
             {
@@ -297,18 +279,32 @@ public abstract class ApplicationRegistr
 
     public void initialise() throws Exception
     {
+        _logRecorder = new LogRecorder();
         //Create the RootLogger to be used during broker operation
         _rootMessageLogger = new Log4jMessageLogger(_configuration);
 
         //Create the composite (log4j+SystemOut MessageLogger to be used during startup
         RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
         _startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers);
-        
-        CurrentActor.set(new BrokerActor(_startupMessageLogger));
+
+        BrokerActor actor = new BrokerActor(_startupMessageLogger);
+        CurrentActor.setDefault(actor);
+        CurrentActor.set(actor);
 
         try
         {
-            initialiseManagedObjectRegistry();
+            initialiseStatistics();
+
+            if(_configuration.getHTTPManagementEnabled())
+            {
+                _httpManagementPort = _configuration.getHTTPManagementPort();
+            }
+            if (_configuration.getHTTPSManagementEnabled())
+            {
+                _httpsManagementPort = _configuration.getHTTPSManagementPort();
+            }
+
+            _broker = new BrokerAdapter(this);
 
             configure();
 
@@ -316,13 +312,23 @@ public abstract class ApplicationRegistr
 
             logStartupMessages(CurrentActor.get());
 
-            _virtualHostRegistry = new VirtualHostRegistry(this);
-
             _securityManager = new SecurityManager(_configuration, _pluginManager);
 
-            _authenticationManager = createAuthenticationManager();
+            _authenticationManagerRegistry = createAuthenticationManagerRegistry(_configuration, _pluginManager);
+
+            if(!_authManagerChangeListeners.isEmpty())
+            {
+                for(IAuthenticationManagerRegistry.RegistryChangeListener listener : _authManagerChangeListeners)
+                {
 
-            _managedObjectRegistry.start();
+                    _authenticationManagerRegistry.addRegistryChangeListener(listener);
+                    for(AuthenticationManager authMgr : _authenticationManagerRegistry.getAvailableAuthenticationManagers().values())
+                    {
+                        listener.authenticationManagerRegistered(authMgr);
+                    }
+                }
+                _authManagerChangeListeners.clear();
+            }
         }
         finally
         {
@@ -333,7 +339,6 @@ public abstract class ApplicationRegistr
         try
         {
             initialiseVirtualHosts();
-            initialiseStatistics();
             initialiseStatisticsReporting();
         }
         finally
@@ -343,52 +348,10 @@ public abstract class ApplicationRegistr
         }
     }
 
-
-    /**
-     * Iterates across all discovered authentication manager factories, offering the security configuration to each.
-     * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
-     * 
-     * It is an error to configure more than one authentication manager, or to configure none.
-     *
-     * @return authentication manager
-     * @throws ConfigurationException
-     */
-    protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
+    protected IAuthenticationManagerRegistry createAuthenticationManagerRegistry(ServerConfiguration _configuration, PluginManager _pluginManager)
+            throws ConfigurationException
     {
-        final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
-        final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
-        
-        if (factories.size() == 0)
-        {
-            throw new ConfigurationException("No authentication manager factory plugins found.  Check the desired authentication" +
-                    "manager plugin has been placed in the plugins directory.");
-        }
-        
-        AuthenticationManager authMgr = null;
-        
-        for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
-        {
-            final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
-            final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
-            if (tmp != null)
-            {
-                if (authMgr != null)
-                {
-                    throw new ConfigurationException("Cannot configure more than one authentication manager."
-                            + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
-                            + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
-                            + " from the classpath.");
-                }
-                authMgr = tmp;
-            }
-        }
-
-        if (authMgr == null)
-        {
-            throw new ConfigurationException("No authentication managers configured within the configure file.");
-        }
-        
-        return authMgr;
+        return new AuthenticationManagerRegistry(_configuration, _pluginManager);
     }
 
     protected void initialiseVirtualHosts() throws Exception
@@ -400,23 +363,18 @@ public abstract class ApplicationRegistr
         getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
     }
 
-    protected void initialiseManagedObjectRegistry() throws AMQException
-    {
-        _managedObjectRegistry = new NoopManagedObjectRegistry();
-    }
-    
     public void initialiseStatisticsReporting()
     {
         long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
         final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
         final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
         final boolean reset = _configuration.isStatisticsReportResetEnabled();
-        
+
         /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
         if (report > 0L && (broker || virtualhost))
         {
             _reportingTimer = new Timer("Statistics-Reporting", true);
-            
+
 
 
             _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset),
@@ -545,15 +503,13 @@ public abstract class ApplicationRegistr
             //Shutdown virtualhosts
             close(_virtualHostRegistry);
 
-            close(_authenticationManager);
+            close(_authenticationManagerRegistry);
 
             close(_qmfService);
 
             close(_pluginManager);
 
-            close(_managedObjectRegistry);
-
-            BrokerConfig broker = getBroker();
+            BrokerConfig broker = getBrokerConfig();
             if(broker != null)
             {
                 broker.getSystem().removeBroker(broker);
@@ -569,12 +525,14 @@ public abstract class ApplicationRegistr
 
     private void unbind()
     {
+        List<QpidAcceptor> removedAcceptors = new ArrayList<QpidAcceptor>();
         synchronized (_acceptors)
         {
             for (InetSocketAddress bindAddress : _acceptors.keySet())
             {
                 QpidAcceptor acceptor = _acceptors.get(bindAddress);
 
+                removedAcceptors.add(acceptor);
                 try
                 {
                     acceptor.getNetworkTransport().close();
@@ -587,6 +545,16 @@ public abstract class ApplicationRegistr
                CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort()));
             }
         }
+        synchronized (_portBindingListeners)
+        {
+            for(QpidAcceptor acceptor : removedAcceptors)
+            {
+                for(PortBindingListener listener : _portBindingListeners)
+                {
+                    listener.unbound(acceptor);
+                }
+            }
+        }
     }
 
     public ServerConfiguration getConfiguration()
@@ -600,6 +568,13 @@ public abstract class ApplicationRegistr
         {
             _acceptors.put(bindAddress, acceptor);
         }
+        synchronized (_portBindingListeners)
+        {
+            for(PortBindingListener listener : _portBindingListeners)
+            {
+                listener.bound(acceptor, bindAddress);
+            }
+        }
     }
 
     public VirtualHostRegistry getVirtualHostRegistry()
@@ -612,14 +587,16 @@ public abstract class ApplicationRegistr
         return _securityManager;
     }
 
-    public ManagedObjectRegistry getManagedObjectRegistry()
+    @Override
+    public AuthenticationManager getAuthenticationManager(SocketAddress address)
     {
-        return _managedObjectRegistry;
+        return _authenticationManagerRegistry.getAuthenticationManager(address);
     }
 
-    public AuthenticationManager getAuthenticationManager()
+    @Override
+    public IAuthenticationManagerRegistry getAuthenticationManagerRegistry()
     {
-        return _authenticationManager;
+        return _authenticationManagerRegistry;
     }
 
     public PluginManager getPluginManager()
@@ -636,7 +613,7 @@ public abstract class ApplicationRegistr
     {
         return _rootMessageLogger;
     }
-    
+
     public RootMessageLogger getCompositeStartupMessageLogger()
     {
         return _startupMessageLogger;
@@ -652,69 +629,63 @@ public abstract class ApplicationRegistr
         return _qmfService;
     }
 
-    public BrokerConfig getBroker()
+    public BrokerConfig getBrokerConfig()
     {
-        return _broker;
+        return _brokerConfig;
     }
 
-    public void setBroker(final BrokerConfig broker)
+    public void setBrokerConfig(final BrokerConfig broker)
     {
-        _broker = broker;
+        _brokerConfig = broker;
     }
 
     public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
     {
         VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
         _virtualHostRegistry.registerVirtualHost(virtualHost);
-        getBroker().addVirtualHost(virtualHost);
+        getBrokerConfig().addVirtualHost(virtualHost);
         return virtualHost;
     }
-    
+
     public void registerMessageDelivered(long messageSize)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
+        _messagesDelivered.registerEvent(1L);
+        _dataDelivered.registerEvent(messageSize);
     }
-    
+
     public void registerMessageReceived(long messageSize, long timestamp)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
+        _messagesReceived.registerEvent(1L, timestamp);
+        _dataReceived.registerEvent(messageSize, timestamp);
     }
-    
+
     public StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;
     }
-    
+
     public StatisticsCounter getDataReceiptStatistics()
     {
         return _dataReceived;
     }
-    
+
     public StatisticsCounter getMessageDeliveryStatistics()
     {
         return _messagesDelivered;
     }
-    
+
     public StatisticsCounter getDataDeliveryStatistics()
     {
         return _dataDelivered;
     }
-    
+
     public void resetStatistics()
     {
         _messagesDelivered.reset();
         _dataDelivered.reset();
         _messagesReceived.reset();
         _dataReceived.reset();
-        
+
         for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
         {
             vhost.resetStatistics();
@@ -723,25 +694,12 @@ public abstract class ApplicationRegistr
 
     public void initialiseStatistics()
     {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                getConfiguration().isStatisticsGenerationBrokerEnabled());
-        
         _messagesDelivered = new StatisticsCounter("messages-delivered");
         _dataDelivered = new StatisticsCounter("bytes-delivered");
         _messagesReceived = new StatisticsCounter("messages-received");
         _dataReceived = new StatisticsCounter("bytes-received");
     }
 
-    public boolean isStatisticsEnabled()
-    {
-        return _statisticsEnabled;
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _statisticsEnabled = enabled;
-    }
-
     private void logStartupMessages(LogActor logActor)
     {
         logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion()));
@@ -755,4 +713,60 @@ public abstract class ApplicationRegistr
         logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
     }
 
+    public Broker getBroker()
+    {
+        return _broker;
+    }
+
+    @Override
+    public void addPortBindingListener(PortBindingListener listener)
+    {
+        synchronized (_portBindingListeners)
+        {
+            _portBindingListeners.add(listener);
+        }
+    }
+
+
+    @Override
+    public boolean useHTTPManagement()
+    {
+        return _httpManagementPort != -1;
+    }
+
+    @Override
+    public int getHTTPManagementPort()
+    {
+        return _httpManagementPort;
+    }
+
+    @Override
+    public boolean useHTTPSManagement()
+    {
+        return _httpsManagementPort != -1;
+    }
+
+    @Override
+    public int getHTTPSManagementPort()
+    {
+        return _httpsManagementPort;
+    }
+
+    public LogRecorder getLogRecorder()
+    {
+        return _logRecorder;
+    }
+
+    @Override
+    public void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener)
+    {
+        if(_authenticationManagerRegistry == null)
+        {
+            _authManagerChangeListeners.add(registryChangeListener);
+        }
+        else
+        {
+            _authenticationManagerRegistry.addRegistryChangeListener(registryChangeListener);
+        }
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Fri Aug  3 12:13:32 2012
@@ -44,13 +44,13 @@ public class BrokerConfigAdapter impleme
 
     private final Map<UUID, VirtualHostConfig> _vhosts = new ConcurrentHashMap<UUID, VirtualHostConfig>();
     private final long _createTime = System.currentTimeMillis();
-    private UUID _id;
+    private UUID _qmfId;
     private String _federationTag;
 
     public BrokerConfigAdapter(final IApplicationRegistry instance)
     {
         _instance = instance;
-        _id = instance.getConfigStore().createId();
+        _qmfId = instance.getConfigStore().createId();
         _federationTag = UUID.randomUUID().toString();
     }
 
@@ -114,7 +114,7 @@ public class BrokerConfigAdapter impleme
 
     public void addVirtualHost(final VirtualHostConfig virtualHost)
     {
-        _vhosts.put(virtualHost.getId(), virtualHost);
+        _vhosts.put(virtualHost.getQMFId(), virtualHost);
         getConfigStore().addConfiguredObject(virtualHost);
 
     }
@@ -141,9 +141,10 @@ public class BrokerConfigAdapter impleme
         vhost.createBrokerConnection(transport, host, port, "", durable, authMechanism, username, password);
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public BrokerConfigType getConfigType()
@@ -184,7 +185,7 @@ public class BrokerConfigAdapter impleme
     public String toString()
     {
         return "BrokerConfigAdapter{" +
-               "_id=" + _id +
+               "_id=" + _qmfId +
                ", _system=" + _system +
                ", _vhosts=" + _vhosts +
                ", _createTime=" + _createTime +

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri Aug  3 12:13:32 2012
@@ -25,8 +25,6 @@ import org.osgi.framework.BundleContext;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.management.JMXManagedObjectRegistry;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
 
 import java.io.File;
 
@@ -41,18 +39,4 @@ public class ConfigurationFileApplicatio
     {
         super(new ServerConfiguration(configurationURL), bundleContext);
     }
-
-    @Override
-    protected void initialiseManagedObjectRegistry() throws AMQException
-    {
-        if (getConfiguration().getManagementEnabled())
-        {
-            setManagedObjectRegistry(new JMXManagedObjectRegistry());
-        }
-        else
-        {
-            setManagedObjectRegistry(new NoopManagedObjectRegistry());
-        }
-    }
-
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Fri Aug  3 12:13:32 2012
@@ -27,16 +27,19 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
 import java.util.UUID;
 
 public interface IApplicationRegistry extends StatisticsGatherer
@@ -60,9 +63,18 @@ public interface IApplicationRegistry ex
      */
     ServerConfiguration getConfiguration();
 
-    ManagedObjectRegistry getManagedObjectRegistry();
+    /**
+     * Get the AuthenticationManager for the given socket address
+     *
+     * If no AuthenticationManager has been specifically set for the given address, then use the default
+     * AuthenticationManager
+     *
+     * @param address The (listening) socket address for which the AuthenticationManager is required
+     * @return the AuthenticationManager
+     */
+    AuthenticationManager getAuthenticationManager(SocketAddress address);
 
-    AuthenticationManager getAuthenticationManager();
+    IAuthenticationManagerRegistry getAuthenticationManagerRegistry();
 
     VirtualHostRegistry getVirtualHostRegistry();
 
@@ -85,15 +97,39 @@ public interface IApplicationRegistry ex
 
     QMFService getQMFService();
 
-    void setBroker(BrokerConfig broker);
+    void setBrokerConfig(BrokerConfig broker);
 
-    BrokerConfig getBroker();
+    BrokerConfig getBrokerConfig();
+
+    Broker getBroker();
 
     VirtualHost createVirtualHost(VirtualHostConfiguration vhostConfig) throws Exception;
 
     ConfigStore getConfigStore();
 
     void setConfigStore(ConfigStore store);
-    
+
     void initialiseStatisticsReporting();
+
+    Map<InetSocketAddress, QpidAcceptor> getAcceptors();
+
+    void addPortBindingListener(PortBindingListener listener);
+
+    boolean useHTTPManagement();
+
+    int getHTTPManagementPort();
+
+    boolean useHTTPSManagement();
+
+    int getHTTPSManagementPort();
+
+    void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener);
+
+    public interface PortBindingListener
+    {
+        public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress);
+        public void unbound(QpidAcceptor acceptor);
+
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java Fri Aug  3 12:13:32 2012
@@ -198,7 +198,7 @@ public abstract class AbstractPasswordFi
         try
         {
             _userUpdate.lock();
-            _userMap.clear();
+            final Map<String, U> newUserMap = new HashMap<String, U>();
 
             BufferedReader reader = null;
             try
@@ -216,7 +216,7 @@ public abstract class AbstractPasswordFi
 
                     U user = createUserFromFileData(result);
                     getLogger().info("Created user:" + user);
-                    _userMap.put(user.getName(), user);
+                    newUserMap.put(user.getName(), user);
                 }
             }
             finally
@@ -226,6 +226,9 @@ public abstract class AbstractPasswordFi
                     reader.close();
                 }
             }
+
+            _userMap.clear();
+            _userMap.putAll(newUserMap);
         }
         finally
         {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Fri Aug  3 12:13:32 2012
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,25 +20,24 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
+import java.security.Principal;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
 /**
  * Implementations of the AuthenticationManager are responsible for determining
  * the authenticity of a user's credentials.
- * 
+ *
  * If the authentication is successful, the manager is responsible for producing a populated
  * {@link javax.security.auth.Subject} containing the user's identity and zero or more principals representing
  * groups to which the user belongs.
  * <p>
  * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
  * the manager.  The {@link #close()} method must reverse this registration.
- * 
+ *
  */
 public interface AuthenticationManager extends Closeable, Plugin
 {
@@ -64,11 +63,11 @@ public interface AuthenticationManager e
      *
      * @param mechanism mechanism name
      * @param localFQDN domain name
-     *
+     * @param externalPrincipal externally authenticated Principal
      * @return SASL server
      * @throws SaslException
      */
-    SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+    SaslServer createSaslServer(String mechanism, String localFQDN, Principal externalPrincipal) throws SaslException;
 
     /**
      * Authenticates a user using SASL negotiation.
@@ -90,5 +89,4 @@ public interface AuthenticationManager e
      */
     AuthenticationResult authenticate(String username, String password);
 
-    CallbackHandler getHandler(String mechanism);
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Fri Aug  3 12:13:32 2012
@@ -14,12 +14,13 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.security.auth.manager;
 
+import java.security.Principal;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -31,7 +32,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
 import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
 import org.apache.qpid.server.security.auth.sasl.JCAProvider;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
@@ -60,9 +60,9 @@ import java.util.TreeMap;
  * Concrete implementation of the AuthenticationManager that determines if supplied
  * user credentials match those appearing in a PrincipalDatabase.   The implementation
  * of the PrincipalDatabase is determined from the configuration.
- * 
+ *
  * This implementation also registers the JMX UserManagemement MBean.
- * 
+ *
  * This plugin expects configuration such as:
  *
  * <pre>
@@ -97,13 +97,14 @@ public class PrincipalDatabaseAuthentica
 
     private PrincipalDatabase _principalDatabase = null;
 
-    private AMQUserManagementMBean _mbean = null;
-
     public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
     {
         public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
         {
-            final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+            final PrincipalDatabaseAuthenticationManagerConfiguration configuration =
+                    config == null
+                            ? null
+                            : (PrincipalDatabaseAuthenticationManagerConfiguration) config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
 
             // If there is no configuration for this plugin then don't load it.
             if (configuration == null)
@@ -130,7 +131,7 @@ public class PrincipalDatabaseAuthentica
     };
 
     public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
- 
+
         public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
         {
             public List<String> getParentPaths()
@@ -141,7 +142,7 @@ public class PrincipalDatabaseAuthentica
             public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
             {
                 final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
-                
+
                 instance.setConfiguration(path, config);
                 return instance;
             }
@@ -157,16 +158,16 @@ public class PrincipalDatabaseAuthentica
         public void validateConfiguration() throws ConfigurationException
         {
         }
-  
+
         public String getPrincipalDatabaseClass()
         {
             return getConfig().getString("principal-database.class");
         }
-  
+
         public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
         {
-            final List<String> argumentNames = getConfig().getList("principal-database.attributes.attribute.name");
-            final List<String> argumentValues = getConfig().getList("principal-database.attributes.attribute.value");
+            final List<String> argumentNames = (List) getConfig().getList("principal-database.attributes.attribute.name");
+            final List<String> argumentValues = (List) getConfig().getList("principal-database.attributes.attribute.value");
             final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
 
             for (int i = 0; i < argumentNames.size(); i++)
@@ -181,7 +182,7 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
-    protected PrincipalDatabaseAuthenticationManager()  
+    protected PrincipalDatabaseAuthenticationManager()
     {
     }
 
@@ -207,11 +208,9 @@ public class PrincipalDatabaseAuthentica
         {
             _logger.warn("No additional SASL providers registered.");
         }
-
-        registerManagement();
     }
 
-    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) 
+    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
     {
         if (database == null || database.getMechanisms().size() == 0)
         {
@@ -259,7 +258,7 @@ public class PrincipalDatabaseAuthentica
 
         _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
 
-        configPrincipalDatabase(_principalDatabase, pdamConfig);        
+        configPrincipalDatabase(_principalDatabase, pdamConfig);
     }
 
     public String getMechanisms()
@@ -267,7 +266,7 @@ public class PrincipalDatabaseAuthentica
         return _mechanisms;
     }
 
-    public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+    public SaslServer createSaslServer(String mechanism, String localFQDN, Principal externalPrincipal) throws SaslException
     {
         return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
                                      _callbackHandlerMap.get(mechanism));
@@ -300,11 +299,6 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
-    public CallbackHandler getHandler(String mechanism)
-    {
-        return _callbackHandlerMap.get(mechanism);
-    }
-
     /**
      * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
      */
@@ -333,8 +327,6 @@ public class PrincipalDatabaseAuthentica
     {
         _mechanisms = null;
         Security.removeProvider(PROVIDER_NAME);
-
-        unregisterManagement();
     }
 
     private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
@@ -408,6 +400,11 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
+    public PrincipalDatabase getPrincipalDatabase()
+    {
+        return _principalDatabase;
+    }
+
     private String generateSetterName(String argName) throws ConfigurationException
     {
         if ((argName == null) || (argName.length() == 0))
@@ -428,41 +425,4 @@ public class PrincipalDatabaseAuthentica
     {
         _principalDatabase = principalDatabase;
     }
-
-    protected void registerManagement()
-    {
-        try
-        {
-            _logger.info("Registering UserManagementMBean");
-
-            _mbean = new AMQUserManagementMBean();
-            _mbean.setPrincipalDatabase(_principalDatabase);
-            _mbean.register();
-        }
-        catch (Exception e)
-        {
-            _logger.warn("User management disabled as unable to create MBean:", e);
-            _mbean = null;
-        }
-    }
-
-    protected void unregisterManagement()
-    {
-        try
-        {
-            if (_mbean != null)
-            {
-                _logger.info("Unregistering UserManagementMBean");
-                _mbean.unregister();
-            }
-        }
-        catch (Exception e)
-        {
-            _logger.warn("Failed to unregister User management MBean:", e);
-        }
-        finally
-        {
-            _mbean = null;
-        }
-    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java Fri Aug  3 12:13:32 2012
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.security.auth.rmi;
 
+import java.net.SocketAddress;
+
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -37,11 +40,13 @@ public class RMIPasswordAuthenticator im
     static final String INVALID_CREDENTIALS = "Invalid user details supplied";
     static final String CREDENTIALS_REQUIRED = "User details are required. " +
     		            "Please ensure you are using an up to date management console to connect.";
-    
+
     private AuthenticationManager _authenticationManager = null;
+    private SocketAddress _socketAddress;
 
-    public RMIPasswordAuthenticator()
+    public RMIPasswordAuthenticator(SocketAddress socketAddress)
     {
+        _socketAddress = socketAddress;
     }
 
     public void setAuthenticationManager(final AuthenticationManager authenticationManager)
@@ -79,11 +84,25 @@ public class RMIPasswordAuthenticator im
         {
             throw new SecurityException(SHOULD_BE_NON_NULL);
         }
-        
+
         // Verify that an AuthenticationManager has been set.
         if (_authenticationManager == null)
         {
-            throw new SecurityException(UNABLE_TO_LOOKUP);
+            try
+            {
+                if(ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress) != null)
+                {
+                    _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress);
+                }
+                else
+                {
+                    throw new SecurityException(UNABLE_TO_LOOKUP);
+                }
+            }
+            catch(IllegalStateException e)
+            {
+                throw new SecurityException(UNABLE_TO_LOOKUP);
+            }
         }
         final AuthenticationResult result = _authenticationManager.authenticate(username, password);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org