You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC

svn commit: r1172657 [10/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/csha...

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/management/AMQUserManagementMBean.java Mon Sep 19 15:13:18 2011
@@ -91,15 +91,10 @@ public class AMQUserManagementMBean exte
 
     public boolean setPassword(String username, String password)
     {
-        return setPassword(username, password.toCharArray());
-    }
-    
-    public boolean setPassword(String username, char[] password)
-    {
         try
         {
             //delegate password changes to the Principal Database
-            return _principalDatabase.updatePassword(new UsernamePrincipal(username), password);
+            return _principalDatabase.updatePassword(new UsernamePrincipal(username), password.toCharArray());
         }
         catch (AccountNotFoundException e)
         {
@@ -108,11 +103,6 @@ public class AMQUserManagementMBean exte
         }
     }
 
-    public boolean setRights(String username, boolean read, boolean write, boolean admin)
-    {
-        throw new UnsupportedOperationException("Support for setting access rights no longer supported.");
-    }
-    
     public boolean createUser(String username, String password)
     {
         if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password.toCharArray()))
@@ -122,20 +112,6 @@ public class AMQUserManagementMBean exte
 
         return false;
     }
-    
-    public boolean createUser(String username, String password, boolean read, boolean write, boolean admin)
-    {
-        if (read || write || admin)
-        {
-            throw new UnsupportedOperationException("Support for setting access rights to true no longer supported.");
-        }
-        return createUser(username, password);
-    }
-
-    public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin)
-    {
-        return createUser(username, new String(password), read, write, admin);
-    }
 
     public boolean deleteUser(String username)
     {
@@ -181,7 +157,6 @@ public class AMQUserManagementMBean exte
             for (Principal user : users)
             {
                 // Create header attributes list
-                
                 // Read,Write,Admin items are depcreated and we return always false.
                 Object[] itemData = {user.getName(), false, false, false};
                 CompositeData messageData = new CompositeDataSupport(_userDataType, COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]), itemData);

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Mon Sep 19 15:13:18 2011
@@ -20,18 +20,36 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
 /**
- * The AuthenticationManager class is the entity responsible for
- * determining the authenticity of user credentials.
+ * Implementations of the AuthenticationManager are responsible for determining
+ * the authenticity of a user's credentials.
+ * 
+ * If the authentication is successful, the manager is responsible for producing a populated
+ * {@link Subject} containing the user's identity and zero or more principals representing
+ * groups to which the user belongs.
+ * <p>
+ * The {@link #initialise()} method is responsible for registering SASL mechanisms required by
+ * the manager.  The {@link #close()} method must reverse this registration.
+ * 
  */
