You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [25/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java Thu Oct 20 18:42:46 2011
@@ -20,121 +20,108 @@
 */
 package org.apache.qpid.server.subscription;
 
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.subscription.Subscription;
 
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.ByteBuffer;
 
 public class SubscriptionList
 {
-
     private final SubscriptionNode _head = new SubscriptionNode();
 
-    private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
-    private AtomicInteger _size = new AtomicInteger();
-
+    private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
+    private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head);
+    private final AtomicInteger _size = new AtomicInteger();
 
-    public final class SubscriptionNode
+    public static final class SubscriptionNode
     {
         private final AtomicBoolean _deleted = new AtomicBoolean();
         private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
         private final Subscription _sub;
 
-
         public SubscriptionNode()
         {
-
+            //used for sentinel head and dummy node construction
             _sub = null;
             _deleted.set(true);
         }
 
         public SubscriptionNode(final Subscription sub)
         {
+            //used for regular node construction
             _sub = sub;
         }
 
-
-        public SubscriptionNode getNext()
+        /**
+         * Retrieves the first non-deleted node following the current node.
+         * Any deleted non-tail nodes encountered during the search are unlinked.
+         *
+         * @return the next non-deleted node, or null if none was found.
+         */
+        public SubscriptionNode findNext()
         {
-
             SubscriptionNode next = nextNode();
             while(next != null && next.isDeleted())
             {
-
                 final SubscriptionNode newNext = next.nextNode();
                 if(newNext != null)
                 {
+                    //try to move our _next reference forward to the 'newNext'
+                    //node to unlink the deleted node
                     _next.compareAndSet(next, newNext);
                     next = nextNode();
                 }
                 else
                 {
+                    //'newNext' is null, meaning 'next' is the current tail. Can't unlink
+                    //the tail node for thread safety reasons, just use the null.
                     next = null;
                 }
-
             }
+
             return next;
         }
 
-        private SubscriptionNode nextNode()
+        /**
+         * Gets the immediately next referenced node in the structure.
+         *
+         * @return the immediately next node in the structure, or null if at the tail.
+         */
+        protected SubscriptionNode nextNode()
         {
             return _next.get();
         }
 
+        /**
+         * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set.
+         *
+         * @param node the SubscriptionNode to set as 'next'
+         * @return whether the operation succeeded
+         */
+        private boolean setNext(final SubscriptionNode node)
+        {
+            return _next.compareAndSet(null, node);
+        }
+
         public boolean isDeleted()
         {
             return _deleted.get();
         }
 
-
         public boolean delete()
         {
-            if(_deleted.compareAndSet(false,true))
-            {
-                _size.decrementAndGet();
-                advanceHead();
-                return true;
-            }
-            else
-            {
-                return false;
-            }
+            return _deleted.compareAndSet(false,true);
         }
 
-
         public Subscription getSubscription()
         {
             return _sub;
         }
     }
 
-
-    public SubscriptionList(AMQQueue queue)
+    private void insert(final SubscriptionNode node, final boolean count)
     {
-    }
-
-    private void advanceHead()
-    {
-        SubscriptionNode head = _head.nextNode();
-        while(head._next.get() != null && head.isDeleted())
-        {
-
-            final SubscriptionNode newhead = head.nextNode();
-            if(newhead != null)
-            {
-                _head._next.compareAndSet(head, newhead);
-            }
-            head = _head.nextNode();
-        }
-    }
-
-
-    public SubscriptionNode add(Subscription sub)
-    {
-        SubscriptionNode node = new SubscriptionNode(sub);
         for (;;)
         {
             SubscriptionNode tail = _tail.get();
@@ -143,11 +130,14 @@ public class SubscriptionList
             {
                 if (next == null)
                 {
-                    if (tail._next.compareAndSet(null, node))
+                    if (tail.setNext(node))
                     {
                         _tail.compareAndSet(tail, node);
-                        _size.incrementAndGet();
-                        return node;
+                        if(count)
+                        {
+                            _size.incrementAndGet();
+                        }
+                        return;
                     }
                 }
                 else
@@ -156,27 +146,101 @@ public class SubscriptionList
                 }
             }
         }
+    }
 
+    public void add(final Subscription sub)
+    {
+        SubscriptionNode node = new SubscriptionNode(sub);
+        insert(node, true);
     }
 
-    public boolean remove(Subscription sub)
+    public boolean remove(final Subscription sub)
     {
-        SubscriptionNode node = _head.getNext();
+        SubscriptionNode prevNode = _head;
+        SubscriptionNode node = _head.nextNode();
+
         while(node != null)
         {
-            if(sub.equals(node._sub) && node.delete())
+            if(sub.equals(node.getSubscription()) && node.delete())
             {
+                _size.decrementAndGet();
+
+                SubscriptionNode tail = _tail.get();
+                if(node == tail)
+                {
+                    //we can't remove the last node from the structure for
+                    //correctness reasons, however we have just 'deleted'
+                    //the tail. Inserting an empty dummy node after it will
+                    //let us scavenge the node containing the Subscription.
+                    insert(new SubscriptionNode(), false);
+                }
+
+                //advance the next node reference in the 'prevNode' to scavenge
+                //the newly 'deleted' node for the Subscription.
+                prevNode.findNext();
+
+                nodeMarkerCleanup(node);
+
                 return true;
             }
-            node = node.getNext();
+
+            prevNode = node;
+            node = node.findNext();
         }
+
         return false;
     }
 
