You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [23/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Thu Oct 20 18:42:46 2011
@@ -20,25 +20,26 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.server.configuration.*;
 import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 
-public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10  extends InputHandler implements ServerProtocolEngine, ConnectionConfig
 {
     public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
 
-    private NetworkDriver _networkDriver;
+    private NetworkConnection _network;
     private long _readBytes;
     private long _writtenBytes;
     private ServerConnection _connection;
@@ -47,26 +48,22 @@ public class ProtocolEngine_0_10  extend
     private long _createTime = System.currentTimeMillis();
 
     public ProtocolEngine_0_10(ServerConnection conn,
-                               NetworkDriver networkDriver,
+                               NetworkConnection network,
                                final IApplicationRegistry appRegistry)
     {
         super(new Assembler(conn));
         _connection = conn;
         _connection.setConnectionConfig(this);
-        _networkDriver = networkDriver;
+
         _id = appRegistry.getConfigStore().createId();
         _appRegistry = appRegistry;
 
-        // FIXME Two log messages to maintain compatinbility with earlier protocol versions
-        _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
-        _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
-    }
+        if(network != null)
+        {
+            setNetworkConnection(network);
+        }
+
 
-    public void setNetworkDriver(NetworkDriver driver)
-    {
-        _networkDriver = driver;
-        Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
-        _connection.setSender(dis);
         _connection.onOpen(new Runnable()
         {
             public void run()
@@ -77,14 +74,30 @@ public class ProtocolEngine_0_10  extend
 
     }
 
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        setNetworkConnection(network, network.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
+        _network = network;
+
+        _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+
+        // FIXME Two log messages to maintain compatibility with earlier protocol versions
+        _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+        _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+    }
+
     public SocketAddress getRemoteAddress()
     {
-        return _networkDriver.getRemoteAddress();
+        return _network.getRemoteAddress();
     }
 
     public SocketAddress getLocalAddress()
     {
-        return _networkDriver.getLocalAddress();
+        return _network.getLocalAddress();
     }
 
     public long getReadBytes()
@@ -134,7 +147,7 @@ public class ProtocolEngine_0_10  extend
 
     public String getAuthId()
     {
-        return _connection.getAuthorizationID();
+        return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
     }
 
     public String getRemoteProcessName()
@@ -193,9 +206,14 @@ public class ProtocolEngine_0_10  extend
     {
         return false;
     }
-    
+
     public void mgmtClose()
     {
         _connection.mgmtClose();
     }
+
+    public long getConnectionId()
+    {
+        return _connection.getConnectionId();
+    }
 }

Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -3,4 +3,5 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1072051-1185907

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Thu Oct 20 18:42:46 2011
@@ -60,7 +60,7 @@ public class AMQPriorityQueue extends Si
     {
         // check that all subscriptions are not in advance of the entry
         SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
-        while(subIter.advance() && !entry.isAcquired())
+        while(subIter.advance() && entry.isAvailable())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
             if(!subscription.isClosed())
@@ -70,7 +70,7 @@ public class AMQPriorityQueue extends Si
                 {
                     QueueEntry subnode = context._lastSeenEntry;
                     QueueEntry released = context._releasedEntry;
-                    while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+                    while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
                         {

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Oct 20 18:42:46 2011
@@ -21,21 +21,18 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfig;
-import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -72,8 +69,8 @@ public interface AMQQueue extends Managa
     boolean isAutoDelete();
 
     AMQShortString getOwner();
-    PrincipalHolder getPrincipalHolder();
-    void setPrincipalHolder(PrincipalHolder principalHolder);
+    AuthorizationHolder getAuthorizationHolder();
+    void setAuthorizationHolder(AuthorizationHolder principalHolder);
 
     void setExclusiveOwningSession(AMQSessionModel owner);
     AMQSessionModel getExclusiveOwningSession();
@@ -108,23 +105,16 @@ public interface AMQQueue extends Managa
 
     boolean isDeleted();
 
-
     int delete() throws AMQException;
 
-
     void requeue(QueueEntry entry);
 
-    void requeue(QueueEntryImpl storeContext, Subscription subscription);
-
     void dequeue(QueueEntry entry, Subscription sub);
 
     void decrementUnackedMsgCount();
 
-
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
-
-
     void addQueueDeleteTask(final Task task);
     void removeQueueDeleteTask(final Task task);
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Oct 20 18:42:46 2011
@@ -43,6 +43,7 @@ import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.Notification;
+import javax.management.ObjectName;
 import javax.management.OperationsException;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.ArrayType;
@@ -97,7 +98,7 @@ public class AMQQueueMBean extends AMQMa
     {
         super(ManagedQueue.class, ManagedQueue.TYPE);
         _queue = queue;
-        _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
+        _queueName = queue.getName();
     }
 
     public ManagedObject getParentObject()
@@ -147,7 +148,7 @@ public class AMQQueueMBean extends AMQMa
 
     public String getObjectInstanceName()
     {
-        return _queueName;
+        return ObjectName.quote(_queueName);
     }
 
     public String getName()
@@ -506,7 +507,7 @@ public class AMQQueueMBean extends AMQMa
     private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
     {
         List<String> list = new ArrayList<String>();
-        BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+        BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
         list.add("reply-to = " + headerProperties.getReplyToAsString());
         list.add("propertyFlags = " + headerProperties.getPropertyFlags());
         list.add("ApplicationID = " + headerProperties.getAppIdAsString());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Thu Oct 20 18:42:46 2011
@@ -96,9 +96,9 @@ public class IncomingMessage implements 
     public void setExpiration()
     {
             long expiration =
-                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+                    ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
             long timestamp =
-                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+                    ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
 
             if (SYNCHED_CLOCKS)
             {
@@ -139,7 +139,7 @@ public class IncomingMessage implements 
     public int addContentBodyFrame(final ContentChunk contentChunk)
             throws AMQException
     {
-        _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
+        _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
 
@@ -193,8 +193,8 @@ public class IncomingMessage implements 
 
     public boolean isPersistent()
     {
-        return getContentHeader().properties instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
+        return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
                                                              BasicContentHeaderProperties.PERSISTENT;
     }
 
@@ -263,7 +263,7 @@ public class IncomingMessage implements 
         int written = 0;
         for(ContentChunk cb : _contentChunks)
         {
-            ByteBuffer data = cb.getData().buf();
+            ByteBuffer data = ByteBuffer.wrap(cb.getData());
             if(offset+written >= pos && offset < pos + data.limit())
             {
                 ByteBuffer src = data.duplicate();

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Oct 20 18:42:46 2011
@@ -52,6 +52,17 @@ public interface QueueEntry extends Comp
         }
 
         public abstract State getState();
+
+        /**
+         * Returns true if state is either DEQUEUED or DELETED.
+         *
+         * @return true if state is either DEQUEUED or DELETED.
+         */
+        public boolean isDispensed()
+        {
+            State currentState = getState();
+            return currentState == State.DEQUEUED || currentState == State.DELETED;
+        }
     }
 
 
@@ -191,11 +202,7 @@ public interface QueueEntry extends Comp
 
     void reject();
 
-    void reject(Subscription subscription);
-
-    boolean isRejectedBy(Subscription subscription);
-
-    void requeue(Subscription subscription);
+    boolean isRejectedBy(long subscriptionId);
 
     void dequeue();
 
@@ -209,4 +216,18 @@ public interface QueueEntry extends Comp
 
     void addStateChangeListener(StateChangeListener listener);
     boolean removeStateChangeListener(StateChangeListener listener);
+
+    /**
+     * Returns true if entry is in DEQUEUED state, otherwise returns false.
+     *
+     * @return true if entry is in DEQUEUED state, otherwise returns false
+     */
+    boolean isDequeued();
+
+    /**
+     * Returns true if entry is in either DEQUEUED or DELETED state.
+     *
+     * @return true if entry is in either DEQUEUED or DELETED state
+     */
+    boolean isDispensed();
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Oct 20 18:42:46 2011
@@ -51,7 +51,7 @@ public class QueueEntryImpl implements Q
 
     private MessageReference _message;
 
-    private Set<Subscription> _rejectedBy = null;
+    private Set<Long> _rejectedBy = null;
 
     private volatile EntryState _state = AVAILABLE_STATE;
 
@@ -325,19 +325,16 @@ public class QueueEntryImpl implements Q
 
     public void reject()
     {
-        reject(getDeliveredSubscription());
-    }
+        Subscription subscription = getDeliveredSubscription();
 
-    public void reject(Subscription subscription)
-    {
         if (subscription != null)
         {
             if (_rejectedBy == null)
             {
-                _rejectedBy = new HashSet<Subscription>();
+                _rejectedBy = new HashSet<Long>();
             }
 
-            _rejectedBy.add(subscription);
+            _rejectedBy.add(subscription.getSubscriptionID());
         }
         else
         {
@@ -345,12 +342,12 @@ public class QueueEntryImpl implements Q
         }
     }
 
-    public boolean isRejectedBy(Subscription subscription)
+    public boolean isRejectedBy(long subscriptionId)
     {
 
         if (_rejectedBy != null) // We have subscriptions that rejected this message
         {
-            return _rejectedBy.contains(subscription);
+            return _rejectedBy.contains(subscriptionId);
         }
         else // This messasge hasn't been rejected yet.
         {
@@ -358,15 +355,6 @@ public class QueueEntryImpl implements Q
         }
     }
 
-    public void requeue(Subscription subscription)
-    {
-        getQueue().requeue(this, subscription);
-        if(_stateChangeListeners != null)
-        {
-            notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
-        }
-    }
-
     public void dequeue()
     {
         EntryState state = _state;
@@ -508,7 +496,7 @@ public class QueueEntryImpl implements Q
     {
 
         QueueEntryImpl next = nextNode();
-        while(next != null && next.isDeleted())
+        while(next != null && next.isDispensed() )
         {
 
             final QueueEntryImpl newNext = next.nextNode();
@@ -556,4 +544,14 @@ public class QueueEntryImpl implements Q
         return _queueEntryList;
     }
 
+    public boolean isDequeued()
+    {
+        return _state == DEQUEUED_STATE;
+    }
+
+    public boolean isDispensed()
+    {
+        return _state.isDispensed();
+    }
+
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Oct 20 18:42:46 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
     /** null means shared */
     private final AMQShortString _owner;
 
-    private PrincipalHolder _prinicpalHolder;
+    private AuthorizationHolder _authorizationHolder;
 
     private boolean _exclusive = false;
     private AMQSessionModel _exclusiveOwner;
@@ -102,9 +102,7 @@ public class SimpleAMQQueue implements A
 
     protected final QueueEntryList _entries;
 
-    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-
-    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+    protected final SubscriptionList _subscriptionList = new SubscriptionList();
 
     private volatile Subscription _exclusiveSubscriber;
 
@@ -373,14 +371,14 @@ public class SimpleAMQQueue implements A
         return _owner;
     }
 
-    public PrincipalHolder getPrincipalHolder()
+    public AuthorizationHolder getAuthorizationHolder()
     {
-        return _prinicpalHolder;
+        return _authorizationHolder;
     }
 
-    public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+    public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
     {
-        _prinicpalHolder = prinicpalHolder;
+        _authorizationHolder = authorizationHolder;
     }
 
 
@@ -602,25 +600,25 @@ public class SimpleAMQQueue implements A
             iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
              */
-            SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
-            SubscriptionList.SubscriptionNode nextNode = node.getNext();
+            SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
+            SubscriptionList.SubscriptionNode nextNode = node.findNext();
             if (nextNode == null)
             {
-                nextNode = _subscriptionList.getHead().getNext();
+                nextNode = _subscriptionList.getHead().findNext();
             }
             while (nextNode != null)
             {
-                if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+                if (_subscriptionList.updateMarkedNode(node, nextNode))
                 {
                     break;
                 }
                 else
                 {
-                    node = _lastSubscriptionNode.get();
-                    nextNode = node.getNext();
+                    node = _subscriptionList.getMarkedNode();
+                    nextNode = node.findNext();
                     if (nextNode == null)
                     {
-                        nextNode = _subscriptionList.getHead().getNext();
+                        nextNode = _subscriptionList.getHead().findNext();
                     }
                 }
             }
@@ -629,7 +627,7 @@ public class SimpleAMQQueue implements A
             // this catches the case where we *just* miss an update
             int loops = 2;
 
-            while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+            while (entry.isAvailable() && loops != 0)
             {
                 if (nextNode == null)
                 {
@@ -642,13 +640,13 @@ public class SimpleAMQQueue implements A
                     Subscription sub = nextNode.getSubscription();
                     deliverToSubscription(sub, entry);
                 }
-                nextNode = nextNode.getNext();
+                nextNode = nextNode.findNext();
 
             }
         }
 
 
-        if (!(entry.isAcquired() || entry.isDeleted()))
+        if (entry.isAvailable())
         {
             checkSubscriptionsNotAheadOfDelivery(entry);
 
@@ -805,24 +803,6 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void requeue(QueueEntryImpl entry, Subscription subscription)
-    {
-        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
-        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while (subscriberIter.advance())
-        {
-            Subscription sub = subscriberIter.getNode().getSubscription();
-
-            // we don't make browsers send the same stuff twice
-            if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
-            {
-                updateSubRequeueEntry(sub, entry);
-            }
-        }
-
-        deliverAsync();
-    }
-
     public void dequeue(QueueEntry entry, Subscription sub)
     {
         decrementQueueCount();
@@ -960,7 +940,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (node != null && !node.isDeleted())
+            if (node != null && !node.isDispensed())
             {
                 entryList.add(node);
             }
@@ -1064,7 +1044,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance() && !filter.filterComplete())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && filter.accept(node))
+            if (!node.isDispensed() && filter.accept(node))
             {
                 entryList.add(node);
             }
@@ -1258,7 +1238,6 @@ public class SimpleAMQQueue implements A
 
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId)
-                    && !node.isDeleted()
                     && node.acquire())
                 {
                     dequeueEntry(node);
@@ -1288,7 +1267,7 @@ public class SimpleAMQQueue implements A
         while (noDeletes && queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && node.acquire())
+            if (node.acquire())
             {
                 dequeueEntry(node);
                 noDeletes = false;
@@ -1318,7 +1297,7 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            if (!node.isDeleted() && node.acquire())
+            if (node.acquire())
             {
                 dequeueEntry(node, txn);
                 if(++count == request)
@@ -1585,7 +1564,7 @@ public class SimpleAMQQueue implements A
 
     public void deliverAsync()
     {
-        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+        QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1604,52 +1583,6 @@ public class SimpleAMQQueue implements A
         _asyncDelivery.execute(flusher);
     }
 
-
-    private class Runner implements ReadWriteRunnable
-    {
-        String _name;
-        public Runner(long count)
-        {
-            _name = "QueueRunner-" + count + "-" + _logActor;
-        }
-
-        public void run()
-        {
-            String originalName = Thread.currentThread().getName();
-            try
-            {
-                Thread.currentThread().setName(_name);
-                CurrentActor.set(_logActor);
-
-                processQueue(this);
-            }
-            catch (AMQException e)
-            {
-                _logger.error(e);
-            }
-            finally
-            {
-                CurrentActor.remove();
-                Thread.currentThread().setName(originalName);
-            }
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-
-        public String toString()
-        {
-            return _name;
-        }
-    }
-
     public void flushSubscription(Subscription sub) throws AMQException
     {
         // Access control
@@ -1718,7 +1651,7 @@ public class SimpleAMQQueue implements A
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (node != null && !(node.isAcquired() || node.isDeleted()))
+            if (node != null && node.isAvailable())
             {
                 if (sub.hasInterest(node))
                 {
@@ -1779,7 +1712,7 @@ public class SimpleAMQQueue implements A
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
-            while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
+            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
             {
                 if (expired)
                 {
@@ -1808,14 +1741,40 @@ public class SimpleAMQQueue implements A
     }
 
 
-    private void processQueue(Runnable runner) throws AMQException
+    /**
+     * Used by queue Runners to asynchronously deliver messages to consumers.
+     *
+     * A queue Runner is started whenever a state change occurs, e.g. when a new
+     * message arrives on the queue and cannot be immediately delivered to a
+     * subscription (i.e. asynchronous delivery is required). Unless there are
+     * SubFlushRunners operating (due to subscriptions unsuspending) which are
+     * capable of accepting/delivering all messages then these messages would
+     * otherwise remain on the queue.
+     *
+     * processQueue should be running while there are messages on the queue AND
+     * there are subscriptions that can deliver them. If there are no
+     * subscriptions capable of delivering the remaining messages on the queue
+     * then processQueue should stop to prevent spinning.
+     *
+     * Since processQueue runs in a fixed size Executor, it should not run
+     * indefinitely to prevent starving other tasks of CPU (e.g. jobs to process
+     * incoming messages may not be able to be scheduled in the thread pool
+     * because all threads are working on clearing down large queues). To solve
+     * this problem, after an arbitrary number of message deliveries the
+     * processQueue job stops iterating, resubmits itself to the executor, and
+     * ends the current instance.
+     *
+     * @param runner the Runner to schedule
+     * @throws AMQException
+     */
+    public void processQueue(QueueRunner runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
-        int extraLoops = 1;
-        long iterations = MAX_ASYNC_DELIVERIES;
+        boolean lastLoop = false;
+        int iterations = MAX_ASYNC_DELIVERIES;
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1832,12 +1791,14 @@ public class SimpleAMQQueue implements A
 
             if (previousStateChangeCount != stateChangeCount)
             {
-                extraLoops = 1;
+                //further asynchronous delivery is required since the
+                //previous loop. keep going if iteration slicing allows.
+                lastLoop = false;
             }
 
             previousStateChangeCount = stateChangeCount;
-            deliveryIncomplete = _subscriptionList.size() != 0;
-            boolean done;
+            boolean allSubscriptionsDone = true;
+            boolean subscriptionDone;
 
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
@@ -1847,30 +1808,25 @@ public class SimpleAMQQueue implements A
                 sub.getSendLock();
                 try
                 {
-
-                    done = attemptDelivery(sub);
-
-                    if (done)
+                    //attempt delivery. returns true if no further delivery currently possible to this sub
+                    subscriptionDone = attemptDelivery(sub);
+                    if (subscriptionDone)
                     {
-                        if (extraLoops == 0)
+                        //close autoClose subscriptions if we are not currently intent on continuing
+                        if (lastLoop && sub.isAutoClose())
                         {
-                            deliveryIncomplete = false;
-                            if (sub.isAutoClose())
-                            {
-                                unregisterSubscription(sub);
+                            unregisterSubscription(sub);
 
-                                sub.confirmAutoClose();
-                            }
-                        }
-                        else
-                        {
-                            extraLoops--;
+                            sub.confirmAutoClose();
                         }
                     }
                     else
                     {
+                        //this subscription can accept additional deliveries, so we must 
+                        //keep going after this (if iteration slicing allows it)
+                        allSubscriptionsDone = false;
+                        lastLoop = false;
                         iterations--;
-                        extraLoops = 1;
                     }
                 }
                 finally
@@ -1878,10 +1834,34 @@ public class SimpleAMQQueue implements A
                     sub.releaseSendLock();
                 }
             }
+
+            if(allSubscriptionsDone && lastLoop)
+            {
+                //We have done an extra loop already and there are again
+                //no further delivery attempts possible, only
+                //keep going if state change demands it.
+                deliveryIncomplete = false;
+            }
+            else if(allSubscriptionsDone)
+            {
+                //All subscriptions reported being done, but we have to do
+                //an extra loop if the iterations are not exhausted and
+                //there is still any work to be done
+                deliveryIncomplete = _subscriptionList.size() != 0;
+                lastLoop = true;
+            }
+            else
+            {
+                //some subscriptions can still accept more messages,
+                //keep going if iteration count allows.
+                lastLoop = false;
+                deliveryIncomplete = true;
+            }
+
             _asynchronousRunner.set(null);
         }
 
-        // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1901,8 +1881,8 @@ public class SimpleAMQQueue implements A
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
-            // Only process nodes that are not currently deleted
-            if (!node.isDeleted())
+            // Only process nodes that are not currently deleted and not dequeued
+            if (!node.isDispensed())
             {
                 // If the node has exired then aquire it
                 if (node.expired() && node.acquire())
@@ -2242,4 +2222,9 @@ public class SimpleAMQQueue implements A
             }
         }
     }
+
+    public LogActor getLogActor()
+    {
+        return _logActor;
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Thu Oct 20 18:42:46 2011
@@ -1,6 +1,5 @@
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -156,7 +155,7 @@ public class SimpleQueueEntryList implem
             if(!atTail())
             {
                 QueueEntryImpl nextNode = _lastNode.nextNode();
-                while(nextNode.isDeleted() && nextNode.nextNode() != null)
+                while(nextNode.isDispensed() && nextNode.nextNode() != null)
                 {
                     nextNode = nextNode.nextNode();
                 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Oct 20 18:42:46 2011
@@ -21,9 +21,14 @@
 package org.apache.qpid.server.registry;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -41,23 +46,27 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
 import org.apache.qpid.server.logging.Log4jMessageLogger;
 import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.AbstractRootMessageLogger;
 import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.osgi.framework.BundleContext;
+
 
 /**
  * An abstract application registry that provides access to configuration information and handles the
@@ -69,12 +78,10 @@ public abstract class ApplicationRegistr
 {
     protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
 
-    private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
+    private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
 
     protected final ServerConfiguration _configuration;
 
-    public static final int DEFAULT_INSTANCE = 1;
-
     protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
 
     protected ManagedObjectRegistry _managedObjectRegistry;
@@ -85,8 +92,6 @@ public abstract class ApplicationRegistr
 
     protected SecurityManager _securityManager;
 
-    protected PrincipalDatabaseManager _databaseManager;
-
     protected PluginManager _pluginManager;
 
     protected ConfigurationManager _configurationManager;
@@ -102,8 +107,12 @@ public abstract class ApplicationRegistr
     private BrokerConfig _broker;
 
     private ConfigStore _configStore;
+    
+    private Timer _reportingTimer;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
-    protected String _registryName;
+    private BundleContext _bundleContext;
 
     static
     {
@@ -114,53 +123,54 @@ public abstract class ApplicationRegistr
     {
         public void run()
         {
-            removeAll();
+            remove();
         }
     }
 
     public static void initialise(IApplicationRegistry instance) throws Exception
     {
-        initialise(instance, DEFAULT_INSTANCE);
-    }
+        if(instance == null)
+        {
+            throw new IllegalArgumentException("ApplicationRegistry instance must not be null");
+        }
 
-    @SuppressWarnings("finally")
-    public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception
-    {
-        if (instance != null)
+        if(!_instance.compareAndSet(null, instance))
         {
-            _logger.info("Initialising Application Registry(" + instance + "):" + instanceID);
-            _instanceMap.put(instanceID, instance);
+            throw new IllegalStateException("An ApplicationRegistry is already initialised");
+        }
+
+        _logger.info("Initialising Application Registry(" + instance + ")");
+
+
+        final ConfigStore store = ConfigStore.newInstance();
+        store.setRoot(new SystemConfigImpl(store));
+        instance.setConfigStore(store);
 
-            final ConfigStore store = ConfigStore.newInstance();
-            store.setRoot(new SystemConfigImpl(store));
-            instance.setConfigStore(store);
+        BrokerConfig broker = new BrokerConfigAdapter(instance);
 
-            BrokerConfig broker = new BrokerConfigAdapter(instance);
+        SystemConfig system = (SystemConfig) store.getRoot();
+        system.addBroker(broker);
+        instance.setBroker(broker);
 
-            SystemConfig system = (SystemConfig) store.getRoot();
-            system.addBroker(broker);
-            instance.setBroker(broker);
+        try
+        {
+            instance.initialise();
+        }
+        catch (Exception e)
+        {
+            _instance.set(null);
 
+            //remove the Broker instance, then re-throw
             try
             {
-                instance.initialise(instanceID);
+                system.removeBroker(broker);
             }
-            catch (Exception e)
+            catch(Throwable t)
             {
-                _instanceMap.remove(instanceID);
-                try
-                {
-                    system.removeBroker(broker);
-                }
-                finally
-                {
-                    throw e;
-                }
+                //ignore
             }
-        }
-        else
-        {
-            remove(instanceID);
+
+            throw e;
         }
     }
 
@@ -176,35 +186,19 @@ public abstract class ApplicationRegistr
 
     public static boolean isConfigured()
     {
-        return isConfigured(DEFAULT_INSTANCE);
-    }
-
-    public static boolean isConfigured(int instanceID)
-    {
-        return _instanceMap.containsKey(instanceID);
+        return _instance.get() != null;
     }
 
-    /** Method to cleanly shutdown the default registry running in this JVM */
     public static void remove()
     {
-        remove(DEFAULT_INSTANCE);
-    }
-
-    /**
-     * Method to cleanly shutdown specified registry running in this JVM
-     *
-     * @param instanceID the instance to shutdown
-     */
-    public static void remove(int instanceID)
-    {
+        IApplicationRegistry instance = _instance.getAndSet(null);
         try
         {
-            IApplicationRegistry instance = _instanceMap.get(instanceID);
             if (instance != null)
             {
                 if (_logger.isInfoEnabled())
                 {
-                    _logger.info("Shutting down ApplicationRegistry(" + instanceID + "):" + instance);
+                    _logger.info("Shutting down ApplicationRegistry(" + instance + ")");
                 }
                 instance.close();
                 instance.getBroker().getSystem().removeBroker(instance.getBroker());
@@ -212,27 +206,19 @@ public abstract class ApplicationRegistr
         }
         catch (Exception e)
         {
-            _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e);
-        }
-        finally
-        {
-            _instanceMap.remove(instanceID);
+            _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e);
         }
     }
 
-    /** Method to cleanly shutdown all registries currently running in this JVM */
-    public static void removeAll()
+    protected ApplicationRegistry(ServerConfiguration configuration)
     {
-        Object[] keys = _instanceMap.keySet().toArray();
-        for (Object k : keys)
-        {
-            remove((Integer) k);
-        }
+        this(configuration, null);
     }
 
-    protected ApplicationRegistry(ServerConfiguration configuration)
+    protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext)
     {
         _configuration = configuration;
+        _bundleContext = bundleContext;
     }
 
     public void configure() throws ConfigurationException
@@ -241,7 +227,7 @@ public abstract class ApplicationRegistr
 
         try
         {
-            _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory());
+            _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext);
         }
         catch (Exception e)
         {
@@ -251,11 +237,10 @@ public abstract class ApplicationRegistr
         _configuration.initialise();
     }
 
-    public void initialise(int instanceID) throws Exception
+    public void initialise() throws Exception
     {
         //Create the RootLogger to be used during broker operation
         _rootMessageLogger = new Log4jMessageLogger(_configuration);
-        _registryName = String.valueOf(instanceID);
 
         //Create the composite (log4j+SystemOut MessageLogger to be used during startup
         RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
@@ -277,11 +262,7 @@ public abstract class ApplicationRegistr
 
             _securityManager = new SecurityManager(_configuration, _pluginManager);
 
-            createDatabaseManager(_configuration);
-
-            _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
-
-            _databaseManager.initialiseManagement(_configuration);
+            _authenticationManager = createAuthenticationManager();
 
             _managedObjectRegistry.start();
         }
@@ -294,6 +275,8 @@ public abstract class ApplicationRegistr
         try
         {
             initialiseVirtualHosts();
+            initialiseStatistics();
+            initialiseStatisticsReporting();
         }
         finally
         {
@@ -302,9 +285,51 @@ public abstract class ApplicationRegistr
         }
     }
 
-    protected void createDatabaseManager(ServerConfiguration configuration) throws Exception
+    /**
+     * Iterates across all discovered authentication manager factories, offering the security configuration to each.
+     * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
+     * 
+     * It is an error to configure more than one authentication manager, or to configure none.
+     *
+     * @return authentication manager
+     * @throws ConfigurationException
+     */
+    protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
     {
-        _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
+        final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
+        final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
+        
+        if (factories.size() == 0)
+        {
+            throw new ConfigurationException("No authentication manager factory plugins found.  Check the desired authentication" +
+                    "manager plugin has been placed in the plugins directory.");
+        }
+        
+        AuthenticationManager authMgr = null;
+        
+        for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
+        {
+            final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
+            final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
+            if (tmp != null)
+            {
+                if (authMgr != null)
+                {
+                    throw new ConfigurationException("Cannot configure more than one authentication manager."
+                            + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
+                            + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
+                            + " from the classpath.");
+                }
+                authMgr = tmp;
+            }
+        }
+
+        if (authMgr == null)
+        {
+            throw new ConfigurationException("No authentication managers configured within the configure file.");
+        }
+        
+        return authMgr;
     }
 
     protected void initialiseVirtualHosts() throws Exception
@@ -320,26 +345,88 @@ public abstract class ApplicationRegistr
     {
         _managedObjectRegistry = new NoopManagedObjectRegistry();
     }
-
-    public static IApplicationRegistry getInstance()
+    
+    public void initialiseStatisticsReporting()
     {
-        return getInstance(DEFAULT_INSTANCE);
+        long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+        final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+        final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+        final boolean reset = _configuration.isStatisticsReportResetEnabled();
+        
+        /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+        if (report > 0L && (broker || virtualhost))
+        {
+            _reportingTimer = new Timer("Statistics-Reporting", true);
+            
+            class StatisticsReportingTask extends TimerTask
+            {
+                private final int DELIVERED = 0;
+                private final int RECEIVED = 1;
+                
+                public void run()
+                {
+                    CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+                        public String getLogMessage()
+                        {
+                            return "[" + Thread.currentThread().getName() + "] ";
+                        }
+                    });
+                    
+                    if (broker)
+                    {
+                        CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+                        CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+                    }
+                    
+                    if (virtualhost)
+                    {
+                        for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+                        {
+                            String name = vhost.getName();
+                            StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+                            StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+                            StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+                            StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+                            
+                            CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+                            CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+                        }
+                    }
+                    
+                    if (reset)
+                    {
+                        resetStatistics();
+                    }
+
+                    CurrentActor.remove();
+                }
+            }
+
+            _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+                                                report / 2,
+                                                report);
+        }
     }
 
-    public static IApplicationRegistry getInstance(int instanceID)
+    /**
+     * Get the ApplicationRegistry
+     * @return the IApplicationRegistry instance
+     * @throws IllegalStateException if no registry instance has been initialised.
+     */
+    public static IApplicationRegistry getInstance() throws IllegalStateException
     {
-        synchronized (IApplicationRegistry.class)
+        IApplicationRegistry iApplicationRegistry = _instance.get();
+        if (iApplicationRegistry == null)
         {
-            IApplicationRegistry instance = _instanceMap.get(instanceID);
-
-            if (instance == null)
-            {
-                throw new IllegalStateException("Application Registry (" + instanceID + ") not created");
-            }
-            else
-            {
-                return instance;
-            }
+            throw new IllegalStateException("No ApplicationRegistry has been initialised");
+        }
+        else
+        {
+            return iApplicationRegistry;
         }
     }
 
@@ -369,6 +456,12 @@ public abstract class ApplicationRegistr
         {
             _logger.info("Shutting down ApplicationRegistry:" + this);
         }
+        
+        //Stop Statistics Reporting
+        if (_reportingTimer != null)
+        {
+            _reportingTimer.cancel();
+        }
 
         //Stop incoming connections
         unbind();
@@ -376,10 +469,6 @@ public abstract class ApplicationRegistr
         //Shutdown virtualhosts
         close(_virtualHostRegistry);
 
-//      close(_accessManager);
-//
-//      close(_databaseManager);
-
         close(_authenticationManager);
 
         close(_managedObjectRegistry);
@@ -401,7 +490,7 @@ public abstract class ApplicationRegistr
 
                 try
                 {
-                    acceptor.getNetworkDriver().close();
+                    acceptor.getNetworkTransport().close();
                 }
                 catch (Throwable e)
                 {
@@ -441,11 +530,6 @@ public abstract class ApplicationRegistr
         return _managedObjectRegistry;
     }
 
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
     public AuthenticationManager getAuthenticationManager()
     {
         return _authenticationManager;
@@ -493,9 +577,81 @@ public abstract class ApplicationRegistr
 
     public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
     {
-        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
+        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
         _virtualHostRegistry.registerVirtualHost(virtualHost);
         getBroker().addVirtualHost(virtualHost);
         return virtualHost;
     }
+    
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+    }
+    
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+        
+        for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+        {
+            vhost.resetStatistics();
+        }
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                getConfiguration().isStatisticsGenerationBrokerEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered");
+        _dataDelivered = new StatisticsCounter("bytes-delivered");
+        _messagesReceived = new StatisticsCounter("messages-received");
+        _dataReceived = new StatisticsCounter("bytes-received");
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Thu Oct 20 18:42:46 2011
@@ -71,7 +71,7 @@ public class BrokerConfigAdapter impleme
 
     public Integer getWorkerThreads()
     {
-        return _instance.getConfiguration().getProcessors();
+        return _instance.getConfiguration().getConnectorProcessors();
     }
 
     public Integer getMaxConnections()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Thu Oct 20 18:42:46 2011
@@ -29,12 +29,18 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.management.JMXManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.osgi.framework.BundleContext;
 
 public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
 {
     public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
     {
-        super(new ServerConfiguration(configurationURL));
+        this(configurationURL, null);
+    }
+
+    public ConfigurationFileApplicationRegistry(File configurationURL, BundleContext bundleContext) throws ConfigurationException
+    {
+        super(new ServerConfiguration(configurationURL), bundleContext);
     }
 
     @Override

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Thu Oct 20 18:42:46 2011
@@ -33,21 +33,20 @@ import org.apache.qpid.server.logging.Ro
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
-public interface IApplicationRegistry
+public interface IApplicationRegistry extends StatisticsGatherer
 {
     /**
      * Initialise the application registry. All initialisation must be done in this method so that any components
      * that need access to the application registry itself for initialisation are able to use it. Attempting to
      * initialise in the constructor will lead to failures since the registry reference will not have been set.
-     * @param instanceID the instanceID that we can use to identify this AR.
      */
-    void initialise(int instanceID) throws Exception;
+    void initialise() throws Exception;
 
     /**
      * Shutdown this Registry
@@ -63,8 +62,6 @@ public interface IApplicationRegistry
 
     ManagedObjectRegistry getManagedObjectRegistry();
 
-    PrincipalDatabaseManager getDatabaseManager();
-
     AuthenticationManager getAuthenticationManager();
 
     VirtualHostRegistry getVirtualHostRegistry();
@@ -97,4 +94,6 @@ public interface IApplicationRegistry
     ConfigStore getConfigStore();
 
     void setConfigStore(ConfigStore store);
+    
+    void initialiseStatisticsReporting();
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Thu Oct 20 18:42:46 2011
@@ -18,8 +18,19 @@
  */
 package org.apache.qpid.server.security;
 
-import static org.apache.qpid.server.security.access.ObjectType.*;
-import static org.apache.qpid.server.security.access.Operation.*;
+import static org.apache.qpid.server.security.access.ObjectType.EXCHANGE;
+import static org.apache.qpid.server.security.access.ObjectType.METHOD;
+import static org.apache.qpid.server.security.access.ObjectType.OBJECT;
+import static org.apache.qpid.server.security.access.ObjectType.QUEUE;
+import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST;
+import static org.apache.qpid.server.security.access.Operation.ACCESS;
+import static org.apache.qpid.server.security.access.Operation.BIND;
+import static org.apache.qpid.server.security.access.Operation.CONSUME;
+import static org.apache.qpid.server.security.access.Operation.CREATE;
+import static org.apache.qpid.server.security.access.Operation.DELETE;
+import static org.apache.qpid.server.security.access.Operation.PUBLISH;
+import static org.apache.qpid.server.security.access.Operation.PURGE;
+import static org.apache.qpid.server.security.access.Operation.UNBIND;
 
 import java.net.SocketAddress;
 import java.security.Principal;
@@ -29,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -37,11 +50,9 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ObjectProperties;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 
 /**
  * The security manager contains references to all loaded {@link SecurityPlugin}s and delegates security decisions to them based
@@ -55,7 +66,7 @@ public class SecurityManager
     private static final Logger _logger = Logger.getLogger(SecurityManager.class);
     
     /** Container for the {@link Principal} that is using to this thread. */
-    private static final ThreadLocal<Principal> _principal = new ThreadLocal<Principal>();
+    private static final ThreadLocal<Subject> _subject = new ThreadLocal<Subject>();
     
     private PluginManager _pluginManager;
     private Map<String, SecurityPluginFactory> _pluginFactories = new HashMap<String, SecurityPluginFactory>();
@@ -126,19 +137,14 @@ public class SecurityManager
         configureHostPlugins(configuration);
     }
 
-    public static Principal getThreadPrincipal()
-    {
-        return _principal.get();
-    }
-
-    public static void setThreadPrincipal(Principal principal)
+    public static Subject getThreadSubject()
     {
-        _principal.set(principal);
+        return _subject.get();
     }
 
-    public static void setThreadPrincipal(String authId)
+    public static void setThreadSubject(final Subject subject)
     {
-        setThreadPrincipal(new UsernamePrincipal(authId));
+        _subject.set(subject);
     }
 
     public void configureHostPlugins(ConfigurationPlugin hostConfig) throws ConfigurationException

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Thu Oct 20 18:42:46 2011
@@ -149,9 +149,9 @@ public class ObjectProperties extends Ha
         {
             put(Property.OWNER, queue.getOwner());
         }
-        else if (queue.getPrincipalHolder() != null)
+        else if (queue.getAuthorizationHolder() != null)
         {
-            put(Property.OWNER, queue.getPrincipalHolder().getPrincipal().getName());
+            put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
         }
     }
     

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Thu Oct 20 18:42:46 2011
@@ -20,42 +20,93 @@
  */
 package org.apache.qpid.server.security.auth;
 
+import javax.security.auth.Subject;
+
+/**
+ * Encapsulates the result of an attempt to authenticate.
+ * <p>
+ * The authentication status describes the overall outcome.
+ *
+ * <ol>
+ *  <li>If authentication status is SUCCESS, the subject will be populated.
+ *  </li>
+ *  <li>If authentication status is CONTINUE, the authentication has failed because the user
+ *      supplied incorrect credentials (etc).  If the authentication requires it, the next challenge
+ *      is made available.
+ *  </li>
+ *  <li>If authentication status is ERROR, the authentication decision could not be made due
+ *      to a failure (such as that of an external system); {@link AuthenticationResult#getCause()}
+ *      will provide the underlying exception.
+ *  </li>
+ * </ol>
+ *
+ */
 public class AuthenticationResult
 {
     public enum AuthenticationStatus
     {
-        SUCCESS, CONTINUE, ERROR
+        /** Authentication successful */
+        SUCCESS,
+        /** Authentication not successful due to credentials problem etc */
+        CONTINUE,
+        /** Problem prevented the authentication from being made e.g. failure of an external system */
+        ERROR
     }
 
-    public AuthenticationStatus status;
-    public byte[] challenge;
-    
-    private Exception cause;
+    public final AuthenticationStatus _status;
+    public final byte[] _challenge;
+    private final Exception _cause;
+    private final Subject _subject;
 
-    public AuthenticationResult(AuthenticationStatus status)
+    public AuthenticationResult(final AuthenticationStatus status)
     {
         this(null, status, null);
     }
 
-    public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
+    public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status)
     {
         this(challenge, status, null);
     }
 
-    public AuthenticationResult(AuthenticationStatus error, Exception cause)
+    public AuthenticationResult(final AuthenticationStatus error, final Exception cause)
     {
         this(null, error, cause);
     }
 
-    public AuthenticationResult(byte[] challenge, AuthenticationStatus status, Exception cause)
+    public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status, final Exception cause)
+    {
+        this._status = status;
+        this._challenge = challenge;
+        this._cause = cause;
+        this._subject = null;
+    }
+
+    public AuthenticationResult(final Subject subject)
     {
-        this.status = status;
-        this.challenge = challenge;
-        this.cause = cause;
+        this._status = AuthenticationStatus.SUCCESS;
+        this._challenge = null;
+        this._cause = null;
+        this._subject = subject;
     }
 
     public Exception getCause()
     {
-        return cause;
+        return _cause;
+    }
+
+    public AuthenticationStatus getStatus()
+    {
+        return _status;
     }
+
+    public byte[] getChallenge()
+    {
+        return _challenge;
+    }
+
+    public Subject getSubject()
+    {
+        return _subject;
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org