You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [25/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Oct 21 01:19:00 2011
@@ -20,47 +20,26 @@
  */
 package org.apache.qpid.server.transport;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import java.util.*;
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
     private String _localFQDN;
     private final IApplicationRegistry _appRegistry;
 
+
     public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
     {
         this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
@@ -89,42 +68,24 @@ public class ServerConnectionDelegate ex
         return list;
     }
 
+    @Override
     public ServerSession getSession(Connection conn, SessionAttach atc)
     {
-        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
 
         ServerSession ssn = new ServerSession(conn, serverSessionDelegate,  new Binary(atc.getName()), 0);
 
         return ssn;
     }
 
+    @Override
     protected SaslServer createSaslServer(String mechanism) throws SaslException
     {
         return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
 
     }
 
-    protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
-    {
-        final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
-        final ServerConnection sconn = (ServerConnection) conn;
-        
-        
-        if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
-        {
-            tuneAuthorizedConnection(sconn);
-            sconn.setAuthorizedSubject(authResult.getSubject());
-        }
-        else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
-        {
-            connectionAuthContinue(sconn, authResult.getChallenge());
-        }
-        else
-        {
-            connectionAuthFailed(sconn, authResult.getCause());
-        }
-    }
-
+    @Override
     public void connectionClose(Connection conn, ConnectionClose close)
     {
         try
@@ -138,9 +99,10 @@ public class ServerConnectionDelegate ex
         
     }
 
+    @Override
     public void connectionOpen(Connection conn, ConnectionOpen open)
     {
-        final ServerConnection sconn = (ServerConnection) conn;
+        ServerConnection sconn = (ServerConnection) conn;
         
         VirtualHost vhost;
         String vhostName;
@@ -154,7 +116,7 @@ public class ServerConnectionDelegate ex
         }
         vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
 
-        SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
+        SecurityManager.setThreadPrincipal(conn.getAuthorizationID());
         
         if(vhost != null)
         {
@@ -176,27 +138,6 @@ public class ServerConnectionDelegate ex
             sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
             sconn.setState(Connection.State.CLOSING);
         }
-        
-    }
-
-    @Override
-    public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
-    {
-        ServerConnection sconn = (ServerConnection) conn;
-        int okChannelMax = ok.getChannelMax();
-
-        if (okChannelMax > getChannelMax())
-        {
-            _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
-                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
-                    ") above the servers offered limit (" + getChannelMax() +")");
-
-            //Due to the error we must forcefully close the connection without negotiation
-            sconn.getSender().close();
-            return;
-        }
-
-        setConnectionTuneOkChannelMax(sconn, okChannelMax);
     }
     
     @Override
@@ -211,59 +152,4 @@ public class ServerConnectionDelegate ex
     {
         return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
     }