+    private void nodeMarkerCleanup(final SubscriptionNode node)
+    {
+        SubscriptionNode markedNode = _subNodeMarker.get();
+        if(node == markedNode)
+        {
+            //if the marked node is the one we are removing, then
+            //replace it with a dummy pointing at the next node.
+            //this is OK as the marked node is only used to index
+            //into the list and find the next node to use.
+            //Because we inserted a dummy if node was the
+            //tail, markedNode.nextNode() can never be null.
+            SubscriptionNode dummy = new SubscriptionNode();
+            dummy.setNext(markedNode.nextNode());
+
+            //if the CAS fails the marked node has changed, thus
+            //we don't care about the dummy and just forget it
+            _subNodeMarker.compareAndSet(markedNode, dummy);
+        }
+        else if(markedNode != null)
+        {
+            //if the marked node was already deleted then it could
+            //hold subsequently removed nodes after it in the list 
+            //in memory. Scavenge it to ensure their actual removal.
+            if(markedNode != _head && markedNode.isDeleted())
+            {
+                markedNode.findNext();
+            }
+        }
+    }
 
-    public static class SubscriptionNodeIterator
+    public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode)
+    {
+        return _subNodeMarker.compareAndSet(expected, nextNode);
+    }
+
+    /**
+     * Get the current marked SubscriptionNode. This should be used only to index into the list and find the next node
+     * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node
+     * with reference to the next node.
+     *
+     * @return the previously marked node (or a dummy if it was subsequently deleted)
+     */
+    public SubscriptionNode getMarkedNode()
     {
+        return _subNodeMarker.get();
+    }
+
 
+    public static class SubscriptionNodeIterator
+    {
         private SubscriptionNode _lastNode;
 
         SubscriptionNodeIterator(SubscriptionNode startNode)
@@ -184,49 +248,25 @@ public class SubscriptionList
             _lastNode = startNode;
         }
 
-
-        public boolean atTail()
-        {
-            return _lastNode.nextNode() == null;
-        }
-
         public SubscriptionNode getNode()
         {
-
             return _lastNode;
-
         }
 
         public boolean advance()
         {
+            SubscriptionNode nextNode = _lastNode.findNext();
+            _lastNode = nextNode;
 
-            if(!atTail())
-            {
-                SubscriptionNode nextNode = _lastNode.nextNode();
-                while(nextNode.isDeleted() && nextNode.nextNode() != null)
-                {
-                    nextNode = nextNode.nextNode();
-                }
-                _lastNode = nextNode;
-                return true;
-
-            }
-            else
-            {
-                return false;
-            }
-
+            return _lastNode != null;
         }
-
     }
 
-
     public SubscriptionNodeIterator iterator()
     {
         return new SubscriptionNodeIterator(_head);
     }
 