-public interface AuthenticationManager extends Closeable
+public interface AuthenticationManager extends Closeable, Plugin
 {
+    /** The name for the required SASL Server mechanisms */
+    public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+
+    /**
+     * Initialise the authentication plugin.
+     *
+     */
+    void initialise();
 
    /**
     * Gets the SASL mechanisms known to this manager.

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Mon Sep 19 15:13:18 2011
@@ -20,32 +20,64 @@
  */
 package org.apache.qpid.server.security.auth.manager;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Security;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.configuration.PropertyException;
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.JCAProvider;
+import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
 import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.sasl.JCAProvider;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
 
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.AccountNotFoundException;
-import javax.security.sasl.SaslServerFactory;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.Sasl;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-import java.security.Security;
 
 /**
  * Concrete implementation of the AuthenticationManager that determines if supplied
- * user credentials match those appearing in a PrincipalDatabase.
+ * user credentials match those appearing in a PrincipalDatabase.   The implementation
+ * of the PrincipalDatabase is determined from the configuration.
+ * 
+ * This implementation also registers the JMX UserManagement MBean.
+ * 
+ * This plugin expects configuration such as:
  *
+ * <pre>
+ * &lt;pd-auth-manager&gt;
+ *   &lt;principal-database&gt;
+ *      &lt;class&gt;org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase&lt;/class&gt;
+ *      &lt;attributes&gt;
+ *         &lt;attribute&gt;
+ *              &lt;name&gt;passwordFile&lt;/name&gt;
+ *              &lt;value&gt;${conf}/passwd&lt;/value&gt;
+ *          &lt;/attribute&gt;
+ *      &lt;/attributes&gt;
+ *   &lt;/principal-database&gt;
+ * &lt;/pd-auth-manager&gt;
+ * </pre>
  */
 public class PrincipalDatabaseAuthenticationManager implements AuthenticationManager
 {
@@ -55,25 +87,109 @@ public class PrincipalDatabaseAuthentica
     private String _mechanisms;
 
     /** Maps from the mechanism to the callback handler to use for handling those requests */
-    private Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
+    private final Map<String, CallbackHandler> _callbackHandlerMap = new HashMap<String, CallbackHandler>();
 
     /**
      * Maps from the mechanism to the properties used to initialise the server. See the method Sasl.createSaslServer for
      * details of the use of these properties. This map is populated during initialisation of each provider.
      */
-    private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+    private final Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
+
+    protected PrincipalDatabase _principalDatabase = null;
 
-    /** The name for the required SASL Server mechanisms */
-    public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
+    protected AMQUserManagementMBean _mbean = null;
 
-    public PrincipalDatabaseAuthenticationManager()  
+    public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
     {
-        _logger.info("Initialising  PrincipalDatabase authentication manager.");
+        public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
+        {
+            final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+
+            // If there is no configuration for this plugin then don't load it.
+            if (configuration == null)
+            {
+                _logger.info("No authentication-manager configuration found for PrincipalDatabaseAuthenticationManager");
+                return null;
+            }
+
+            final PrincipalDatabaseAuthenticationManager pdam = new PrincipalDatabaseAuthenticationManager();
+            pdam.configure(configuration);
+            pdam.initialise();
+            return pdam;
+        }
+
+        public Class<PrincipalDatabaseAuthenticationManager> getPluginClass()
+        {
+            return PrincipalDatabaseAuthenticationManager.class;
+        }
+
+        public String getPluginName()
+        {
+            return PrincipalDatabaseAuthenticationManager.class.getName();
+        }
+    };
+
+    public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
+ 
+        public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
+        {
+            public List<String> getParentPaths()
+            {
+                return Arrays.asList("security.pd-auth-manager");
+            }
+
+            public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
+            {
+                final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
+                
+                instance.setConfiguration(path, config);
+                return instance;
+            }
+        };
+
+        public String[] getElementsProcessed()
+        {
+            return new String[] {"principal-database.class",
+                                 "principal-database.attributes.attribute.name",
+                                 "principal-database.attributes.attribute.value"};
+        }
+
+        public void validateConfiguration() throws ConfigurationException
+        {
+        }
+  
+        public String getPrincipalDatabaseClass()
+        {
+            return _configuration.getString("principal-database.class");
+        }
+  
+        public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
+        {
+            final List<String> argumentNames = _configuration.getList("principal-database.attributes.attribute.name");
+            final List<String> argumentValues = _configuration.getList("principal-database.attributes.attribute.value");
+            final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
+
+            for (int i = 0; i < argumentNames.size(); i++)
+            {
+                final String argName = argumentNames.get(i);
+                final String argValue = argumentValues.get(i);
+
+                attributes.put(argName, argValue);
+            }
+
+            return Collections.unmodifiableMap(attributes);
+        }
+    }
 
-        Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
+    protected PrincipalDatabaseAuthenticationManager()  
+    {
+    }
 
+    public void initialise()
+    {
+        final Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
 
-        initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
+        initialiseAuthenticationMechanisms(providerMap, _principalDatabase);
 
         if (providerMap.size() > 0)
         {
@@ -86,28 +202,13 @@ public class PrincipalDatabaseAuthentica
             {
                 _logger.info("Additional SASL providers successfully registered.");
             }
-
         }
         else
         {
             _logger.warn("No additional SASL providers registered.");
         }
-    }
 
-    private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) 
-    {
-        if (databases.size() > 1)
-        {
-            _logger.warn("More than one principle database provided currently authentication mechanism will override each other.");
-        }
-
-        for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet())
-        {
-            // fixme As the database now provide the mechanisms they support, they will ...
-            // overwrite each other in the map. There should only be one database per vhost.
-            // But currently we must have authentication before vhost definition.
-            initialiseAuthenticationMechanisms(providerMap, entry.getValue());
-        }
+        registerManagement();
     }
 
     private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) 
@@ -126,7 +227,6 @@ public class PrincipalDatabaseAuthentica
 
     private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
                                                    Map<String, Class<? extends SaslServerFactory>> providerMap)
-            
     {
         if (_mechanisms == null)
         {
@@ -147,6 +247,21 @@ public class PrincipalDatabaseAuthentica
         _logger.info("Initialised " + mechanism + " SASL provider successfully");
     }
 
+    /**
+     * @see org.apache.qpid.server.plugins.Plugin#configure(org.apache.qpid.server.configuration.plugins.ConfigurationPlugin)
+     */
+    public void configure(final ConfigurationPlugin config) throws ConfigurationException
+    {
+        final PrincipalDatabaseAuthenticationManagerConfiguration pdamConfig = (PrincipalDatabaseAuthenticationManagerConfiguration) config;
+        final String pdClazz = pdamConfig.getPrincipalDatabaseClass();
+
+        _logger.info("PrincipalDatabase concrete implementation : " + pdClazz);
+
+        _principalDatabase = createPrincipalDatabaseImpl(pdClazz);
+
+        configPrincipalDatabase(_principalDatabase, pdamConfig);        
+    }
+
     public String getMechanisms()
     {
         return _mechanisms;
@@ -158,6 +273,9 @@ public class PrincipalDatabaseAuthentica
                                      _callbackHandlerMap.get(mechanism));
     }
 