-
-    @Override public void sessionDetach(Connection conn, SessionDetach dtc)
-    {
-        // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures
-        // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister
-        // completes.
-        unregisterAllSubscriptions(conn, dtc);
-        super.sessionDetach(conn, dtc);
-    }
-
-    private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc)
-    {
-        final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
-        final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
-        for (Subscription_0_10 subscription_0_10 : subs)
-        {
-            ssn.unregister(subscription_0_10);
-        }
-    }
-
-    @Override
-    public void sessionAttach(final Connection conn, final SessionAttach atc)
-    {
-        final String clientId = new String(atc.getName());
-        final Session ssn = getSession(conn, atc);
-
-        if(isSessionNameUnique(clientId,conn))
-        {
-            conn.registerSession(ssn);
-            super.sessionAttach(conn, atc);
-        }
-        else
-        {
-            ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
-            ssn.closed();
-        }
-    }
-
-    private boolean isSessionNameUnique(final String name, final Connection conn)
-    {
-        final ServerConnection sconn = (ServerConnection) conn;
-        final String userId = sconn.getUserName();
-
-        final Iterator<AMQConnectionModel> connections =
-                        ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
-        while(connections.hasNext())
-        {
-            final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
-            if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
-            {
-                return false;
-            }
-        }
-        return true;
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Fri Oct 21 01:19:00 2011
@@ -23,25 +23,9 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 import static org.apache.qpid.util.Serial.gt;
 
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.security.auth.Subject;
+import com.sun.security.auth.UserPrincipal;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageTransfer;
@@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
 {
-    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-    
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
     private final UUID _id;
@@ -116,7 +111,8 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
+
+    private Principal _principal;
 
     private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
 
@@ -129,27 +125,27 @@ public class ServerSession extends Sessi
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
     }
 
-    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
-    {
-        super(connection, delegate, name, expiry);
-        _connectionConfig = connConfig;        
-        _transaction = new AutoCommitTransaction(this.getMessageStore());
-
-        _reference = new WeakReference<Session>(this);
-        _id = getConfigStore().createId();
-        getConfigStore().addConfiguredObject(this);
-    }
-
     protected void setState(State state)
     {
         super.setState(state);
 
         if (state == State.OPEN)
         {
-            _actor.message(ChannelMessages.CREATE());
+	        _actor.message(ChannelMessages.CREATE());
         }
     }
 
+    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
+    {
+        super(connection, delegate, name, expiry);
+        _connectionConfig = connConfig;        
+        _transaction = new AutoCommitTransaction(this.getMessageStore());
+        _principal = new UserPrincipal(connection.getAuthorizationID());
+        _reference = new WeakReference(this);
+        _id = getConfigStore().createId();
+        getConfigStore().addConfiguredObject(this);
+    }
+
     private ConfigStore getConfigStore()
     {
         return getConnectionConfig().getConfigStore();
@@ -164,8 +160,8 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
     {
-        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
-        _transaction.enqueue(queues,message, new ServerTransaction.Action()
+
+            _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
                 BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -193,14 +189,12 @@ public class ServerSession extends Sessi
             });
 
             incrementOutstandingTxnsIfNecessary();
-            updateTransactionalActivity();
     }
 
 
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
     {
-        getConnectionModel().registerMessageDelivered(xfr.getBodySize());
         invoke(xfr, postIdSettingAction);
     }
 
@@ -383,7 +377,6 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
-	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -417,7 +410,7 @@ public class ServerSession extends Sessi
         catch (AMQException e)
         {
             // TODO
-            _logger.error("Failed to unregister subscription", e);
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
         }
         finally
         {
@@ -432,11 +425,6 @@ public class ServerSession extends Sessi
         // theory
         return !(_transaction instanceof AutoCommitTransaction);
     }
-    
-    public boolean inTransaction()
-    {
-        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
-    }
 
     public void selectTx()
     {
@@ -483,17 +471,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    /**
-     * Update last transaction activity timestamp
-     */
-    public void updateTransactionalActivity()
-    {
-        if (isTransactional())
-        {
-            _txnUpdateTime.set(System.currentTimeMillis());
-        }
-    }
-
     public Long getTxnStarts()
     {
         return _txnStarts.get();
@@ -514,14 +491,9 @@ public class ServerSession extends Sessi
         return _txnCount.get();
     }
     
-    public Principal getAuthorizedPrincipal()
+    public Principal getPrincipal()
     {
-        return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
-    }
-    
-    public Subject getAuthorizedSubject()
-    {
-        return ((ServerConnection) getConnection()).getAuthorizedSubject();
+        return _principal;
     }
 
     public void addSessionCloseTask(Task task)
@@ -634,61 +606,18 @@ public class ServerSession extends Sessi
         return (LogSubject) this;
     }
 
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
-    {
-        if (inTransaction())
-        {
-            long currentTime = System.currentTimeMillis();
-            long openTime = currentTime - _transaction.getTransactionStartTime();
-            long idleTime = currentTime - _txnUpdateTime.get();
-
-            // Log a warning on idle or open transactions
-            if (idleWarn > 0L && idleTime > idleWarn)
-            {
-                CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
-                _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
-            }
-            else if (openWarn > 0L && openTime > openWarn)
-            {
-                CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
-                _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
-            }
-
-            // Close connection for idle or open transactions that have timed out
-            if (idleClose > 0L && idleTime > idleClose)
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
-            }
-            else if (openClose > 0L && openTime > openClose)
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
-            }
-        }
-    }
-
+    @Override
     public String toLogString()
     {
        return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
-                                   ((ServerConnection) getConnection()).getConnectionId(),
+                                   getConnection().getConnectionId(),
                                    getClientID(),
                                    ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
                                    getVirtualHost().getName(),
                                    getChannel())
             + "] ";