-
     public SubscriptionNode getHead()
     {
         return _head;
@@ -236,9 +276,6 @@ public class SubscriptionList
     {
         return _size.get();
     }
-
-
-
 }
 
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Thu Oct 20 18:42:46 2011
@@ -40,7 +40,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.AMQMessage;
@@ -80,10 +79,7 @@ import java.nio.ByteBuffer;
 
 public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
 {
-
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    // Create a simple ID that increments for ever new Subscription
-    private final long _subscriptionID = idGenerator.getAndIncrement();
+    private final long _subscriptionID;
 
     private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -97,7 +93,6 @@ public class Subscription_0_10 implement
 
     private FlowCreditManager_0_10 _creditManager;
 
-
     private StateListener _stateListener = new StateListener()
                                             {
 
@@ -114,16 +109,15 @@ public class Subscription_0_10 implement
     private final MessageAcquireMode _acquireMode;
     private MessageFlowMode _flowMode;
     private final ServerSession _session;
-    private AtomicBoolean _stopped = new AtomicBoolean(true);
-    private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
+    private final AtomicBoolean _stopped = new AtomicBoolean(true);
     private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
 
     private LogActor _logActor;
-    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
     private UUID _id;
     private String _traceExclude;
     private String _trace;
-    private long _createTime = System.currentTimeMillis();
+    private final long _createTime = System.currentTimeMillis();
     private final AtomicLong _deliveredCount = new AtomicLong(0);
     private final Map<String, Object> _arguments;
 
@@ -132,8 +126,9 @@ public class Subscription_0_10 implement
                              MessageAcquireMode acquireMode,
                              MessageFlowMode flowMode,
                              FlowCreditManager_0_10 creditManager,
-                             FilterManager filters,Map<String, Object> arguments)
+                             FilterManager filters,Map<String, Object> arguments, long subscriptionId)
     {
+        _subscriptionID = subscriptionId;
         _session = session;
         _destination = destination;
         _acceptMode = acceptMode;
@@ -199,7 +194,7 @@ public class Subscription_0_10 implement
 
     public boolean isSuspended()
     {
-        return !isActive() || _deleted.get(); // TODO check for Session suspension
+        return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
     }
 
     public boolean hasInterest(QueueEntry entry)
@@ -208,7 +203,7 @@ public class Subscription_0_10 implement
 
 
         //check that the message hasn't been rejected
-        if (entry.isRejectedBy(this))
+        if (entry.isRejectedBy(getSubscriptionID()))
         {
 
             return false;
@@ -442,7 +437,7 @@ public class Subscription_0_10 implement
             Struct[] headers = new Struct[] { deliveryProps, messageProps };
 
             BasicContentHeaderProperties properties =
-                    (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
+                    (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
             final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
             if(exchange != null)
             {
@@ -732,13 +727,22 @@ public class Subscription_0_10 implement
 
     public void stop()
     {
-        if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+        try
         {
-            _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            getSendLock();
+
+            if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+            {
+                _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            }
+            _stopped.set(true);
+            FlowCreditManager_0_10 creditManager = getCreditManager();
+            creditManager.clearCredit();
+        }
+        finally
+        {
+            releaseSendLock();
         }
-        _stopped.set(true);
-        FlowCreditManager_0_10 creditManager = getCreditManager();
-        creditManager.clearCredit();
     }
 
     public void addCredit(MessageCreditUnit unit, long value)

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Thu Oct 20 18:42:46 2011
@@ -20,21 +20,21 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkTransport;
 
 public class QpidAcceptor
 {
-    NetworkDriver _driver;
+    NetworkTransport _transport;
     String _protocol;
-    public QpidAcceptor(NetworkDriver driver, String protocol)
+    public QpidAcceptor(NetworkTransport transport, String protocol)
     {
-        _driver = driver;
+        _transport = transport;
         _protocol = protocol;
     }
 
-    public NetworkDriver getNetworkDriver()
+    public NetworkTransport getNetworkTransport()
     {
-        return _driver;
+        return _transport;
     }
 
     public String toString()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Thu Oct 20 18:42:46 2011
@@ -20,11 +20,19 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
+import java.security.Principal;
 import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.security.auth.Subject;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -35,23 +43,39 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
 import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.Session;
 
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
 {
     private ConnectionConfig _config;
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
     private LogActor _actor = GenericActor.getInstance(this);
 
-    public ServerConnection()
+    private Subject _authorizedSubject = null;
+    private Principal _authorizedPrincipal = null;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final long _connectionId;
+    
+    public ServerConnection(final long connectionId)
     {
+        _connectionId = connectionId;
+    }
 
+    public UUID getId()
+    {
+        return _config.getId();
     }
 
     @Override
@@ -72,8 +96,18 @@ public class ServerConnection extends Co
                 _onOpenTask.run();    
             }
             _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true));
+
+            getVirtualHost().getConnectionRegistry().registerConnection(this);
         }
-        
+
+        if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
+        {
+            if(_virtualHost != null)
+            {
+                _virtualHost.getConnectionRegistry().deregisterConnection(this);
+            }
+        }
+
         if (state == State.CLOSED)
         {
             logClosed();
@@ -110,6 +144,8 @@ public class ServerConnection extends Co
     public void setVirtualHost(VirtualHost virtualHost)
     {
         _virtualHost = virtualHost;
+        
+        initialiseStatistics();
     }
 
     public void setConnectionConfig(final ConnectionConfig config)
@@ -145,6 +181,11 @@ public class ServerConnection extends Co
 
         ((ServerSession)session).close();
     }
+    
+    public LogSubject getLogSubject()
+    {
+        return (LogSubject) this;
+    }
 
     @Override
     public void received(ProtocolEvent event)
@@ -179,9 +220,9 @@ public class ServerConnection extends Co
     public String toLogString()
     {
         boolean hasVirtualHost = (null != this.getVirtualHost());
-        boolean hasPrincipal = (null != getAuthorizationID());
+        boolean hasClientId = (null != getClientId());
 
-        if (hasPrincipal && hasVirtualHost)
+        if (hasClientId && hasVirtualHost)
         {
             return "[" +
                     MessageFormat.format(CONNECTION_FORMAT,
@@ -191,7 +232,7 @@ public class ServerConnection extends Co
                                          getVirtualHost().getName())
                  + "] ";
         }
-        else if (hasPrincipal)
+        else if (hasClientId)
         {
             return "[" +
                     MessageFormat.format(USER_FORMAT,
@@ -215,4 +256,147 @@ public class ServerConnection extends Co
     {
         return _actor;
     }
+
+    public void close(AMQConstant cause, String message) throws AMQException
+    {
+        ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+        try
+        {
+	        replyCode = ConnectionCloseCode.get(cause.getCode());
+        }
+        catch (IllegalArgumentException iae)
+        {
+            // Ignore
+        }
+        close(replyCode, message);
+    }
+
+    public List<AMQSessionModel> getSessionModels()
+    {
+        List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+        for (Session ssn : getChannels())
+        {
+            sessions.add((AMQSessionModel) ssn);
+        }
+        return sessions;
+    }
+
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+        _virtualHost.registerMessageDelivered(messageSize);
+    }
+
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+        _virtualHost.registerMessageReceived(messageSize, timestamp);
+    }
+    
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+    
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+    
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+    
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+    
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+        
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+        _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+        _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
+
+    /**
+     * @return authorizedSubject
+     */
+    public Subject getAuthorizedSubject()
+    {
+        return _authorizedSubject;
+    }
+
+    /**
+     * Sets the authorized subject.  It also extracts the UsernamePrincipal from the subject
+     * and caches it for optimisation purposes.
+     *
+     * @param authorizedSubject
+     */
+    public void setAuthorizedSubject(final Subject authorizedSubject)
+    {
+        if (authorizedSubject == null)
+        {
+            _authorizedSubject = null;
+            _authorizedPrincipal = null;
+        }
+        else
+        {
+            _authorizedSubject = authorizedSubject;
+            _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+        }
+    }
+
+    public Principal getAuthorizedPrincipal()
+    {
+        return _authorizedPrincipal;
+    }
+
+    public long getConnectionId()
+    {
+        return _connectionId;
+    }
+
+    @Override
+    public boolean isSessionNameUnique(String name)
+    {
+        return !super.hasSessionWithName(name);
+    }
+
+    @Override
+    public String getUserName()
+    {
+        return _authorizedPrincipal.getName();
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Thu Oct 20 18:42:46 2011
@@ -20,26 +20,47 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.transport.*;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.common.ClientProperties;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import java.util.*;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionOpen;
+import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionTuneOk;
+import org.apache.qpid.transport.ServerDelegate;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionAttach;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetached;
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
     private String _localFQDN;
     private final IApplicationRegistry _appRegistry;
 
-
     public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
     {
         this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
@@ -68,24 +89,42 @@ public class ServerConnectionDelegate ex
         return list;
     }
 
-    @Override
     public ServerSession getSession(Connection conn, SessionAttach atc)
     {
-        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
 
         ServerSession ssn = new ServerSession(conn, serverSessionDelegate,  new Binary(atc.getName()), 0);
 
         return ssn;
     }
 
-    @Override
     protected SaslServer createSaslServer(String mechanism) throws SaslException
     {
         return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
 
     }
 
-    @Override
+    protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
+    {
+        final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
+        final ServerConnection sconn = (ServerConnection) conn;
+        
+        
+        if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
+        {
+            tuneAuthorizedConnection(sconn);
+            sconn.setAuthorizedSubject(authResult.getSubject());
+        }
+        else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
+        {
+            connectionAuthContinue(sconn, authResult.getChallenge());
+        }
+        else
+        {
+            connectionAuthFailed(sconn, authResult.getCause());
+        }
+    }
+
     public void connectionClose(Connection conn, ConnectionClose close)
     {
         try
@@ -99,10 +138,9 @@ public class ServerConnectionDelegate ex
         
     }
 
-    @Override
     public void connectionOpen(Connection conn, ConnectionOpen open)
     {
-        ServerConnection sconn = (ServerConnection) conn;
+        final ServerConnection sconn = (ServerConnection) conn;
         
         VirtualHost vhost;
         String vhostName;
@@ -116,7 +154,7 @@ public class ServerConnectionDelegate ex
         }
         vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
 
-        SecurityManager.setThreadPrincipal(conn.getAuthorizationID());
+        SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
         
         if(vhost != null)
         {
@@ -138,6 +176,27 @@ public class ServerConnectionDelegate ex
             sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
             sconn.setState(Connection.State.CLOSING);
         }
+        
+    }
+
+    @Override
+    public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+    {
+        ServerConnection sconn = (ServerConnection) conn;
+        int okChannelMax = ok.getChannelMax();
+
+        if (okChannelMax > getChannelMax())
+        {
+            _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
+                    ") above the servers offered limit (" + getChannelMax() +")");
+
+            //Due to the error we must forcefully close the connection without negotiation
+            sconn.getSender().close();
+            return;
+        }
+
+        setConnectionTuneOkChannelMax(sconn, okChannelMax);
     }
     
     @Override
@@ -152,4 +211,59 @@ public class ServerConnectionDelegate ex
     {
         return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
     }
+
+    @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+    {
+        // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures
+        // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister
+        // completes.
+        unregisterAllSubscriptions(conn, dtc);
+        super.sessionDetach(conn, dtc);
+    }
+
+    private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc)
+    {
+        final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
+        final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
+        for (Subscription_0_10 subscription_0_10 : subs)
+        {
+            ssn.unregister(subscription_0_10);
+        }
+    }
+
+    @Override
+    public void sessionAttach(final Connection conn, final SessionAttach atc)
+    {
+        final String clientId = new String(atc.getName());
+        final Session ssn = getSession(conn, atc);
+
+        if(isSessionNameUnique(clientId,conn))
+        {
+            conn.registerSession(ssn);
+            super.sessionAttach(conn, atc);
+        }
+        else
+        {
+            ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
+            ssn.closed();
+        }
+    }
+
+    private boolean isSessionNameUnique(final String name, final Connection conn)
+    {
+        final ServerConnection sconn = (ServerConnection) conn;
+        final String userId = sconn.getUserName();
+
+        final Iterator<AMQConnectionModel> connections =
+                        ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
+        while(connections.hasNext())
+        {
+            final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
+            if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+            {
+                return false;
+            }
+        }
+        return true;
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Oct 20 18:42:46 2011
@@ -23,9 +23,25 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 import static org.apache.qpid.util.Serial.gt;
 
-import com.sun.security.auth.UserPrincipal;
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.security.auth.Subject;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -38,18 +54,18 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageTransfer;
@@ -58,24 +74,13 @@ import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
 {
+    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
+    
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
     private final UUID _id;
@@ -111,8 +116,7 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-
-    private Principal _principal;
+    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
 
@@ -125,27 +129,27 @@ public class ServerSession extends Sessi
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
     }
 
-    protected void setState(State state)
-    {
-        super.setState(state);
-
-        if (state == State.OPEN)
-        {
-	        _actor.message(ChannelMessages.CREATE());
-        }
-    }
-
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
     {
         super(connection, delegate, name, expiry);
         _connectionConfig = connConfig;        
         _transaction = new AutoCommitTransaction(this.getMessageStore());
-        _principal = new UserPrincipal(connection.getAuthorizationID());
-        _reference = new WeakReference(this);
+
+        _reference = new WeakReference<Session>(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
     }
 
+    protected void setState(State state)
+    {
+        super.setState(state);
+
+        if (state == State.OPEN)
+        {
+            _actor.message(ChannelMessages.CREATE());
+        }
+    }
+
     private ConfigStore getConfigStore()
     {
         return getConnectionConfig().getConfigStore();
@@ -160,8 +164,8 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
     {
-
-            _transaction.enqueue(queues,message, new ServerTransaction.Action()
+        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+        _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
                 BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -189,12 +193,14 @@ public class ServerSession extends Sessi
             });
 
             incrementOutstandingTxnsIfNecessary();
+            updateTransactionalActivity();
     }
 
 
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
     {
+        getConnectionModel().registerMessageDelivered(xfr.getBodySize());
         invoke(xfr, postIdSettingAction);
     }
 
@@ -377,6 +383,7 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
+	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -410,7 +417,7 @@ public class ServerSession extends Sessi
         catch (AMQException e)
         {
             // TODO
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            _logger.error("Failed to unregister subscription", e);
         }
         finally
         {
@@ -425,6 +432,11 @@ public class ServerSession extends Sessi
         // theory
         return !(_transaction instanceof AutoCommitTransaction);
     }
+    
+    public boolean inTransaction()
+    {
+        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+    }
 
     public void selectTx()
     {
@@ -471,6 +483,17 @@ public class ServerSession extends Sessi
         }
     }
 
+    /**
+     * Update last transaction activity timestamp
+     */
+    public void updateTransactionalActivity()
+    {
+        if (isTransactional())
+        {
+            _txnUpdateTime.set(System.currentTimeMillis());
+        }
+    }
+
     public Long getTxnStarts()
     {
         return _txnStarts.get();
@@ -491,9 +514,14 @@ public class ServerSession extends Sessi
         return _txnCount.get();
     }
     
-    public Principal getPrincipal()
+    public Principal getAuthorizedPrincipal()
     {
-        return _principal;
+        return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
+    }
+    
+    public Subject getAuthorizedSubject()
+    {
+        return ((ServerConnection) getConnection()).getAuthorizedSubject();
     }
 
     public void addSessionCloseTask(Task task)
@@ -606,18 +634,61 @@ public class ServerSession extends Sessi
         return (LogSubject) this;
     }
 
-    @Override
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+    {
+        if (inTransaction())
+        {
+            long currentTime = System.currentTimeMillis();
+            long openTime = currentTime - _transaction.getTransactionStartTime();
+            long idleTime = currentTime - _txnUpdateTime.get();
+
+            // Log a warning on idle or open transactions
+            if (idleWarn > 0L && idleTime > idleWarn)
+            {
+                CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
+                _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
+            }
+            else if (openWarn > 0L && openTime > openWarn)
+            {
+                CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
+                _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
+            }
+
+            // Close connection for idle or open transactions that have timed out
+            if (idleClose > 0L && idleTime > idleClose)
+            {
+                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+            }
+            else if (openClose > 0L && openTime > openClose)
+            {
+                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+            }
+        }
+    }
+
     public String toLogString()
     {
        return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
-                                   getConnection().getConnectionId(),
+                                   ((ServerConnection) getConnection()).getConnectionId(),
                                    getClientID(),
                                    ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
                                    getVirtualHost().getName(),
                                    getChannel())
             + "] ";
-
     }
 
+    @Override
+    public void close()
+    {
+        // unregister subscriptions in order to prevent sending of new messages
+        // to subscriptions with closing session
+        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        {
+            unregister(subscription_0_10);
+        }
+
+        super.close();
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Oct 20 18:42:46 2011
@@ -25,31 +25,34 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.exchange.HeadersExchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.flow.WindowCreditManager;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Acquired;
@@ -95,26 +98,34 @@ import org.apache.qpid.transport.TxSelec
 
 public class ServerSessionDelegate extends SessionDelegate
 {
-    private final IApplicationRegistry _appRegistry;
+    private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
 
-    public ServerSessionDelegate(IApplicationRegistry appRegistry)
+    public ServerSessionDelegate()
     {
-        _appRegistry = appRegistry;
+
     }
 
     @Override
     public void command(Session session, Method method)
     {
-        SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID());
-
-        if(!session.isClosing())
+        try
         {
-            super.command(session, method);
-            if (method.isSync())
+            setThreadSubject(session);
+
+            if(!session.isClosing())
             {
-                session.flushProcessed();
+                super.command(session, method);
+                if (method.isSync())
+                {
+                    session.flushProcessed();
+                }
             }
         }
+        catch(RuntimeException e)
+        {
+            LOGGER.error("Exception processing command", e);
+            exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e);
+        }
     }
 
     @Override
@@ -123,8 +134,6 @@ public class ServerSessionDelegate exten
         ((ServerSession)session).accept(method.getTransfers());
     }
 