+    /**
+     * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(SaslServer, byte[])
+     */
     public AuthenticationResult authenticate(SaslServer server, byte[] response)
     {
         try
@@ -182,23 +300,14 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
-    public void close()
-    {
-        _mechanisms = null;
-        Security.removeProvider(PROVIDER_NAME);
-    }
-
     /**
      * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
      */
-    @Override
     public AuthenticationResult authenticate(final String username, final String password)
     {
-        final PrincipalDatabase db = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().values().iterator().next();
-
         try
         {
-            if (db.verifyPassword(username, password.toCharArray()))
+            if (_principalDatabase.verifyPassword(username, password.toCharArray()))
             {
                 final Subject subject = new Subject();
                 subject.getPrincipals().add(new UsernamePrincipal(username));
@@ -214,4 +323,141 @@ public class PrincipalDatabaseAuthentica
             return new AuthenticationResult(AuthenticationStatus.CONTINUE);
         }
     }
+
+    public void close()
+    {
+        _mechanisms = null;
+        Security.removeProvider(PROVIDER_NAME);
+
+        unregisterManagement();
+    }
+
+    private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
+    {
+        try
+        {
+            return (PrincipalDatabase) Class.forName(pdClazz).newInstance();
+        }
+        catch (InstantiationException ie)
+        {
+            throw new ConfigurationException("Cannot instantiate " + pdClazz, ie);
+        }
+        catch (IllegalAccessException iae)
+        {
+            throw new ConfigurationException("Cannot access " + pdClazz, iae);
+        }
+        catch (ClassNotFoundException cnfe)
+        {
+            throw new ConfigurationException("Cannot load " + pdClazz + " implementation", cnfe);
+        }
+        catch (ClassCastException cce)
+        {
+            throw new ConfigurationException("Expecting a " + PrincipalDatabase.class + " implementation", cce);
+        }
+    }
+
+    private void configPrincipalDatabase(final PrincipalDatabase principalDatabase, final PrincipalDatabaseAuthenticationManagerConfiguration config)
+            throws ConfigurationException
+    {
+
+        final Map<String,String> attributes = config.getPdClassAttributeMap();
+
+        for (Iterator<Entry<String, String>> iterator = attributes.entrySet().iterator(); iterator.hasNext();)
+        {
+            final Entry<String, String> nameValuePair = iterator.next();
+            final String methodName = generateSetterName(nameValuePair.getKey());
+            final Method method;
+            try
+            {
+                method = principalDatabase.getClass().getMethod(methodName, String.class);
+            }
+            catch (Exception e)
+            {
+                throw new ConfigurationException("No method " + methodName + " found in class "
+                        + principalDatabase.getClass()
+                        + " hence unable to configure principal database. The method must be public and "
+                        + "have a single String argument with a void return type", e);
+            }
+            try
+            {
+                method.invoke(principalDatabase, PropertyUtils.replaceProperties(nameValuePair.getValue()));
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException(e.getMessage(), e);
+            }
+            catch (PropertyException e)
+            {
+                throw new ConfigurationException(e.getMessage(), e);
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new ConfigurationException(e.getMessage(), e);
+            }
+            catch (InvocationTargetException e)
+            {
+                // QPID-1347: InvocationTargetException wraps the exception thrown from the reflective
+                // method call.  Pull out the underlying message and cause to make these more apparent to the user.
+                throw new ConfigurationException(e.getCause().getMessage(), e.getCause());
+            }
+        }
+    }
+
+    private String generateSetterName(String argName) throws ConfigurationException
+    {
+        if ((argName == null) || (argName.length() == 0))
+        {
+            throw new ConfigurationException("Argument names must have length >= 1 character");
+        }
+
+        if (Character.isLowerCase(argName.charAt(0)))
+        {
+            argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
+        }
+
+        final String methodName = "set" + argName;
+        return methodName;
+    }
+
+    protected void setPrincipalDatabase(final PrincipalDatabase principalDatabase)
+    {
+        _principalDatabase = principalDatabase;
+    }
+
+    protected void registerManagement()
+    {
+        try
+        {
+            _logger.info("Registering UserManagementMBean");
+
+            _mbean = new AMQUserManagementMBean();
+            _mbean.setPrincipalDatabase(_principalDatabase);
+            _mbean.register();
+        }
+        catch (Exception e)
+        {
+            _logger.warn("User management disabled as unable to create MBean:", e);
+            _mbean = null;
+        }
+    }
+
+    protected void unregisterManagement()
+    {
+        try
+        {
+            if (_mbean != null)
+            {
+                _logger.info("Unregistering UserManagementMBean");
+                _mbean.unregister();
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.warn("Failed to unregister User management MBean:", e);
+        }
+        finally
+        {
+            _mbean = null;
+        }
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/AuthenticationProviderInitialiser.java Mon Sep 19 15:13:18 2011
@@ -25,9 +25,6 @@ import java.util.Map;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.sasl.SaslServerFactory;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-
 public interface AuthenticationProviderInitialiser
 {
     /**
@@ -37,24 +34,6 @@ public interface AuthenticationProviderI
     String getMechanismName();
 
     /**
-     * Initialise the authentication provider.
-     * @param baseConfigPath the path in the config file that points to any config options for this provider. Each
-     * provider can have its own set of configuration options
-     * @param configuration the Apache Commons Configuration instance used to configure this provider
-     * @param principalDatabases the set of principal databases that are available
-     * @throws Exception needs refined Exception is too broad.
-     */
-    void initialise(String baseConfigPath, Configuration configuration,
-                    Map<String, PrincipalDatabase> principalDatabases) throws Exception;
-
-    /**
-     * Initialise the authentication provider.     
-     * @param db The principal database to initialise with
-     */
-    void initialise(PrincipalDatabase db);
-
-
-    /**
      * @return the callback handler that should be used to process authentication requests for this mechanism. This will
      * be called after initialise and will be stored by the authentication manager. The callback handler <b>must</b> be
      * fully threadsafe.

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java Mon Sep 19 15:13:18 2011
@@ -21,12 +21,11 @@
 package org.apache.qpid.server.security.auth.sasl;
 
 import java.security.Provider;
-import java.security.Security;
 import java.util.Map;
 
 import javax.security.sasl.SaslServerFactory;
 
-public final class JCAProvider extends Provider
+public class JCAProvider extends Provider
 {
     public JCAProvider(String name, Map<String, Class<? extends SaslServerFactory>> providerMap)
     {

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.security.auth.sasl.amqplain;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 
 import javax.security.auth.callback.Callback;
@@ -31,7 +33,6 @@ import javax.security.sasl.AuthorizeCall
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
@@ -60,7 +61,7 @@ public class AmqPlainSaslServer implemen
     {
         try
         {
-            final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
+            final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length);
             String username = (String) ft.getString("LOGIN");
             // we do not care about the prompt but it throws if null
             NameCallback nameCb = new NameCallback("prompt", username);

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Mon Sep 19 15:13:18 2011
@@ -20,21 +20,9 @@
  */
 package org.apache.qpid.server.security.auth.sasl.anonymous;
 
-import java.io.IOException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
 
 public class AnonymousSaslServer implements SaslServer
 {

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Sep 19 15:13:18 2011
@@ -259,7 +259,7 @@ public class AMQStateManager implements 
 
     public AMQProtocolSession getProtocolSession()
     {
-        SecurityManager.setThreadPrincipal(_protocolSession.getPrincipal());
+        SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject());
         return _protocolSession;
     }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Mon Sep 19 15:13:18 2011
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store;
 
 import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
@@ -479,9 +480,15 @@ public class DerbyMessageStore implement
                         FieldTable arguments;
                         if(dataAsBytes.length > 0)
                         {
-                            org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.wrap(dataAsBytes);
 
-                            arguments = new FieldTable(buffer,buffer.limit());
+                            try
+                            {
+                                arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
+                            }
+                            catch (IOException e)
+                            {
+                                throw new RuntimeException("IO Exception should not be thrown",e);
+                            }
                         }
                         else
                         {

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java Mon Sep 19 15:13:18 2011
@@ -20,13 +20,21 @@
  */
 package org.apache.qpid.server.subscription;
 
+import java.util.Map;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
 
 /**
  * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
@@ -56,4 +64,23 @@ public interface SubscriptionFactory
                                             RecordDeliveryMethod recordMethod
     )
             throws AMQException;
+
+
+    SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
+                                                                          AMQProtocolSession session,
+                                                                          AMQShortString consumerTag,
+                                                                          FieldTable filters,
+                                                                          boolean noLocal,
+                                                                          FlowCreditManager creditManager,
+                                                                          ClientDeliveryMethod deliveryMethod,
+                                                                          RecordDeliveryMethod recordMethod) throws AMQException;
+
+    Subscription_0_10 createSubscription(final ServerSession session,
+                                         final String destination,
+                                         final MessageAcceptMode acceptMode,
+                                         final MessageAcquireMode acquireMode,
+                                         final MessageFlowMode flowMode,
+                                         final FlowCreditManager_0_10 creditManager,
+                                         final FilterManager filterManager,
+                                         final Map<String,Object> arguments);
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java Mon Sep 19 15:13:18 2011
@@ -20,17 +20,28 @@
  */
 package org.apache.qpid.server.subscription;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
 
 public class SubscriptionFactoryImpl implements SubscriptionFactory
 {
+    private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
     public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
                                            AMQShortString consumerTag, boolean acks, FieldTable filters,
                                            boolean noLocal, FlowCreditManager creditManager) throws AMQException
@@ -78,18 +89,47 @@ public class SubscriptionFactoryImpl imp
 
         if(isBrowser)
         {
-            return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
+            return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
         }
         else if(acks)
         {
-            return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
+            return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
         }
         else
         {
-            return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
+            return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
         }
     }
 
+    public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
+                                                                                 final AMQProtocolSession session,
+                                                                                 final AMQShortString consumerTag,
+                                                                                 final FieldTable filters,
+                                                                                 final boolean noLocal,
+                                                                                 final FlowCreditManager creditManager,
+                                                                                 final ClientDeliveryMethod deliveryMethod,
+                                                                                 final RecordDeliveryMethod recordMethod) throws AMQException
+    {
+        return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
+    }
+
+    public Subscription_0_10 createSubscription(final ServerSession session,
+                                                final String destination,
+                                                final MessageAcceptMode acceptMode,
+                                                final MessageAcquireMode acquireMode,
+                                                final MessageFlowMode flowMode,
+                                                final FlowCreditManager_0_10 creditManager,
+                                                final FilterManager filterManager,
+                                                final Map<String,Object> arguments)
+    {
+        return new Subscription_0_10(session, destination, acceptMode, acquireMode,
+                                flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+    }
 
     public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
+
+    private static long getNextSubscriptionId()
+    {
+        return SUB_ID_GENERATOR.getAndIncrement();
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Sep 19 15:13:18 2011
@@ -88,9 +88,7 @@ public abstract class SubscriptionImpl i
 
     private final Lock _stateChangeLock;
 
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    // Create a simple ID that increments for ever new Subscription
-    private final long _subscriptionID = idGenerator.getAndIncrement();
+    private final long _subscriptionID;
     private LogSubject _logSubject;
     private LogActor _logActor;
     private UUID _id;
@@ -104,10 +102,11 @@ public abstract class SubscriptionImpl i
                                    AMQShortString consumerTag, FieldTable filters,
                                    boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
         }
 
 
@@ -151,10 +150,11 @@ public abstract class SubscriptionImpl i
                                  AMQShortString consumerTag, FieldTable filters,
                                  boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
         }
 
 
@@ -211,16 +211,45 @@ public abstract class SubscriptionImpl i
 
     }
 
+    /**
+     * NoAck Subscription for use with BasicGet method.
+     */
+    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+    {
+        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+                               AMQShortString consumerTag, FieldTable filters,
+                               boolean noLocal, FlowCreditManager creditManager,
+                                   ClientDeliveryMethod deliveryMethod,
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
+            throws AMQException
+        {
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+        }
+
+        public boolean isTransient()
+        {
+            return true;
+        }
+
+        public boolean wouldSuspend(QueueEntry msg)
+        {
+            return !getCreditManager().useCreditForMessage(msg.getMessage());
+        }
+
+    }
+
     static final class AckSubscription extends SubscriptionImpl
     {
         public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
                                AMQShortString consumerTag, FieldTable filters,
                                boolean noLocal, FlowCreditManager creditManager,
                                    ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+                                   RecordDeliveryMethod recordMethod,
+                                   long subscriptionID)
             throws AMQException
         {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
         }
 
 
@@ -296,10 +325,11 @@ public abstract class SubscriptionImpl i
                             AMQShortString consumerTag, FieldTable arguments,
                             boolean noLocal, FlowCreditManager creditManager,
                             ClientDeliveryMethod deliveryMethod,
-                            RecordDeliveryMethod recordMethod)
+                            RecordDeliveryMethod recordMethod,
+                            long subscriptionID)
             throws AMQException
     {
-
+        _subscriptionID = subscriptionID;
         _channel = channel;
         _consumerTag = consumerTag;
 
@@ -445,7 +475,7 @@ public abstract class SubscriptionImpl i
 
 
         //check that the message hasn't been rejected
-        if (entry.isRejectedBy(this))
+        if (entry.isRejectedBy(getSubscriptionID()))
         {
             if (_logger.isDebugEnabled())
             {

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java Mon Sep 19 15:13:18 2011
@@ -20,121 +20,108 @@
 */
 package org.apache.qpid.server.subscription;
 
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.subscription.Subscription;
 
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.ByteBuffer;
 
 public class SubscriptionList
 {
-
     private final SubscriptionNode _head = new SubscriptionNode();
 
-    private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
-    private AtomicInteger _size = new AtomicInteger();
-
+    private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
+    private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head);
+    private final AtomicInteger _size = new AtomicInteger();
 
-    public final class SubscriptionNode
+    public static final class SubscriptionNode
     {
         private final AtomicBoolean _deleted = new AtomicBoolean();
         private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
         private final Subscription _sub;
 
-
         public SubscriptionNode()
         {
-
+            //used for sentinel head and dummy node construction
             _sub = null;
             _deleted.set(true);
         }
 
         public SubscriptionNode(final Subscription sub)
         {
+            //used for regular node construction
             _sub = sub;
         }
 
-
-        public SubscriptionNode getNext()
+        /**
+         * Retrieves the first non-deleted node following the current node.
+         * Any deleted non-tail nodes encountered during the search are unlinked.
+         *
+         * @return the next non-deleted node, or null if none was found.
+         */
+        public SubscriptionNode findNext()
         {
-
             SubscriptionNode next = nextNode();
             while(next != null && next.isDeleted())
             {
-
                 final SubscriptionNode newNext = next.nextNode();
                 if(newNext != null)
                 {
+                    //try to move our _next reference forward to the 'newNext'
+                    //node to unlink the deleted node
                     _next.compareAndSet(next, newNext);
                     next = nextNode();
                 }
                 else
                 {
+                    //'newNext' is null, meaning 'next' is the current tail. Can't unlink
+                    //the tail node for thread safety reasons, just use the null.
                     next = null;
                 }
-
             }
+
             return next;
         }
 
-        private SubscriptionNode nextNode()
+        /**
+         * Gets the immediately next referenced node in the structure.
+         *
+         * @return the immediately next node in the structure, or null if at the tail.
+         */
+        protected SubscriptionNode nextNode()
         {
             return _next.get();
         }
 
+        /**
+         * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set.
+         *
+         * @param node the SubscriptionNode to set as 'next'
+         * @return whether the operation succeeded
+         */
+        private boolean setNext(final SubscriptionNode node)
+        {
+            return _next.compareAndSet(null, node);
+        }
+
         public boolean isDeleted()
         {
             return _deleted.get();
         }
 
-
         public boolean delete()
         {
-            if(_deleted.compareAndSet(false,true))
-            {
-                _size.decrementAndGet();
-                advanceHead();
-                return true;
-            }
-            else
-            {
-                return false;
-            }
+            return _deleted.compareAndSet(false,true);
         }
 
-
         public Subscription getSubscription()
         {
             return _sub;
         }
     }
 
-
-    public SubscriptionList(AMQQueue queue)
+    private void insert(final SubscriptionNode node, final boolean count)
     {
-    }
-
-    private void advanceHead()
-    {
-        SubscriptionNode head = _head.nextNode();
-        while(head._next.get() != null && head.isDeleted())
-        {
-
-            final SubscriptionNode newhead = head.nextNode();
-            if(newhead != null)
-            {
-                _head._next.compareAndSet(head, newhead);
-            }
-            head = _head.nextNode();
-        }
-    }
-
-
-    public SubscriptionNode add(Subscription sub)
-    {
-        SubscriptionNode node = new SubscriptionNode(sub);
         for (;;)
         {
             SubscriptionNode tail = _tail.get();
@@ -143,11 +130,14 @@ public class SubscriptionList
             {
                 if (next == null)
                 {
-                    if (tail._next.compareAndSet(null, node))
+                    if (tail.setNext(node))
                     {
                         _tail.compareAndSet(tail, node);
-                        _size.incrementAndGet();
-                        return node;
+                        if(count)
+                        {
+                            _size.incrementAndGet();
+                        }
+                        return;
                     }
                 }
                 else
@@ -156,27 +146,101 @@ public class SubscriptionList
                 }
             }
         }
+    }
 
+    public void add(final Subscription sub)
+    {
+        SubscriptionNode node = new SubscriptionNode(sub);
+        insert(node, true);
     }
 
-    public boolean remove(Subscription sub)
+    public boolean remove(final Subscription sub)
     {
-        SubscriptionNode node = _head.getNext();
+        SubscriptionNode prevNode = _head;
+        SubscriptionNode node = _head.nextNode();
+
         while(node != null)
         {
-            if(sub.equals(node._sub) && node.delete())
+            if(sub.equals(node.getSubscription()) && node.delete())
             {
+                _size.decrementAndGet();
+
+                SubscriptionNode tail = _tail.get();
+                if(node == tail)
+                {
+                    //we can't remove the last node from the structure for
+                    //correctness reasons, however we have just 'deleted'
+                    //the tail. Inserting an empty dummy node after it will
+                    //let us scavenge the node containing the Subscription.
+                    insert(new SubscriptionNode(), false);
+                }
+
+                //advance the next node reference in the 'prevNode' to scavenge
+                //the newly 'deleted' node for the Subscription.
+                prevNode.findNext();
+
+                nodeMarkerCleanup(node);
+
                 return true;
             }
-            node = node.getNext();
+
+            prevNode = node;
+            node = node.findNext();
         }
+
         return false;
     }
 
+    private void nodeMarkerCleanup(final SubscriptionNode node)
+    {
+        SubscriptionNode markedNode = _subNodeMarker.get();
+        if(node == markedNode)
+        {
+            //if the marked node is the one we are removing, then
+            //replace it with a dummy pointing at the next node.
+            //this is OK as the marked node is only used to index
+            //into the list and find the next node to use.
+            //Because we inserted a dummy if node was the
+            //tail, markedNode.nextNode() can never be null.
+            SubscriptionNode dummy = new SubscriptionNode();
+            dummy.setNext(markedNode.nextNode());
+
+            //if the CAS fails the marked node has changed, thus
+            //we don't care about the dummy and just forget it
+            _subNodeMarker.compareAndSet(markedNode, dummy);
+        }
+        else if(markedNode != null)
+        {
+            //if the marked node was already deleted then it could
+            //hold subsequently removed nodes after it in the list 
+            //in memory. Scavenge it to ensure their actual removal.
+            if(markedNode != _head && markedNode.isDeleted())
+            {
+                markedNode.findNext();
+            }
+        }
+    }
 
-    public static class SubscriptionNodeIterator
+    public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode)
+    {
+        return _subNodeMarker.compareAndSet(expected, nextNode);
+    }
+
+    /**
+     * Get the current marked SubscriptionNode. This should only be used to index into the list and find the next node
+     * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node
+     * with reference to the next node.
+     *
+     * @return the previously marked node (or a dummy if it was subsequently deleted)
+     */
+    public SubscriptionNode getMarkedNode()
     {
+        return _subNodeMarker.get();
+    }
+
 
+    public static class SubscriptionNodeIterator
+    {
         private SubscriptionNode _lastNode;
 
         SubscriptionNodeIterator(SubscriptionNode startNode)
@@ -184,49 +248,25 @@ public class SubscriptionList
             _lastNode = startNode;
         }
 
-
-        public boolean atTail()
-        {
-            return _lastNode.nextNode() == null;
-        }
-
         public SubscriptionNode getNode()
         {
-
             return _lastNode;
-
         }
 
         public boolean advance()
         {
+            SubscriptionNode nextNode = _lastNode.findNext();
+            _lastNode = nextNode;
 
-            if(!atTail())
-            {
-                SubscriptionNode nextNode = _lastNode.nextNode();
-                while(nextNode.isDeleted() && nextNode.nextNode() != null)
-                {
-                    nextNode = nextNode.nextNode();
-                }
-                _lastNode = nextNode;
-                return true;
-
-            }
-            else
-            {
-                return false;
-            }
-
+            return _lastNode != null;
         }
-
     }
 
-
     public SubscriptionNodeIterator iterator()
     {
         return new SubscriptionNodeIterator(_head);
     }
 
-
     public SubscriptionNode getHead()
     {
         return _head;
@@ -236,9 +276,6 @@ public class SubscriptionList
     {
         return _size.get();
     }
-
-
-
 }
 
 

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Sep 19 15:13:18 2011
@@ -40,7 +40,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.AMQMessage;
@@ -80,10 +79,7 @@ import java.nio.ByteBuffer;
 
 public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
 {
-
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    // Create a simple ID that increments for ever new Subscription
-    private final long _subscriptionID = idGenerator.getAndIncrement();
+    private final long _subscriptionID;
 
     private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -113,16 +109,15 @@ public class Subscription_0_10 implement
     private final MessageAcquireMode _acquireMode;
     private MessageFlowMode _flowMode;
     private final ServerSession _session;
-    private AtomicBoolean _stopped = new AtomicBoolean(true);
-    private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
+    private final AtomicBoolean _stopped = new AtomicBoolean(true);
     private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
 
     private LogActor _logActor;
-    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
     private UUID _id;
     private String _traceExclude;
     private String _trace;
-    private long _createTime = System.currentTimeMillis();
+    private final long _createTime = System.currentTimeMillis();
     private final AtomicLong _deliveredCount = new AtomicLong(0);
     private final Map<String, Object> _arguments;
 
@@ -131,8 +126,9 @@ public class Subscription_0_10 implement
                              MessageAcquireMode acquireMode,
                              MessageFlowMode flowMode,
                              FlowCreditManager_0_10 creditManager,
-                             FilterManager filters,Map<String, Object> arguments)
+                             FilterManager filters,Map<String, Object> arguments, long subscriptionId)
     {
+        _subscriptionID = subscriptionId;
         _session = session;
         _destination = destination;
         _acceptMode = acceptMode;
@@ -198,7 +194,7 @@ public class Subscription_0_10 implement
 
     public boolean isSuspended()
     {
-        return !isActive() || _deleted.get(); // TODO check for Session suspension
+        return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
     }
 
     public boolean hasInterest(QueueEntry entry)
@@ -207,7 +203,7 @@ public class Subscription_0_10 implement
 
 
         //check that the message hasn't been rejected
-        if (entry.isRejectedBy(this))
+        if (entry.isRejectedBy(getSubscriptionID()))
         {
 
             return false;
@@ -731,13 +727,22 @@ public class Subscription_0_10 implement
 
     public void stop()
     {
-        if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+        try
         {
-            _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            getSendLock();
+
+            if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+            {
+                _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            }
+            _stopped.set(true);
+            FlowCreditManager_0_10 creditManager = getCreditManager();
+            creditManager.clearCredit();
+        }
+        finally
+        {
+            releaseSendLock();
         }
-        _stopped.set(true);
-        FlowCreditManager_0_10 creditManager = getCreditManager();
-        creditManager.clearCredit();
     }
 
     public void addCredit(MessageCreditUnit unit, long value)

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Sep 19 15:13:18 2011
@@ -20,13 +20,18 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
+import java.security.Principal;
 import java.text.MessageFormat;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -38,7 +43,8 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
@@ -49,20 +55,22 @@ import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
 import org.apache.qpid.transport.Session;
 
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
 {
     private ConnectionConfig _config;
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
     private LogActor _actor = GenericActor.getInstance(this);
 
-    private ApplicationRegistry _registry;
+    private Subject _authorizedSubject = null;
+    private Principal _authorizedPrincipal = null;
     private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final long _connectionId;
     
-    public ServerConnection()
+    public ServerConnection(final long connectionId)
     {
-
+        _connectionId = connectionId;
     }
 
     public UUID getId()
@@ -212,9 +220,9 @@ public class ServerConnection extends Co
     public String toLogString()
     {
         boolean hasVirtualHost = (null != this.getVirtualHost());
-        boolean hasPrincipal = (null != getAuthorizationID());
+        boolean hasClientId = (null != getClientId());
 
-        if (hasPrincipal && hasVirtualHost)
+        if (hasClientId && hasVirtualHost)
         {
             return "[" +
                     MessageFormat.format(CONNECTION_FORMAT,
@@ -224,7 +232,7 @@ public class ServerConnection extends Co
                                          getVirtualHost().getName())
                  + "] ";
         }
-        else if (hasPrincipal)
+        else if (hasClientId)
         {
             return "[" +
                     MessageFormat.format(USER_FORMAT,
@@ -341,4 +349,54 @@ public class ServerConnection extends Co
     {
         _statisticsEnabled = enabled;
     }
+
+    /**
+     * @return authorizedSubject
+     */
+    public Subject getAuthorizedSubject()
+    {
+        return _authorizedSubject;
+    }
+
+    /**
+     * Sets the authorized subject.  It also extracts the UsernamePrincipal from the subject
+     * and caches it for optimisation purposes.
+     *
+     * @param authorizedSubject
+     */
+    public void setAuthorizedSubject(final Subject authorizedSubject)
+    {
+        if (authorizedSubject == null)
+        {
+            _authorizedSubject = null;
+            _authorizedPrincipal = null;
+        }
+        else
+        {
+            _authorizedSubject = authorizedSubject;
+            _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+        }
+    }
+
+    public Principal getAuthorizedPrincipal()
+    {
+        return _authorizedPrincipal;
+    }
+
+    public long getConnectionId()
+    {
+        return _connectionId;
+    }
+
+    @Override
+    public boolean isSessionNameUnique(String name)
+    {
+        return !super.hasSessionWithName(name);
+    }
+
+    @Override
+    public String getUserName()
+    {
+        return _authorizedPrincipal.getName();
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Sep 19 15:13:18 2011
@@ -21,8 +21,10 @@
 package org.apache.qpid.server.transport;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
@@ -31,11 +33,28 @@ import javax.security.sasl.SaslException
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionOpen;
+import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionTuneOk;
+import org.apache.qpid.transport.ServerDelegate;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionAttach;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetached;
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
@@ -70,24 +89,42 @@ public class ServerConnectionDelegate ex
         return list;
     }
 
-    @Override
     public ServerSession getSession(Connection conn, SessionAttach atc)
     {
-        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
 
         ServerSession ssn = new ServerSession(conn, serverSessionDelegate,  new Binary(atc.getName()), 0);
 
         return ssn;
     }
 
-    @Override
     protected SaslServer createSaslServer(String mechanism) throws SaslException
     {
         return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
 
     }
 
-    @Override
+    protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
+    {
+        final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
+        final ServerConnection sconn = (ServerConnection) conn;
+        
+        
+        if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
+        {
+            tuneAuthorizedConnection(sconn);
+            sconn.setAuthorizedSubject(authResult.getSubject());
+        }
+        else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
+        {
+            connectionAuthContinue(sconn, authResult.getChallenge());
+        }
+        else
+        {
+            connectionAuthFailed(sconn, authResult.getCause());
+        }
+    }
+
     public void connectionClose(Connection conn, ConnectionClose close)
     {
         try
@@ -101,10 +138,9 @@ public class ServerConnectionDelegate ex
         
     }
 
-    @Override
     public void connectionOpen(Connection conn, ConnectionOpen open)
     {
-        ServerConnection sconn = (ServerConnection) conn;
+        final ServerConnection sconn = (ServerConnection) conn;
         
         VirtualHost vhost;
         String vhostName;
@@ -118,7 +154,7 @@ public class ServerConnectionDelegate ex
         }
         vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
 
-        SecurityManager.setThreadPrincipal(conn.getAuthorizationID());
+        SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
         
         if(vhost != null)
         {
@@ -142,6 +178,26 @@ public class ServerConnectionDelegate ex
         }
         
     }
+
+    @Override
+    public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+    {
+        ServerConnection sconn = (ServerConnection) conn;
+        int okChannelMax = ok.getChannelMax();
+
+        if (okChannelMax > getChannelMax())
+        {
+            _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
+                    ") above the servers offered limit (" + getChannelMax() +")");
+
+            //Due to the error we must forcefully close the connection without negotiation
+            sconn.getSender().close();
+            return;
+        }
+
+        setConnectionTuneOkChannelMax(sconn, okChannelMax);
+    }
     
     @Override
     protected int getHeartbeatMax()
@@ -155,4 +211,59 @@ public class ServerConnectionDelegate ex
     {
         return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
     }
+
+    @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+    {
+        // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures
+        // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister
+        // completes.
+        unregisterAllSubscriptions(conn, dtc);
+        super.sessionDetach(conn, dtc);
+    }
+
+    private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc)
+    {
+        final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
+        final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
+        for (Subscription_0_10 subscription_0_10 : subs)
+        {
+            ssn.unregister(subscription_0_10);
+        }
+    }
+
+    @Override
+    public void sessionAttach(final Connection conn, final SessionAttach atc)
+    {
+        final String clientId = new String(atc.getName());
+        final Session ssn = getSession(conn, atc);
+
+        if(isSessionNameUnique(clientId,conn))
+        {
+            conn.registerSession(ssn);
+            super.sessionAttach(conn, atc);
+        }
+        else
+        {
+            ssn.invoke(new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY));
+            ssn.closed();
+        }
+    }
+
+    private boolean isSessionNameUnique(final String name, final Connection conn)
+    {
+        final ServerConnection sconn = (ServerConnection) conn;
+        final String userId = sconn.getUserName();
+
+        final Iterator<AMQConnectionModel> connections =
+                        ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
+        while(connections.hasNext())
+        {
+            final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
+            if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+            {
+                return false;
+            }
+        }
+        return true;
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Sep 19 15:13:18 2011
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
-import static org.apache.qpid.util.Serial.*;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
 
 import java.lang.ref.WeakReference;
 import java.security.Principal;
@@ -38,6 +38,8 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.security.auth.Subject;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
@@ -57,7 +59,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -75,9 +77,7 @@ import org.apache.qpid.transport.Session
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sun.security.auth.UserPrincipal;
-
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
     
@@ -118,8 +118,6 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCount = new AtomicLong(0);
     private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
-    private Principal _principal;
-
     private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
 
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
@@ -131,27 +129,27 @@ public class ServerSession extends Sessi
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
     }
 
-    protected void setState(State state)
-    {
-        super.setState(state);
-
-        if (state == State.OPEN)
-        {
-	        _actor.message(ChannelMessages.CREATE());
-        }
-    }
-
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
     {
         super(connection, delegate, name, expiry);
         _connectionConfig = connConfig;        
         _transaction = new AutoCommitTransaction(this.getMessageStore());
-        _principal = new UserPrincipal(connection.getAuthorizationID());
+
         _reference = new WeakReference<Session>(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
     }
 
+    protected void setState(State state)
+    {
+        super.setState(state);
+
+        if (state == State.OPEN)
+        {
+            _actor.message(ChannelMessages.CREATE());
+        }
+    }
+
     private ConfigStore getConfigStore()
     {
         return getConnectionConfig().getConfigStore();
@@ -202,8 +200,8 @@ public class ServerSession extends Sessi
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
     {
-        invoke(xfr, postIdSettingAction);
         getConnectionModel().registerMessageDelivered(xfr.getBodySize());
+        invoke(xfr, postIdSettingAction);
     }
 
     public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -419,7 +417,7 @@ public class ServerSession extends Sessi
         catch (AMQException e)
         {
             // TODO
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            _logger.error("Failed to unregister subscription", e);
         }
         finally
         {
@@ -516,9 +514,14 @@ public class ServerSession extends Sessi
         return _txnCount.get();
     }
     
-    public Principal getPrincipal()
+    public Principal getAuthorizedPrincipal()
     {
-        return _principal;
+        return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
+    }
+    
+    public Subject getAuthorizedSubject()
+    {
+        return ((ServerConnection) getConnection()).getAuthorizedSubject();
     }
 
     public void addSessionCloseTask(Task task)
@@ -667,11 +670,25 @@ public class ServerSession extends Sessi
     {
        return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
-                                   getConnection().getConnectionId(),
+                                   ((ServerConnection) getConnection()).getConnectionId(),
                                    getClientID(),
                                    ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
                                    getVirtualHost().getName(),
                                    getChannel())
             + "] ";
     }
+
+    @Override
+    public void close()
+    {
+        // unregister subscriptions in order to prevent sending of new messages
+        // to subscriptions with closing session
+        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        {
+            unregister(subscription_0_10);
+        }
+
+        super.close();
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org