-    }
 
-    @Override
-    public void close()
-    {
-        // unregister subscriptions in order to prevent sending of new messages
-        // to subscriptions with closing session
-        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
-        for (Subscription_0_10 subscription_0_10 : subscriptions)
-        {
-            unregister(subscription_0_10);
-        }
-
-        super.close();
     }
+
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri Oct 21 01:19:00 2011
@@ -25,34 +25,31 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.*;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.flow.WindowCreditManager;
-import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Acquired;
@@ -98,34 +95,26 @@ import org.apache.qpid.transport.TxSelec
 
 public class ServerSessionDelegate extends SessionDelegate
 {
-    private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
+    private final IApplicationRegistry _appRegistry;
 
-    public ServerSessionDelegate()
+    public ServerSessionDelegate(IApplicationRegistry appRegistry)
     {
-
+        _appRegistry = appRegistry;
     }
 
     @Override
     public void command(Session session, Method method)
     {
-        try
-        {
-            setThreadSubject(session);
+        SecurityManager.setThreadPrincipal(session.getConnection().getAuthorizationID());
 
-            if(!session.isClosing())
+        if(!session.isClosing())
+        {
+            super.command(session, method);
+            if (method.isSync())
             {
-                super.command(session, method);
-                if (method.isSync())
-                {
-                    session.flushProcessed();
-                }
+                session.flushProcessed();
             }
         }
-        catch(RuntimeException e)
-        {
-            LOGGER.error("Exception processing command", e);
-            exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e);
-        }
     }
 
     @Override
@@ -134,6 +123,8 @@ public class ServerSessionDelegate exten
         ((ServerSession)session).accept(method.getTransfers());
     }
 