-
-
     @Override
     public void messageReject(Session session, MessageReject method)
     {
@@ -159,7 +168,6 @@ public class ServerSessionDelegate exten
     @Override
     public void messageSubscribe(Session session, MessageSubscribe method)
     {
-
         //TODO - work around broken Python tests
         if(!method.hasAcceptMode())
         {
@@ -203,32 +211,33 @@ public class ServerSessionDelegate exten
                 {
                     exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+                else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
                 else
                 {
-
                     if(queue.isExclusive())
                     {
-                        if(queue.getPrincipalHolder() == null)
+                        ServerSession s = (ServerSession) session;
+                        queue.setExclusiveOwningSession(s);
+                        if(queue.getAuthorizationHolder() == null)
                         {
-                            queue.setPrincipalHolder((ServerSession)session);
+                            queue.setAuthorizationHolder(s);
+                            queue.setExclusiveOwningSession(s);
                             ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
                             {
-
                                 public void doTask(ServerSession session)
                                 {
-                                    if(queue.getPrincipalHolder() == session)
+                                    if(queue.getAuthorizationHolder() == session)
                                     {
-                                        queue.setPrincipalHolder(null);
+                                        queue.setAuthorizationHolder(null);
+                                        queue.setExclusiveOwningSession(null);
                                     }
                                 }
                             });
                         }
 
-
                     }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -244,7 +253,7 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+                    Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
                                                                   destination,
                                                                   method.getAcceptMode(),
                                                                   method.getAcquireMode(),
@@ -275,25 +284,10 @@ public class ServerSessionDelegate exten
         }
     }
 
-
     @Override
     public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
-        ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
-        Exchange exchange;
-        if(xfr.hasDestination())
-        {
-            exchange = exchangeRegistry.getExchange(xfr.getDestination());
-            if(exchange == null)
-            {
-                exchange = exchangeRegistry.getDefaultExchange();
-            }
-        }
-        else
-        {
-            exchange = exchangeRegistry.getDefaultExchange();
-        }
-        
+        final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
         DeliveryProperties delvProps = null;
         if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -301,7 +295,7 @@ public class ServerSessionDelegate exten
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
 
-        MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+        final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
         
         if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
         {
@@ -311,65 +305,63 @@ public class ServerSessionDelegate exten
             
             return;
         }
-        
-        final MessageStore store = getVirtualHost(ssn).getMessageStore();
-        StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
-        ByteBuffer body = xfr.getBody();
-        if(body != null)
+
+        final Exchange exchangeInUse;
+        ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
+        if(queues.isEmpty() && exchange.getAlternateExchange() != null)
+        {
+            final Exchange alternateExchange = exchange.getAlternateExchange();
+            queues = alternateExchange.route(messageMetaData);
+            if (!queues.isEmpty())
+            {
+                exchangeInUse = alternateExchange;
+            }
+            else
+            {
+                exchangeInUse = exchange;
+            }
+        }
+        else
         {
-            storeMessage.addContent(0, body);
+            exchangeInUse = exchange;
         }
-        storeMessage.flushToStore();
-        MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
-
-        ArrayList<? extends BaseQueue> queues = exchange.route(message);
 
-
-
-        if(queues != null && queues.size() != 0)
+        if(!queues.isEmpty())
         {
+            final MessageStore store = getVirtualHost(ssn).getMessageStore();
+            final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
+            MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
             ((ServerSession) ssn).enqueue(message, queues);
         }
         else
         {
-            if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
+            if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
             {
-                if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
-                {
-                    RangeSet rejects = new RangeSet();
-                    rejects.add(xfr.getId());
-                    MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
-                    ssn.invoke(reject);
-                }
-                else
-                {
-                    Exchange alternate = exchange.getAlternateExchange();
-                    if(alternate != null)
-                    {
-                        queues = alternate.route(message);
-                        if(queues != null && queues.size() != 0)
-                        {
-                            ((ServerSession) ssn).enqueue(message, queues);
-                        }
-                        else
-                        {
-                            //TODO - log the message discard
-                        }
-                    }
-                    else
-                    {
-                        //TODO - log the message discard
-                    }
-
-
-                }
+                RangeSet rejects = new RangeSet();
+                rejects.add(xfr.getId());
+                MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+                ssn.invoke(reject);
+            }
+            else
+            {
+                ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
             }
-
-
         }
 
         ssn.processed(xfr);
+    }
 
+    private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
+            final MessageMetaData_0_10 messageMetaData, final MessageStore store)
+    {
+        final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+        ByteBuffer body = xfr.getBody();
+        if(body != null)
+        {
+            storeMessage.addContent(0, body);
+        }
+        storeMessage.flushToStore();
+        return storeMessage;
     }
 
     @Override