+
+
     @Override
     public void messageReject(Session session, MessageReject method)
     {
@@ -168,6 +159,7 @@ public class ServerSessionDelegate exten
     @Override
     public void messageSubscribe(Session session, MessageSubscribe method)
     {
+
         //TODO - work around broken Python tests
         if(!method.hasAcceptMode())
         {
@@ -211,33 +203,32 @@ public class ServerSessionDelegate exten
                 {
                     exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
+                else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
                 else
                 {
+
                     if(queue.isExclusive())
                     {
-                        ServerSession s = (ServerSession) session;
-                        queue.setExclusiveOwningSession(s);
-                        if(queue.getAuthorizationHolder() == null)
+                        if(queue.getPrincipalHolder() == null)
                         {
-                            queue.setAuthorizationHolder(s);
-                            queue.setExclusiveOwningSession(s);
+                            queue.setPrincipalHolder((ServerSession)session);
                             ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
                             {
+
                                 public void doTask(ServerSession session)
                                 {
-                                    if(queue.getAuthorizationHolder() == session)
+                                    if(queue.getPrincipalHolder() == session)
                                     {
-                                        queue.setAuthorizationHolder(null);
-                                        queue.setExclusiveOwningSession(null);
+                                        queue.setPrincipalHolder(null);
                                     }
                                 }
                             });
                         }
 
+
                     }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -253,7 +244,7 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+                    Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
                                                                   destination,
                                                                   method.getAcceptMode(),
                                                                   method.getAcquireMode(),
@@ -284,10 +275,25 @@ public class ServerSessionDelegate exten
         }
     }
 
+
     @Override
     public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
-        final Exchange exchange = getExchangeForMessage(ssn, xfr);
+        ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+        Exchange exchange;
+        if(xfr.hasDestination())
+        {
+            exchange = exchangeRegistry.getExchange(xfr.getDestination());
+            if(exchange == null)
+            {
+                exchange = exchangeRegistry.getDefaultExchange();
+            }
+        }
+        else
+        {
+            exchange = exchangeRegistry.getDefaultExchange();
+        }
+        
 
         DeliveryProperties delvProps = null;
         if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -295,7 +301,7 @@ public class ServerSessionDelegate exten
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
 
-        final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+        MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
         
         if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
         {
@@ -305,63 +311,65 @@ public class ServerSessionDelegate exten
             
             return;
         }
-
-        final Exchange exchangeInUse;
-        ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
-        if(queues.isEmpty() && exchange.getAlternateExchange() != null)
-        {
-            final Exchange alternateExchange = exchange.getAlternateExchange();
-            queues = alternateExchange.route(messageMetaData);
-            if (!queues.isEmpty())
-            {
-                exchangeInUse = alternateExchange;
-            }
-            else
-            {
-                exchangeInUse = exchange;
-            }
-        }
-        else
+        
+        final MessageStore store = getVirtualHost(ssn).getMessageStore();
+        StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+        ByteBuffer body = xfr.getBody();
+        if(body != null)
         {
-            exchangeInUse = exchange;
+            storeMessage.addContent(0, body);
         }
+        storeMessage.flushToStore();
+        MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
+
+        ArrayList<? extends BaseQueue> queues = exchange.route(message);
 
-        if(!queues.isEmpty())
+
+
+        if(queues != null && queues.size() != 0)
         {
-            final MessageStore store = getVirtualHost(ssn).getMessageStore();
-            final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
-            MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
             ((ServerSession) ssn).enqueue(message, queues);
         }
         else
         {
-            if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+            if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
             {
-                RangeSet rejects = new RangeSet();
-                rejects.add(xfr.getId());
-                MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
-                ssn.invoke(reject);
-            }
-            else
-            {
-                ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+                if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                {
+                    RangeSet rejects = new RangeSet();
+                    rejects.add(xfr.getId());
+                    MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+                    ssn.invoke(reject);
+                }
+                else
+                {
+                    Exchange alternate = exchange.getAlternateExchange();
+                    if(alternate != null)
+                    {
+                        queues = alternate.route(message);
+                        if(queues != null && queues.size() != 0)
+                        {
+                            ((ServerSession) ssn).enqueue(message, queues);
+                        }
+                        else
+                        {
+                            //TODO - log the message discard
+                        }
+                    }
+                    else
+                    {
+                        //TODO - log the message discard
+                    }
+
+
+                }
             }
+
+
         }
 
         ssn.processed(xfr);
-    }
 
-    private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
-            final MessageMetaData_0_10 messageMetaData, final MessageStore store)
-    {
-        final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
-        ByteBuffer body = xfr.getBody();
-        if(body != null)
-        {
-            storeMessage.addContent(0, body);
-        }
-        storeMessage.flushToStore();
-        return storeMessage;
     }
 
     @Override
@@ -381,7 +389,7 @@ public class ServerSessionDelegate exten
             ((ServerSession)session).unregister(sub);
             if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
             {
-                queue.setAuthorizationHolder(null);
+                queue.setPrincipalHolder(null);
             }
         }
     }
@@ -440,19 +448,6 @@ public class ServerSessionDelegate exten
         VirtualHost virtualHost = getVirtualHost(session);
         Exchange exchange = getExchange(session, exchangeName);
 
-        //we must check for any unsupported arguments present and throw not-implemented
-        if(method.hasArguments())
-        {
-            Map<String,Object> args = method.getArguments();
-
-            //QPID-3392: currently we don't support any!
-            if(!args.isEmpty())
-            {
-                exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
-                return;
-            }
-        }
-
         if(method.getPassive())
         {
             if(exchange == null)
@@ -462,6 +457,7 @@ public class ServerSessionDelegate exten
             }
             else
             {
+                // TODO - check exchange has same properties
                 if(!exchange.getTypeShortString().toString().equals(method.getType()))
                 {
                     exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
@@ -566,25 +562,6 @@ public class ServerSessionDelegate exten
 
     }
 
-    private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
-    {
-        final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
-        Exchange exchange;
-        if(xfr.hasDestination())
-        {
-            exchange = exchangeRegistry.getExchange(xfr.getDestination());
-            if(exchange == null)
-            {
-                exchange = exchangeRegistry.getDefaultExchange();
-            }
-        }
-        else
-        {
-            exchange = exchangeRegistry.getDefaultExchange();
-        }
-        return exchange;
-    }
-
     private VirtualHost getVirtualHost(Session session)
     {
         ServerConnection conn = getServerConnection(session);
@@ -606,12 +583,6 @@ public class ServerSessionDelegate exten
 
         try
         {
-            if (nameNullOrEmpty(method.getExchange()))
-            {
-                exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Delete not allowed for default exchange");
-                return;
-            }
-
             Exchange exchange = getExchange(session, method.getExchange());
 
             if(exchange == null)
@@ -647,16 +618,6 @@ public class ServerSessionDelegate exten
         }
     }
 
-    private boolean nameNullOrEmpty(String name)
-    {
-        if(name == null || name.length() == 0)
-        {
-            return true;
-        }
-
-        return false;
-    }
-
     private boolean isStandardExchange(Exchange exchange, Collection<ExchangeType<? extends Exchange>> registeredTypes)
     {
         for(ExchangeType type : registeredTypes)
@@ -703,9 +664,9 @@ public class ServerSessionDelegate exten
         {
             exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
         }
-        else if (nameNullOrEmpty(method.getExchange()))
+        else if (!method.hasExchange())
         {
-            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Bind not allowed for default exchange");
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
         }
 /*
         else if (!method.hasBindingKey())
@@ -774,9 +735,9 @@ public class ServerSessionDelegate exten
         {
             exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
         }
-        else if (nameNullOrEmpty(method.getExchange()))
+        else if (!method.hasExchange())
         {
-            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Unbind not allowed for default exchange");
+            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
         }
         else if (!method.hasBindingKey())
         {
@@ -806,6 +767,9 @@ public class ServerSessionDelegate exten
                 }
             }
         }
+
+
+        super.exchangeUnbind(session, method);
     }
 
     @Override
@@ -1005,10 +969,10 @@ public class ServerSessionDelegate exten
 
                         }
 
-                        if (method.hasAutoDelete()
-                            && method.getAutoDelete()
-                            && method.hasExclusive()
-                            && method.getExclusive())
+                        if(method.hasAutoDelete()
+                           && method.getAutoDelete()
+                           && method.hasExclusive()
+                           && method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task deleteQueueTask = new ServerSession.Task()
@@ -1035,23 +999,23 @@ public class ServerSessionDelegate exten
                                     }
                                 });
                         }
-                        if (method.hasExclusive()
-                            && method.getExclusive())
+                        else if(method.getExclusive())
                         {
                             final AMQQueue q = queue;
                             final ServerSession.Task removeExclusive = new ServerSession.Task()
                             {
+
                                 public void doTask(ServerSession session)
                                 {
-                                    q.setAuthorizationHolder(null);
+                                    q.setPrincipalHolder(null);
                                     q.setExclusiveOwningSession(null);
                                 }
                             };
                             final ServerSession s = (ServerSession) session;
-                            q.setExclusiveOwningSession(s);
                             s.addSessionCloseTask(removeExclusive);
                             queue.addQueueDeleteTask(new AMQQueue.Task()
                             {
+
                                 public void doTask(AMQQueue queue) throws AMQException
                                 {
                                     s.removeSessionCloseTask(removeExclusive);
@@ -1065,7 +1029,7 @@ public class ServerSessionDelegate exten
                     }
                 }
             }
-            else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+            else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
             {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
@@ -1113,7 +1077,7 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
+                if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -1259,8 +1223,6 @@ public class ServerSessionDelegate exten
     @Override
     public void closed(Session session)
     {
-        setThreadSubject(session);
-
         for(Subscription_0_10 sub : getSubscriptions(session))
         {
             ((ServerSession)session).unregister(sub);
@@ -1279,9 +1241,4 @@ public class ServerSessionDelegate exten
         return ((ServerSession)session).getSubscriptions();
     }
 
-    private void setThreadSubject(Session session)
-    {
-        final ServerConnection scon = (ServerConnection) session.getConnection();
-        SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Oct 21 01:19:00 2011
@@ -50,11 +50,6 @@ public class AutoCommitTransaction imple
         _transactionLog = transactionLog;
     }
 
-    public long getTransactionStartTime()
-    {
-        return 0L;
-    }
-
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Oct 21 01:19:00 2011
@@ -20,23 +20,18 @@ package org.apache.qpid.server.txn;
  * 
  */
 
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A concrete implementation of ServerTransaction where enqueue/dequeue
@@ -46,28 +41,17 @@ import org.slf4j.LoggerFactory;
  */
 public class LocalTransaction implements ServerTransaction
 {
-    protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class);
+    protected static final Logger _logger = Logger.getLogger(LocalTransaction.class);
 
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
     private volatile TransactionLog.Transaction _transaction;
     private TransactionLog _transactionLog;
-    private long _txnStartTime = 0L;
 
     public LocalTransaction(TransactionLog transactionLog)
     {
         _transactionLog = transactionLog;
     }
-    
-    public boolean inTransaction()
-    {
-        return _transaction != null;
-    }
-    
-    public long getTransactionStartTime()
-    {
-        return _txnStartTime;
-    }
 
     public void addPostTransactionAction(Action postTransactionAction)
     {
@@ -105,6 +89,7 @@ public class LocalTransaction implements
 
         try
         {
+
             for(QueueEntry entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
@@ -128,6 +113,7 @@ public class LocalTransaction implements
             _logger.error("Error during message dequeues", e);
             tidyUpOnError(e);
         }
+
     }
 
     private void tidyUpOnError(Exception e)
@@ -154,7 +140,8 @@ public class LocalTransaction implements
             }
             finally
             {
-		resetDetails();
+                _transaction = null;
+                _postTransactionActions.clear();
             }
         }
 
@@ -206,11 +193,6 @@ public class LocalTransaction implements
     {
         _postTransactionActions.add(postTransactionAction);
 
-        if (_txnStartTime == 0L)
-        {
-            _txnStartTime = System.currentTimeMillis();
-        }
-
         if(message.isPersistent())
         {
             try
@@ -266,14 +248,17 @@ public class LocalTransaction implements
         }
         finally
         {
-            resetDetails();
+            _transaction = null;
+            _postTransactionActions.clear();
         }
+
     }
 
     public void rollback()
     {
         try
         {
+
             if(_transaction != null)
             {
                 _transaction.abortTran();
@@ -295,15 +280,9 @@ public class LocalTransaction implements
             }
             finally
             {
-                resetDetails();
+                _transaction = null;
+                _postTransactionActions.clear();
             }
         }
     }
-    
-    private void resetDetails()
-    {
-        _transaction = null;
-	_postTransactionActions.clear();
-        _txnStartTime = 0L;
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Fri Oct 21 01:19:00 2011
@@ -52,13 +52,6 @@ public interface ServerTransaction
         public void onRollback();
     }
 
-    /**
-     * Return the time the current transaction started.
-     * 
-     * @return the time this transaction started or 0 if not in a transaction
-     */
-    long getTransactionStartTime();
-
     /** 
      * Register an Action for execution after transaction commit or rollback.  Actions
      * will be executed in the order in which they are registered.

Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,3 +1 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
-/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1072051-1185907

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Fri Oct 21 01:19:00 2011
@@ -63,10 +63,6 @@ public abstract class HouseKeepingTask i
         {
             _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
         }
-        finally
-        {
-            CurrentActor.remove();
-        }
     }
 
     public VirtualHost getVirtualHost()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Oct 21 01:19:00 2011
@@ -20,28 +20,30 @@
 */
 package org.apache.qpid.server.virtualhost;
 
-import java.util.UUID;
-
 import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.binding.BindingFactory;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.TimerTask;
+import java.util.concurrent.FutureTask;
 
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
 {
     IConnectionRegistry getConnectionRegistry();
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Oct 21 01:19:00 2011
@@ -43,10 +43,7 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.AMQException;
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.util.ByteBufferInputStream;
 
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import java.util.List;
@@ -239,14 +236,7 @@ public class VirtualHostConfigRecoveryHa
                 FieldTable argumentsFT = null;
                 if(buf != null)
                 {
-                    try
-                    {
-                        argumentsFT = new FieldTable(new DataInputStream(new ByteBufferInputStream(buf)),buf.limit());
-                    }
-                    catch (IOException e)
-                    {
-                        throw new RuntimeException("IOException should not be thrown here", e);
-                    }
+                    argumentsFT = new FieldTable(org.apache.mina.common.ByteBuffer.wrap(buf),buf.limit());
                 }
 
                 BindingFactory bf = _virtualHost.getBindingFactory();

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Fri Oct 21 01:19:00 2011
@@ -20,21 +20,23 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -61,8 +63,6 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -71,7 +71,7 @@ import org.apache.qpid.server.registry.A
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
@@ -99,7 +99,7 @@ public class VirtualHostImpl implements 
 
     private AMQBrokerManagerMBean _brokerMBean;
 
-    private final AuthenticationManager _authenticationManager;
+    private AuthenticationManager _authenticationManager;
 
     private SecurityManager _securityManager;
 
@@ -111,8 +111,6 @@ public class VirtualHostImpl implements 
     private BrokerConfig _broker;
     private UUID _id;
 
-    private boolean _statisticsEnabled = false;
-    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private final long _createTime = System.currentTimeMillis();
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -163,12 +161,12 @@ public class VirtualHostImpl implements 
 
         public String getObjectInstanceName()
         {
-            return ObjectName.quote(_name);
+            return _name.toString();
         }
 
         public String getName()
         {
-            return _name;
+            return _name.toString();
         }
 
         public VirtualHostImpl getVirtualHost()
@@ -177,11 +175,22 @@ public class VirtualHostImpl implements 
         }
     }
 