@@ -389,7 +381,7 @@ public class ServerSessionDelegate exten
             ((ServerSession)session).unregister(sub);
             if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
             {
-                queue.setPrincipalHolder(null);
+                queue.setAuthorizationHolder(null);
             }
         }
     }
@@ -448,6 +440,19 @@ public class ServerSessionDelegate exten
         VirtualHost virtualHost = getVirtualHost(session);
         Exchange exchange = getExchange(session, exchangeName);
 
+        //we must check for any unsupported arguments present and throw not-implemented
+        if(method.hasArguments())
+        {
+            Map<String,Object> args = method.getArguments();
+
+            //QPID-3392: currently we don't support any!
+            if(!args.isEmpty())
+            {
+                exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
+                return;
+            }
+        }
+
         if(method.getPassive())
         {
             if(exchange == null)
@@ -457,7 +462,6 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                // TODO - check exchange has same properties
                 if(!exchange.getTypeShortString().toString().equals(method.getType()))
                 {
                     exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
@@ -562,6 +566,25 @@ public class ServerSessionDelegate exten
 
     }
 
+    private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+    {
+        final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+        Exchange exchange;
+        if(xfr.hasDestination())
+        {
+            exchange = exchangeRegistry.getExchange(xfr.getDestination());
+            if(exchange == null)
+            {
+                exchange = exchangeRegistry.getDefaultExchange();
+            }
+        }
+        else
+        {
+            exchange = exchangeRegistry.getDefaultExchange();
+        }
+        return exchange;
+    }
+
     private VirtualHost getVirtualHost(Session session)
     {
         ServerConnection conn = getServerConnection(session);
@@ -583,6 +606,12 @@ public class ServerSessionDelegate exten
 
         try
         {
+            if (nameNullOrEmpty(method.getExchange()))
+            {
+                exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Delete not allowed for default exchange");
+                return;
+            }
+
             Exchange exchange = getExchange(session, method.getExchange());
 
             if(exchange == null)
@@ -618,6 +647,16 @@ public class ServerSessionDelegate exten
         }
     }
 
+    private boolean nameNullOrEmpty(String name)
+    {
+        if(name == null || name.length() == 0)
+        {
+            return true;
+        }
+
+        return false;
+    }
+
     private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes)
     {
         for(ExchangeType type : registeredTypes)
@@ -664,9 +703,9 @@ public class ServerSessionDelegate exten
         {
             exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
         }
-        else if (!method.hasExchange())
+        else if (nameNullOrEmpty(method.getExchange()))
         {
-            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
         }
 /*
         else if (!method.hasBindingKey())
@@ -735,9 +774,9 @@ public class ServerSessionDelegate exten
         {
             exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
         }
-        else if (!method.hasExchange())
+        else if (nameNullOrEmpty(method.getExchange()))
         {
-            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Unbind not allowed for default exchange");
         }
         else if (!method.hasBindingKey())
         {
@@ -767,9 +806,6 @@ public class ServerSessionDelegate exten
                 }
             }
         }
-
-
-        super.exchangeUnbind(session, method);
     }
 
     @Override
@@ -969,10 +1005,10 @@ public class ServerSessionDelegate exten
 
                         }
 
-                        if(method.hasAutoDelete()
-                           && method.getAutoDelete()
-                           && method.hasExclusive()
-                           && method.getExclusive())
+                        if (method.hasAutoDelete()
+                            && method.getAutoDelete()
+                            && method.hasExclusive()
+                            && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -999,23 +1035,23 @@ public class ServerSessionDelegate exten
                                     }
                                 });
                         }
-                        else if(method.getExclusive())
+                        if (method.hasExclusive()
+                            && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task removeExclusive = new ServerSession.Task()
                             {
-
                                 public void doTask(ServerSession session)
                                 {
-                                    q.setPrincipalHolder(null);
+                                    q.setAuthorizationHolder(null);
                                     q.setExclusiveOwningSession(null);
                                 }
                             };
                             final ServerSession s = (ServerSession) session;
+                            q.setExclusiveOwningSession(s);
                             s.addSessionCloseTask(removeExclusive);
                             queue.addQueueDeleteTask(new AMQQueue.Task()
                             {
-
                                 public void doTask(AMQQueue queue) throws AMQException
                                 {
                                     s.removeSessionCloseTask(removeExclusive);
@@ -1029,7 +1065,7 @@ public class ServerSessionDelegate exten
                     }
                 }
             }
-            else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
+            else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
             {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
@@ -1077,7 +1113,7 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+                if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -1223,6 +1259,8 @@ public class ServerSessionDelegate exten
     @Override
     public void closed(Session session)
     {
+        setThreadSubject(session);
+
         for(Subscription_0_10 sub : getSubscriptions(session))
         {
             ((ServerSession)session).unregister(sub);
@@ -1241,4 +1279,9 @@ public class ServerSessionDelegate exten
         return ((ServerSession)session).getSubscriptions();
     }
 
+    private void setThreadSubject(Session session)
+    {
+        // Pass the authorized subject of the session's ServerConnection to SecurityManager.
+        SecurityManager.setThreadSubject(((ServerConnection) session.getConnection()).getAuthorizedSubject());
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Thu Oct 20 18:42:46 2011
@@ -50,6 +50,11 @@ public class AutoCommitTransaction imple
         _transactionLog = transactionLog;
     }
 
+    public long getTransactionStartTime()
+    {
+        return 0L; // always 0, the value ServerTransaction documents for "not in a transaction"
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Oct 20 18:42:46 2011
@@ -20,18 +20,23 @@ package org.apache.qpid.server.txn;
  * 
  */
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -41,17 +46,28 @@ import org.apache.qpid.server.store.Tran
  */
 public class LocalTransaction implements ServerTransaction
 {
-    protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
+    protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
 
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
     private volatile TransactionLog.Transaction _transaction;
     private TransactionLog _transactionLog;
+    private long _txnStartTime = 0L;
 
     public LocalTransaction(TransactionLog transactionLog)
     {
         _transactionLog = transactionLog;
     }
+    
+    public boolean inTransaction()
+    {
+        return _transaction != null; // resetDetails() sets _transaction back to null
+    }
+    
+    public long getTransactionStartTime()
+    {
+        return _txnStartTime; // 0L when unset; stamped from System.currentTimeMillis(), zeroed by resetDetails()
+    }
 
     public void addPostTransactionAction(Action postTransactionAction)
     {
@@ -89,7 +105,6 @@ public class LocalTransaction implements
 
         try
         {
-
             for(QueueEntry entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
@@ -113,7 +128,6 @@ public class LocalTransaction implements
             _logger.error("Error during message dequeues", e);
             tidyUpOnError(e);
         }
-
     }
 
     private void tidyUpOnError(Exception e)
@@ -140,8 +154,7 @@ public class LocalTransaction implements
             }
             finally
             {
-                _transaction = null;
-                _postTransactionActions.clear();
+		resetDetails();
             }
         }
 
@@ -193,6 +206,11 @@ public class LocalTransaction implements
     {
         _postTransactionActions.add(postTransactionAction);
 
+        if (_txnStartTime == 0L)
+        {
+            _txnStartTime = System.currentTimeMillis();
+        }
+
         if(message.isPersistent())
         {
             try
@@ -248,17 +266,14 @@ public class LocalTransaction implements
         }
         finally
         {
-            _transaction = null;
-            _postTransactionActions.clear();
+            resetDetails();
         }
-
     }
 
     public void rollback()
     {
         try
         {
-
             if(_transaction != null)
             {
                 _transaction.abortTran();
@@ -280,9 +295,15 @@ public class LocalTransaction implements
             }
             finally
             {
-                _transaction = null;
-                _postTransactionActions.clear();
+                resetDetails();
             }
         }
     }
+    
+    private void resetDetails() // shared clean-up invoked from the finally blocks of this class
+    {
+        _transaction = null; // inTransaction() reports false from here on
+        _postTransactionActions.clear();
+        _txnStartTime = 0L; // getTransactionStartTime() reports 0 again
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Thu Oct 20 18:42:46 2011
@@ -52,6 +52,13 @@ public interface ServerTransaction
         public void onRollback();
     }
 
+    /**
+     * Return the time the current transaction started.
+     * 
+     * @return the time this transaction started or 0 if not in a transaction
+     */
+    long getTransactionStartTime();
+
     /** 
      * Register an Action for execution after transaction commit or rollback.  Actions
      * will be executed in the order in which they are registered.

Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -1 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1072051-1185907

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Thu Oct 20 18:42:46 2011
@@ -63,6 +63,10 @@ public abstract class HouseKeepingTask i
         {
             _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
         }
+        finally
+        {
+            CurrentActor.remove();
+        }
     }
 
     public VirtualHost getVirtualHost()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Oct 20 18:42:46 2011
@@ -20,30 +20,28 @@
 */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.UUID;
+
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.binding.BindingFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.TimerTask;
-import java.util.concurrent.FutureTask;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
 
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
 {
     IConnectionRegistry getConnectionRegistry();
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Thu Oct 20 18:42:46 2011
@@ -43,7 +43,10 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.AMQException;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.util.ByteBufferInputStream;
 
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import java.util.List;
@@ -236,7 +239,14 @@ public class VirtualHostConfigRecoveryHa
                 FieldTable argumentsFT = null;
                 if(buf != null)
                 {
-                    argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
+                    try
+                    {
+                        argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit());
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException("IOException should not be thrown here", e);
+                    }
                 }
 
                 BindingFactory bf = _virtualHost.getBindingFactory();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org