-    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
+    {
+        this(appRegistry, hostConfig, null);
+    }
+
+
+    public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+    {
+        this(ApplicationRegistry.getInstance(),hostConfig,store);
+    }
+
+    private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
     {
 		if (hostConfig == null)
 		{
-			throw new IllegalArgumentException("HostConfig cannot be null");
+			throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
 		}
 		
         _appRegistry = appRegistry;
@@ -235,28 +244,21 @@ public class VirtualHostImpl implements 
 			initialiseMessageStore(hostConfig);
         }
 		
-        _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
+        _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration);
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
-        initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod());
-        
-        initialiseStatistics();
+        initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
     }
 
-    /**
-     * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
-     * and checking for idle or open transactions that have exceeded the permitted thresholds.
-     *
-     * @param period
-     */
 	private void initialiseHouseKeeping(long period)
     {
+        /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
         if (period != 0L)
         {
-            class VirtualHostHouseKeepingTask extends HouseKeepingTask
+            class ExpiredMessagesTask extends HouseKeepingTask
             {
-                public VirtualHostHouseKeepingTask(VirtualHost vhost)
+                public ExpiredMessagesTask(VirtualHost vhost)
                 {
                     super(vhost);
                 }
@@ -279,29 +281,18 @@ public class VirtualHostImpl implements 
                             // house keeping task from running.
                         }
                     }
-                    for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
-                    {
-                        _logger.debug("Checking for long running open transactions on connection " + connection);
-                        for (AMQSessionModel session : connection.getSessionModels())
-                        {
-	                        _logger.debug("Checking for long running open transactions on session " + session);
-                            try
-                            {
-                                session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
-	                                                           _configuration.getTransactionTimeoutOpenClose(),
-	                                                           _configuration.getTransactionTimeoutIdleWarn(),
-	                                                           _configuration.getTransactionTimeoutIdleClose());
-                            }
-                            catch (Exception e)
-                            {
-                                _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
-                            }
-                        }
-                    }
                 }
             }
 
-            scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this));
+            scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+
+            class ForceChannelClosuresTask extends TimerTask
+            {
+                public void run()
+                {
+                    _connectionRegistry.expireClosedChannels();
+                }
+            }
 
             Map<String, VirtualHostPluginFactory> plugins =
                 ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -455,57 +446,46 @@ public class VirtualHostImpl implements 
     private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
     {
     	AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
-        String queueName = queue.getName();
 
     	if (queue.isDurable())
     	{
     		getDurableConfigurationStore().createQueue(queue);
     	}
 
-        //get the exchange name (returns default exchange name if none was specified)
     	String exchangeName = queueConfiguration.getExchange();
 
-        Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
+    	Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
+
+        if (exchange == null)
+        {
+            exchange = _exchangeRegistry.getDefaultExchange();
+        }
+
     	if (exchange == null)
     	{
-            throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
+    		throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
     	}
 
-        Exchange defaultExchange = _exchangeRegistry.getDefaultExchange();
-
-        //get routing keys in configuration (returns empty list if none are defined)
-        List<?> routingKeys = queueConfiguration.getRoutingKeys();
+        List routingKeys = queueConfiguration.getRoutingKeys();
+        if (routingKeys == null || routingKeys.isEmpty())
+        {
+            routingKeys = Collections.singletonList(queue.getNameShortString());
+        }
 
         for (Object routingKeyNameObj : routingKeys)
         {
-            String routingKey = String.valueOf(routingKeyNameObj);
-
-            if (exchange.equals(defaultExchange) && !queueName.equals(routingKey))
+            AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+            if (_logger.isInfoEnabled())
             {
-                throw new ConfigurationException("Illegal attempt to bind queue '" + queueName +
-                        "' to the default exchange with a key other than the queue name: " + routingKey);
+                _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
             }
-
-            configureBinding(queue, exchange, routingKey);
-        }
-
-        if (!exchange.equals(defaultExchange))
-        {
-            //bind the queue to the named exchange using its name
-            configureBinding(queue, exchange, queueName);
+            _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
         }
 
-        //ensure the queue is bound to the default exchange using its name
-        configureBinding(queue, defaultExchange, queueName);
-    }
-
-    private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException
-    {
-        if (_logger.isInfoEnabled())
+        if (exchange != _exchangeRegistry.getDefaultExchange())
         {
-            _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName());
+            _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null);
         }
-        _bindingFactory.addBinding(routingKey, queue, exchange, null);
     }
 
     public String getName()
@@ -647,80 +627,6 @@ public class VirtualHostImpl implements 
     {
         return _bindingFactory;
     }
-    
-    public void registerMessageDelivered(long messageSize)
-    {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
-        _appRegistry.registerMessageDelivered(messageSize);
-    }
-    
-    public void registerMessageReceived(long messageSize, long timestamp)
-    {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
-        _appRegistry.registerMessageReceived(messageSize, timestamp);
-    }
-    
-    public StatisticsCounter getMessageReceiptStatistics()
-    {
-        return _messagesReceived;
-    }
-    
-    public StatisticsCounter getDataReceiptStatistics()
-    {
-        return _dataReceived;
-    }
-    
-    public StatisticsCounter getMessageDeliveryStatistics()
-    {
-        return _messagesDelivered;
-    }
-    
-    public StatisticsCounter getDataDeliveryStatistics()
-    {
-        return _dataDelivered;
-    }
-    
-    public void resetStatistics()
-    {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-        
-        for (AMQConnectionModel connection : _connectionRegistry.getConnections())
-        {
-            connection.resetStatistics();
-        }
-    }
-
-    public void initialiseStatistics()
-    {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-        
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
-        _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getName());
-        _dataReceived = new StatisticsCounter("bytes-received-" + getName());
-    }
-
-    public boolean isStatisticsEnabled()
-    {
-        return _statisticsEnabled;
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _statisticsEnabled = enabled;
-    }
 
     public void createBrokerConnection(final String transport,
                                        final String host,

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Fri Oct 21 01:19:00 2011
@@ -192,7 +192,7 @@ public class MessageStoreTool
 
         if (_initialised)
         {
-            ApplicationRegistry.remove();
+            ApplicationRegistry.remove(1);
         }
 
         _console.println("...exiting");
@@ -274,7 +274,7 @@ public class MessageStoreTool
         {
             ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
 
-            ApplicationRegistry.remove();
+            ApplicationRegistry.remove(1);
 
             ApplicationRegistry.initialise(registry);
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Fri Oct 21 01:19:00 2011
@@ -364,7 +364,7 @@ public class Show extends AbstractComman
             {
                 if(msg instanceof AMQMessage)
                 {
-                    headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties());
+                    headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
                 }
             }
             catch (AMQException e)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org