You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC

svn commit: r1172657 [9/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/cshar...

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java Mon Sep 19 15:13:18 2011
@@ -18,39 +18,56 @@
  */
 package org.apache.qpid.server.plugins;
 
-import static org.apache.felix.framework.util.FelixConstants.*;
-import static org.apache.felix.main.AutoProcessor.*;
+import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE;
+import static org.apache.felix.main.AutoProcessor.process;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT;
+import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.felix.framework.Felix;
 import org.apache.felix.framework.util.StringMap;
 import org.apache.log4j.Logger;
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.server.configuration.TopicConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.exchange.ExchangeType;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.SecurityPluginFactory;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
 import org.apache.qpid.server.security.access.plugins.DenyAll;
 import org.apache.qpid.server.security.access.plugins.LegacyAccess;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
+import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
 import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy;
 import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
+import org.apache.qpid.util.FileUtils;
 import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleException;
+import org.osgi.framework.Version;
 import org.osgi.framework.launch.Framework;
 import org.osgi.util.tracker.ServiceTracker;
 
@@ -63,7 +80,6 @@ public class PluginManager implements Cl
     private static final Logger _logger = Logger.getLogger(PluginManager.class);
 
     private static final int FELIX_STOP_TIMEOUT = 30000;
-    private static final String QPID_VER_SUFFIX = "version=0.13,";
 
     private Framework _felix;
 
@@ -72,15 +88,61 @@ public class PluginManager implements Cl
     private ServiceTracker _configTracker = null;
     private ServiceTracker _virtualHostTracker = null;
     private ServiceTracker _policyTracker = null;
+    private ServiceTracker _authenticationManagerTracker = null;
 
     private Activator _activator;
 
+    private final List<ServiceTracker> _trackers = new ArrayList<ServiceTracker>();
     private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>();
     private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>();
     private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>();
     private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>();
+    private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>();
 
-    public PluginManager(String pluginPath, String cachePath) throws Exception
+    /** The default name of the OSGI system package list. */
+    private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties";
+    
+    /** The name of the override system property that holds the name of the OSGI system package list. */
+    private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties";
+    
+    private static final String OSGI_SYSTEM_PACKAGES;
+    
+    static 
+    {
+        final String filename = System.getProperty(FILE_PROPERTY);
+        final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
+                    PluginManager.class.getClassLoader());
+        
+        try
+        {
+            Version qpidReleaseVersion;
+            try
+            {
+                qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion());
+            }
+            catch (IllegalArgumentException iae)
+            {
+                qpidReleaseVersion = null;
+            }
+            
+            final Properties p  = new Properties();
+            p.load(is);
+            
+            final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p);
+            
+            OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString();
+            
+            _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES);
+        }
+        catch (IOException e)
+        {
+            _logger.error("Error reading OSGI system package list", e);
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+    
+    
+    public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception
     {
         // Store all non-OSGi plugins
         // A little gross that we have to add them here, but not all the plugins are OSGIfied
@@ -97,7 +159,8 @@ public class PluginManager implements Cl
                 LegacyAccess.LegacyAccessConfiguration.FACTORY,
                 new SlowConsumerDetectionConfigurationFactory(),
                 new SlowConsumerDetectionPolicyConfigurationFactory(),
-                new SlowConsumerDetectionQueueConfigurationFactory()))
+                new SlowConsumerDetectionQueueConfigurationFactory(),
+                PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY))
         {
             _configPlugins.put(configFactory.getParentPaths(), configFactory);
         }
@@ -112,125 +175,109 @@ public class PluginManager implements Cl
             _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory);
         }
 
-        // Check the plugin directory path is set and exist
-        if (pluginPath == null)
+        for (AuthenticationManagerPluginFactory<? extends Plugin> pluginFactory : Arrays.asList(
+                PrincipalDatabaseAuthenticationManager.FACTORY))
         {
-            return;
+            _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory);
         }
-        File pluginDir = new File(pluginPath);
-        if (!pluginDir.exists())
-        {
-            return;
-        } 
-
-        // Setup OSGi configuration propery map
-        StringMap configMap = new StringMap(false);
-
-        // Add the bundle provided service interface package and the core OSGi
-        // packages to be exported from the class path via the system bundle.
-        configMap.put(FRAMEWORK_SYSTEMPACKAGES,
-                "org.osgi.framework; version=1.3.0," +
-                "org.osgi.service.packageadmin; version=1.2.0," +
-                "org.osgi.service.startlevel; version=1.0.0," +
-                "org.osgi.service.url; version=1.0.0," +
-                "org.osgi.util.tracker; version=1.0.0," +
-                "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
-                "org.apache.qpid; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.common; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.util; " + QPID_VER_SUFFIX +
-                "org.apache.commons.configuration; version=1.0.0," +
-                "org.apache.commons.lang; version=1.0.0," +
-                "org.apache.commons.lang.builder; version=1.0.0," +
-                "org.apache.commons.logging; version=1.0.0," +
-                "org.apache.log4j; version=1.2.12," +
-                "javax.management.openmbean; version=1.0.0," +
-                "javax.management; version=1.0.0"
-            );
-        
-        // No automatic shutdown hook
-        configMap.put("felix.shutdown.hook", "false");
-        
-        // Add system activator
-        List<BundleActivator> activators = new ArrayList<BundleActivator>();
-        _activator = new Activator();
-        activators.add(_activator);
-        configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
 
-        if (cachePath != null)
+        if(bundleContext == null)
         {
-            File cacheDir = new File(cachePath);
-            if (!cacheDir.exists() && cacheDir.canWrite())
+            // Check the plugin directory path is set and exist
+            if (pluginPath == null)
             {
-                _logger.info("Creating plugin cache directory: " + cachePath);
-                cacheDir.mkdir();
+                _logger.info("No plugin path specified, no plugins will be loaded.");
+                return;
             }
-            
-            // Set plugin cache directory and empty it
-            _logger.info("Cache bundles in directory " + cachePath);
-            configMap.put(FRAMEWORK_STORAGE, cachePath);
-        }
-        configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
-        
-        // Set directory with plugins to auto-deploy
-        _logger.info("Auto deploying bundles from directory " + pluginPath);
-        configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
-        configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);        
-        
-        // Start plugin manager and trackers
-        _felix = new Felix(configMap);
-        try
-        {
-            _logger.info("Starting plugin manager...");
-            _felix.init();
-	        process(configMap, _felix.getBundleContext());
-            _felix.start();
-            _logger.info("Started plugin manager");
+            File pluginDir = new File(pluginPath);
+            if (!pluginDir.exists())
+            {
+                _logger.warn("Plugin dir : "  + pluginDir + " does not exist.");
+                return;
+            }
+
+            // Add the bundle provided service interface package and the core OSGi
+            // packages to be exported from the class path via the system bundle.
+
+            // Setup OSGi configuration property map
+            final StringMap configMap = new StringMap(false);
+            configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES);
+
+            // No automatic shutdown hook
+            configMap.put("felix.shutdown.hook", "false");
+
+            // Add system activator
+            List<BundleActivator> activators = new ArrayList<BundleActivator>();
+            _activator = new Activator();
+            activators.add(_activator);
+            configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
+
+            if (cachePath != null)
+            {
+                File cacheDir = new File(cachePath);
+                if (!cacheDir.exists() && cacheDir.canWrite())
+                {
+                    _logger.info("Creating plugin cache directory: " + cachePath);
+                    cacheDir.mkdir();
+                }
+
+                // Set plugin cache directory and empty it
+                _logger.info("Cache bundles in directory " + cachePath);
+                configMap.put(FRAMEWORK_STORAGE, cachePath);
+            }
+            configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+
+            // Set directory with plugins to auto-deploy
+            _logger.info("Auto deploying bundles from directory " + pluginPath);
+            configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
+            configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+
+            // Start plugin manager
+            _felix = new Felix(configMap);
+            try
+            {
+                _logger.info("Starting plugin manager framework");
+                _felix.init();
+                process(configMap, _felix.getBundleContext());
+                _felix.start();
+                _logger.info("Started plugin manager framework");
+            }
+            catch (BundleException e)
+            {
+                throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+            }
+
+            bundleContext = _activator.getContext();
         }
-        catch (BundleException e)
+        else
         {
-            throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+            _logger.info("Using the specified external BundleContext");
         }
-        
-        // TODO save trackers in a map, keyed by class name
-        
-        _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null);
+
+        _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null);
         _exchangeTracker.open();
+        _trackers.add(_exchangeTracker);
 
-        _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null);
+        _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null);
         _securityTracker.open();
+        _trackers.add(_securityTracker);
 
-        _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null);
+        _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null);
         _configTracker.open();
+        _trackers.add(_configTracker);
 
-        _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null);
+        _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null);
         _virtualHostTracker.open();
+        _trackers.add(_virtualHostTracker);
  
-        _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null);
+        _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null);
         _policyTracker.open();
-        
+        _trackers.add(_policyTracker);
+
+        _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null);
+        _authenticationManagerTracker.open();
+        _trackers.add(_authenticationManagerTracker);
+
         _logger.info("Opened service trackers");
     }
 
@@ -301,22 +348,26 @@ public class PluginManager implements Cl
         return getServices(_securityTracker, _securityPlugins);
     }
 
+    public Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> getAuthenticationManagerPlugins()
+    {
+        return getServices(_authenticationManagerTracker, _authenticationManagerPlugins);
+    }
+
     public void close()
     {
-        if (_felix != null)
+        try
         {
-            try
+            // Close all bundle trackers
+            for(ServiceTracker tracker : _trackers)
             {
-                // Close all bundle trackers
-                _exchangeTracker.close();
-                _securityTracker.close();
-                _configTracker.close();
-                _virtualHostTracker.close();
-                _policyTracker.close();
+                tracker.close();
             }
-            finally
+        }
+        finally
+        {
+            if (_felix != null)
             {
-                _logger.info("Stopping plugin manager");
+                _logger.info("Stopping plugin manager framework");
                 try
                 {
                     // FIXME should be stopAndWait() but hangs VM, need upgrade in felix
@@ -335,7 +386,12 @@ public class PluginManager implements Cl
                 {
                     // Ignore
                 }
-                _logger.info("Stopped plugin manager");
+                _logger.info("Stopped plugin manager framework");
+            }
+            else
+            {
+                _logger.info("Plugin manager was started with an external BundleContext, " +
+                             "skipping remaining shutdown tasks");
             }
         }
     }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Mon Sep 19 15:13:18 2011
@@ -69,4 +69,8 @@ public interface AMQConnectionModel exte
      * Return a {@link LogSubject} for the connection.
      */
     public LogSubject getLogSubject();
+
+    public String getUserName();
+
+    public boolean isSessionNameUnique(String name);
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -34,9 +36,9 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.JMException;
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
@@ -64,12 +66,10 @@ import org.apache.qpid.framing.MethodDis
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -88,22 +88,22 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-
     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -133,7 +133,7 @@ public class AMQProtocolEngine implement
     private Object _lastSent;
 
     protected volatile boolean _closed;
-    
+
     // maximum number of channels this session should have
     private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
 
@@ -145,12 +145,11 @@ public class AMQProtocolEngine implement
 
     private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
-    private Principal _authorizedID;
+    private Subject _authorizedSubject;
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
 
-    // Create a simple ID that increments for ever new Session
-    private final long _sessionID = idGenerator.getAndIncrement();
+    private final long _sessionID;
 
     private AMQPConnectionActor _actor;
     private LogSubject _logSubject;
@@ -160,37 +159,32 @@ public class AMQProtocolEngine implement
     private long _writtenBytes;
     private long _readBytes;
 
-    private Job _readJob;
-    private Job _writeJob;
 
-    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
     private final UUID _id;
     private final ConfigStore _configStore;
     private long _createTime = System.currentTimeMillis();
-    
+
     private ApplicationRegistry _registry;
     private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
-    private final NetworkConnection _network;
-    private final Sender<ByteBuffer> _sender;
+    private NetworkConnection _network;
+    private Sender<ByteBuffer> _sender;
 
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
-    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network)
+    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _codecFactory = new AMQCodecFactory(true, this);
-        _poolReference.acquireExecutorService();
-        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
-        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
-        _network = network;
-        _sender = _network.getSender();
+
+        setNetworkConnection(network);
+        _sessionID = connectionId;
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
 
@@ -205,6 +199,17 @@ public class AMQProtocolEngine implement
         initialiseStatistics();
     }
 
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        setNetworkConnection(network, network.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
+        _network = network;
+        _sender = sender;
+    }
+
     private AMQProtocolSessionMBean createMBean() throws JMException
     {
         return new AMQProtocolSessionMBean(this);
@@ -241,26 +246,18 @@ public class AMQProtocolEngine implement
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+            for (AMQDataBlock dataBlock : dataBlocks)
             {
-                public void run()
+                try
                 {
-                    // Decode buffer
-
-                    for (AMQDataBlock dataBlock : dataBlocks)
-                    {
-                        try
-                        {
-                            dataBlockReceived(dataBlock);
-                        }
-                        catch (Exception e)
-                        {
-                            _logger.error("Unexpected exception when processing datablock", e);
-                            closeProtocolSession();
-                        }
-                    }
+                    dataBlockReceived(dataBlock);
                 }
-            });
+                catch (Exception e)
+                {
+                    _logger.error("Unexpected exception when processing datablock", e);
+                    closeProtocolSession();
+                }
+            }
         }
         catch (Exception e)
         {
@@ -338,6 +335,11 @@ public class AMQProtocolEngine implement
                 closeChannel(channelId);
                 throw e;
             }
+            catch (TransportException e)
+            {
+                closeChannel(channelId);
+                throw e;
+            }
         }
         finally
         {
@@ -348,7 +350,7 @@ public class AMQProtocolEngine implement
     private void protocolInitiationReceived(ProtocolInitiation pi)
     {
         // this ensures the codec never checks for a PI message again
-        ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+        (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
         try
         {
             // Log incomming protocol negotiation request
@@ -368,15 +370,49 @@ public class AMQProtocolEngine implement
                                                                                        null,
                                                                                        mechanisms.getBytes(),
                                                                                        locales.getBytes());
-            _sender.send(responseBody.generateFrame(0).toNioByteBuffer());
+            _sender.send(asByteBuffer(responseBody.generateFrame(0)));
+            _sender.flush();
 
         }
         catch (AMQException e)
         {
             _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
 
-            _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+            _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
+            _sender.flush();
+        }
+    }
+
+    private ByteBuffer asByteBuffer(AMQDataBlock block)
+    {
+        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+        try
+        {
+            block.writePayload(new DataOutputStream(new OutputStream()
+            {
+
+
+                @Override
+                public void write(int b) throws IOException
+                {
+                    buf.put((byte) b);
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException
+                {
+                    buf.put(b, off, len);
+                }
+            }));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
+
+        buf.flip();
+        return buf;
     }
 
     public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
@@ -431,19 +467,19 @@ public class AMQProtocolEngine implement
                                                                    AMQConstant.CHANNEL_ERROR.getName().toString());
 
                     _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                    closeConnection(channelId, ce, false);
+                    closeConnection(channelId, ce);
                 }
             }
             catch (AMQConnectionException e)
             {
                 _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, e, false);
+                closeConnection(channelId, e);
             }
             catch (AMQSecurityException e)
             {
                 AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
                 _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, ce, false);
+                closeConnection(channelId, ce);
             }
         }
         catch (Exception e)
@@ -486,19 +522,14 @@ public class AMQProtocolEngine implement
      *
      * @param frame the frame to write
      */
-    public void writeFrame(AMQDataBlock frame)
+    public synchronized void writeFrame(AMQDataBlock frame)
     {
         _lastSent = frame;
-        final ByteBuffer buf = frame.toNioByteBuffer();
+        final ByteBuffer buf = asByteBuffer(frame);
         _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
-        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
-        {
-            public void run()
-            {
-                _sender.send(buf);
-            }
-        });
+        _sender.send(buf);
+        _sender.flush();
     }
 
     public AMQShortString getContextKey()
@@ -730,7 +761,7 @@ public class AMQProtocolEngine implement
                 }
 
                 closeAllChannels();
-                
+
                 getConfigStore().removeConfiguredObject(this);
 
                 if (_managedObject != null)
@@ -750,7 +781,6 @@ public class AMQProtocolEngine implement
                     _closed = true;
                     notifyAll();
                 }
-                _poolReference.releaseExecutorService();
                 CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
             }
         }
@@ -773,27 +803,32 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+    public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
     {
-        if (_logger.isInfoEnabled())
+        try
         {
-            _logger.info("Closing connection due to: " + e);
-        }
-
-        markChannelAwaitingCloseOk(channelId);
-        closeSession();
-        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-        writeFrame(e.getCloseFrame(channelId));
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Closing connection due to: " + e);
+            }
 
-        if (closeProtocolSession)
+            markChannelAwaitingCloseOk(channelId);
+            closeSession();
+            _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+            writeFrame(e.getCloseFrame(channelId));
+        }
+        finally
         {
             closeProtocolSession();
         }
+
+
     }
 
     public void closeProtocolSession()
     {
-        _sender.close();
+        _network.close();
+
         try
         {
             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -802,11 +837,15 @@ public class AMQProtocolEngine implement
         {
             _logger.info(e.getMessage());
         }
+        catch (TransportException e)
+        {
+            _logger.info(e.getMessage());
+        }
     }
 
     public String toString()
     {
-        return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+        return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")"; // ")" closes both branches
     }
 
     public String dump()
@@ -924,7 +963,7 @@ public class AMQProtocolEngine implement
         _virtualHost = virtualHost;
 
         _virtualHost.getConnectionRegistry().registerConnection(this);
-        
+
         _configStore.addConfiguredObject(this);
 
         try
@@ -953,19 +992,23 @@ public class AMQProtocolEngine implement
         return _protocolOutputConverter;
     }
 
-    public void setAuthorizedID(Principal authorizedID)
+    public void setAuthorizedSubject(final Subject authorizedSubject)
     {
-        _authorizedID = authorizedID;
+        if (authorizedSubject == null)
+        {
+            throw new IllegalArgumentException("authorizedSubject cannot be null");
+        }
+        _authorizedSubject = authorizedSubject;
     }
 
-    public Principal getAuthorizedID()
+    public Subject getAuthorizedSubject()
     {
-        return _authorizedID;
+        return _authorizedSubject;
     }
 
-    public Principal getPrincipal()
+    public Principal getAuthorizedPrincipal()
     {
-        return _authorizedID;
+        return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
     }
 
     public SocketAddress getRemoteAddress()
@@ -998,6 +1041,10 @@ public class AMQProtocolEngine implement
         {
            _logger.error("Could not close protocol engine", e);
         }
+        catch (TransportException e)
+        {
+           _logger.error("Could not close protocol engine", e);
+        }
     }
 
     public void readerIdle()
@@ -1007,7 +1054,7 @@ public class AMQProtocolEngine implement
 
     public void writerIdle()
     {
-        _sender.send(HeartbeatBody.FRAME.toNioByteBuffer());
+        writeFrame(HeartbeatBody.FRAME); // writeFrame sends and flushes; the bare send() left the heartbeat unflushed
     }
 
     public void exception(Throwable throwable)
@@ -1089,7 +1136,7 @@ public class AMQProtocolEngine implement
 
     public String getAuthId()
     {
-        return getAuthorizedID().getName();
+        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); // null until a subject is set
     }
 
     public Integer getRemotePID()
@@ -1151,7 +1198,7 @@ public class AMQProtocolEngine implement
     {
         return false;
     }
-    
+
     public void mgmtClose()
     {
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1253,7 +1300,7 @@ public class AMQProtocolEngine implement
                         new AMQShortString(message),
                         0,0);
 
-        writeFrame(responseBody.generateFrame((Integer)session.getID()));       
+        writeFrame(responseBody.generateFrame((Integer)session.getID()));
     }
 
     public void close(AMQConstant cause, String message) throws AMQException
@@ -1261,12 +1308,12 @@ public class AMQProtocolEngine implement
         closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
 		                getProtocolOutputConverter().getProtocolMajorVersion(),
 		                getProtocolOutputConverter().getProtocolMinorVersion(),
-		                (Throwable) null), true);
+		                (Throwable) null));
     }
 
     public List<AMQSessionModel> getSessionModels()
     {
-		List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); 
+		List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
 		for (AMQChannel channel : getChannels())
 		{
 		    sessions.add((AMQSessionModel) channel);
@@ -1298,27 +1345,27 @@ public class AMQProtocolEngine implement
         }
         _virtualHost.registerMessageReceived(messageSize, timestamp);
     }
-    
+
     public StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;
     }
-    
+
     public StatisticsCounter getDataReceiptStatistics()
     {
         return _dataReceived;
     }
-    
+
     public StatisticsCounter getMessageDeliveryStatistics()
     {
         return _messagesDelivered;
     }
-    
+
     public StatisticsCounter getDataDeliveryStatistics()
     {
         return _dataDelivered;
     }
-    
+
     public void resetStatistics()
     {
         _messagesDelivered.reset();
@@ -1331,7 +1378,7 @@ public class AMQProtocolEngine implement
     {
         setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
                 _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-        
+
         _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
         _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
         _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
@@ -1347,4 +1394,16 @@ public class AMQProtocolEngine implement
     {
         _statisticsEnabled = enabled;
     }
+
+    @Override
+    public boolean isSessionNameUnique(String name)
+    {
+        return true;
+    }
+
+    @Override
+    public String getUserName()
+    {
+        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); // null until a subject is set
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol;
 
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.AMQException;
@@ -28,16 +29,15 @@ import org.apache.qpid.framing.*;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.security.Principal;
 import java.util.List;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
 {
     long getSessionID();
 
@@ -163,8 +163,10 @@ public interface AMQProtocolSession exte
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     void closeSession() throws AMQException;
 
+    void closeProtocolSession();
+
     /** This must be called to close the session in order to free up any resources managed by the session. */
-    void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+    void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
 
 
     /** @return a key that uniquely identifies this session */
@@ -205,7 +207,7 @@ public interface AMQProtocolSession exte
 
     public ProtocolOutputConverter getProtocolOutputConverter();
 
-    void setAuthorizedID(Principal authorizedID);
+    void setAuthorizedSubject(Subject authorizedSubject);
 
     public java.net.SocketAddress getRemoteAddress();
 

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Mon Sep 19 15:13:18 2011
@@ -131,7 +131,7 @@ public class AMQProtocolSessionMBean ext
 
     public String getAuthorizedId()
     {
-        return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
+        return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null;
     }
 
     public String getVersion()

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Sep 19 15:13:18 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol;
 
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.transport.ConnectionDelegate;
@@ -33,30 +33,43 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
 
+    private final long _id;
+
     private Set<AmqpProtocolVersion> _supported;
     private String _fqdn;
     private IApplicationRegistry _appRegistry;
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
-    
-    private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+
+    private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+
+    public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+                                      String fqdn,
+                                      Set<AmqpProtocolVersion> supported,
+                                      NetworkConnection network,
+                                      long id)
+    {
+        this(appRegistry,fqdn,supported,id);
+        setNetworkConnection(network);
+    }
 
     public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
                                       String fqdn,
                                       Set<AmqpProtocolVersion> supported,
-                                      NetworkConnection network)
+                                      long id)
     {
+        _id = id;
         _appRegistry = appRegistry;
         _fqdn = fqdn;
         _supported = supported;
-        _network = network;
-        _sender = _network.getSender();
+
     }
 
+
     public SocketAddress getRemoteAddress()
     {
         return _delegate.getRemoteAddress();
@@ -92,6 +105,7 @@ public class MultiVersionProtocolEngine 
         _delegate.readerIdle();
     }
 
+
     public void received(ByteBuffer msg)
     {
         _delegate.received(msg);
@@ -102,6 +116,11 @@ public class MultiVersionProtocolEngine 
         _delegate.exception(t);
     }
 
+    public long getConnectionId()
+    {
+        return _delegate.getConnectionId();
+    }
+
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
     private static final byte[] AMQP_0_8_HEADER =
@@ -126,7 +145,7 @@ public class MultiVersionProtocolEngine 
                          (byte) 9
             };
 
-private static final byte[] AMQP_0_9_1_HEADER =
+    private static final byte[] AMQP_0_9_1_HEADER =
             new byte[] { (byte) 'A',
                          (byte) 'M',
                          (byte) 'Q',
@@ -149,11 +168,23 @@ private static final byte[] AMQP_0_9_1_H
                          (byte) 10
             };
 
+    public void setNetworkConnection(NetworkConnection networkConnection)
+    {
+        setNetworkConnection(networkConnection, networkConnection.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
+        _network = network;
+        _sender = sender;
+    }
+
+
     private static interface DelegateCreator
     {
         AmqpProtocolVersion getVersion();
         byte[] getHeaderIdentifier();
-        ProtocolEngine getProtocolEngine();
+        ServerProtocolEngine getProtocolEngine();
     }
 
     private DelegateCreator creator_0_8 = new DelegateCreator()
@@ -169,9 +200,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_8_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -189,9 +220,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_9_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -209,9 +240,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_9_1_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -230,12 +261,12 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_10_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
             final ConnectionDelegate connDelegate =
                     new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
 
-            ServerConnection conn = new ServerConnection();
+            ServerConnection conn = new ServerConnection(_id);
             conn.setConnectionDelegate(connDelegate);
 
             return new ProtocolEngine_0_10( conn, _network, _appRegistry);
@@ -246,7 +277,7 @@ private static final byte[] AMQP_0_9_1_H
             new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
 
 
-    private class ClosedDelegateProtocolEngine implements ProtocolEngine
+    private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
     {
         public SocketAddress getRemoteAddress()
         {
@@ -292,9 +323,19 @@ private static final byte[] AMQP_0_9_1_H
         {
 
         }
+
+        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        {
+
+        }
+
+        public long getConnectionId()
+        {
+            return _id;
+        }
     }
 
-    private class SelfDelegateProtocolEngine implements ProtocolEngine
+    private class SelfDelegateProtocolEngine implements ServerProtocolEngine
     {
         private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
 
@@ -340,7 +381,7 @@ private static final byte[] AMQP_0_9_1_H
                 _header.get(headerBytes);
 
 
-                ProtocolEngine newDelegate = null;
+                ServerProtocolEngine newDelegate = null;
                 byte[] newestSupported = null;
 
                 for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -366,14 +407,19 @@ private static final byte[] AMQP_0_9_1_H
                 if(newDelegate == null)
                 {
                     _sender.send(ByteBuffer.wrap(newestSupported));
+                    _sender.flush();
 
                     _delegate = new ClosedDelegateProtocolEngine();
+
+                    _network.close();
+
                 }
                 else
                 {
                     _delegate = newDelegate;
 
                     _header.flip();
+                    _delegate.setNetworkConnection(_network, _sender);
                     _delegate.received(_header);
                     if(msg.hasRemaining())
                     {
@@ -385,6 +431,11 @@ private static final byte[] AMQP_0_9_1_H
 
         }
 
+        public long getConnectionId()
+        {
+            return _id;
+        }
+
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);
@@ -404,5 +455,10 @@ private static final byte[] AMQP_0_9_1_H
         {
 
         }
+
+        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        {
+
+        }
     }
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Mon Sep 19 15:13:18 2011
@@ -20,50 +20,38 @@
 */
 package org.apache.qpid.server.protocol;
 
-import java.util.EnumSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
-    private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
     private final IApplicationRegistry _appRegistry;
     private final String _fqdn;
     private final Set<AmqpProtocolVersion> _supported;
 
-
-    public MultiVersionProtocolEngineFactory()
-    {
-        this(1, "localhost", ALL_VERSIONS);
-    }
-
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> versions)
-    {
-        this(1, fqdn, versions);
-    }
-
-
-    public MultiVersionProtocolEngineFactory(String fqdn)
-    {
-        this(1, fqdn, ALL_VERSIONS);
-    }
-
-    public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
     {
         _appRegistry = ApplicationRegistry.getInstance();
         _fqdn = fqdn;
         _supported = supportedVersions;
     }
 
+    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
+    {
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+    }
 
-    public ProtocolEngine newProtocolEngine(NetworkConnection network)
+    public ServerProtocolEngine newProtocolEngine()
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network);
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
     }
+
 }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,8 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
@@ -31,9 +32,10 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.registry.IApplicationRegistry;
 
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 
-public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10  extends InputHandler implements ServerProtocolEngine, ConnectionConfig
 {
     public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
 
@@ -52,11 +54,16 @@ public class ProtocolEngine_0_10  extend
         super(new Assembler(conn));
         _connection = conn;
         _connection.setConnectionConfig(this);
-        _network = network;
+
         _id = appRegistry.getConfigStore().createId();
         _appRegistry = appRegistry;
 
-        _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE));
+        if(network != null)
+        {
+            setNetworkConnection(network);
+        }
+
+
         _connection.onOpen(new Runnable()
         {
             public void run()
@@ -65,6 +72,19 @@ public class ProtocolEngine_0_10  extend
             }
         });
 
+    }
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        setNetworkConnection(network, network.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
+        _network = network;
+
+        _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+
         // FIXME Two log messages to maintain compatibility with earlier protocol versions
         _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
         _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
@@ -127,7 +147,7 @@ public class ProtocolEngine_0_10  extend
 
     public String getAuthId()
     {
-        return _connection.getAuthorizationID();
+        return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
     }
 
     public String getRemoteProcessName()
@@ -186,9 +206,14 @@ public class ProtocolEngine_0_10  extend
     {
         return false;
     }
-    
+
     public void mgmtClose()
     {
         _connection.mgmtClose();
     }
+
+    public long getConnectionId()
+    {
+        return _connection.getConnectionId();
+    }
 }

Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:13:18 2011
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1156188
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1172654

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Sep 19 15:13:18 2011
@@ -32,7 +32,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -69,8 +69,8 @@ public interface AMQQueue extends Managa
     boolean isAutoDelete();
 
     AMQShortString getOwner();
-    PrincipalHolder getPrincipalHolder();
-    void setPrincipalHolder(PrincipalHolder principalHolder);
+    AuthorizationHolder getAuthorizationHolder();
+    void setAuthorizationHolder(AuthorizationHolder authorizationHolder);
 
     void setExclusiveOwningSession(AMQSessionModel owner);
     AMQSessionModel getExclusiveOwningSession();

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Mon Sep 19 15:13:18 2011
@@ -139,7 +139,7 @@ public class IncomingMessage implements 
     public int addContentBodyFrame(final ContentChunk contentChunk)
             throws AMQException
     {
-        _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
+        _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
 
@@ -263,7 +263,7 @@ public class IncomingMessage implements 
         int written = 0;
         for(ContentChunk cb : _contentChunks)
         {
-            ByteBuffer data = cb.getData().buf();
+            ByteBuffer data = ByteBuffer.wrap(cb.getData());
             if(offset+written >= pos && offset < pos + data.limit())
             {
                 ByteBuffer src = data.duplicate();

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Sep 19 15:13:18 2011
@@ -202,9 +202,7 @@ public interface QueueEntry extends Comp
 
     void reject();
 
-    void reject(Subscription subscription);
-
-    boolean isRejectedBy(Subscription subscription);
+    boolean isRejectedBy(long subscriptionId);
 
     void dequeue();
 

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Sep 19 15:13:18 2011
@@ -51,7 +51,7 @@ public class QueueEntryImpl implements Q
 
     private MessageReference _message;
 
-    private Set<Subscription> _rejectedBy = null;
+    private Set<Long> _rejectedBy = null;
 
     private volatile EntryState _state = AVAILABLE_STATE;
 
@@ -325,19 +325,16 @@ public class QueueEntryImpl implements Q
 
     public void reject()
     {
-        reject(getDeliveredSubscription());
-    }
+        Subscription subscription = getDeliveredSubscription();
 
-    public void reject(Subscription subscription)
-    {
         if (subscription != null)
         {
             if (_rejectedBy == null)
             {
-                _rejectedBy = new HashSet<Subscription>();
+                _rejectedBy = new HashSet<Long>();
             }
 
-            _rejectedBy.add(subscription);
+            _rejectedBy.add(subscription.getSubscriptionID());
         }
         else
         {
@@ -345,12 +342,12 @@ public class QueueEntryImpl implements Q
         }
     }
 
-    public boolean isRejectedBy(Subscription subscription)
+    public boolean isRejectedBy(long subscriptionId)
     {
 
         if (_rejectedBy != null) // We have subscriptions that rejected this message
         {
-            return _rejectedBy.contains(subscription);
+            return _rejectedBy.contains(subscriptionId);
         }
         else // This message hasn't been rejected yet.
         {

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Sep 19 15:13:18 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
     /** null means shared */
     private final AMQShortString _owner;
 
-    private PrincipalHolder _prinicpalHolder;
+    private AuthorizationHolder _authorizationHolder;
 
     private boolean _exclusive = false;
     private AMQSessionModel _exclusiveOwner;
@@ -102,9 +102,7 @@ public class SimpleAMQQueue implements A
 
     protected final QueueEntryList _entries;
 
-    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-
-    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+    protected final SubscriptionList _subscriptionList = new SubscriptionList();
 
     private volatile Subscription _exclusiveSubscriber;
 
@@ -373,14 +371,14 @@ public class SimpleAMQQueue implements A
         return _owner;
     }
 
-    public PrincipalHolder getPrincipalHolder()
+    public AuthorizationHolder getAuthorizationHolder()
     {
-        return _prinicpalHolder;
+        return _authorizationHolder;
     }
 
-    public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+    public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
     {
-        _prinicpalHolder = prinicpalHolder;
+        _authorizationHolder = authorizationHolder;
     }
 
 
@@ -602,25 +600,25 @@ public class SimpleAMQQueue implements A
             iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
 
              */
-            SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
-            SubscriptionList.SubscriptionNode nextNode = node.getNext();
+            SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
+            SubscriptionList.SubscriptionNode nextNode = node.findNext();
             if (nextNode == null)
             {
-                nextNode = _subscriptionList.getHead().getNext();
+                nextNode = _subscriptionList.getHead().findNext();
             }
             while (nextNode != null)
             {
-                if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+                if (_subscriptionList.updateMarkedNode(node, nextNode))
                 {
                     break;
                 }
                 else
                 {
-                    node = _lastSubscriptionNode.get();
-                    nextNode = node.getNext();
+                    node = _subscriptionList.getMarkedNode();
+                    nextNode = node.findNext();
                     if (nextNode == null)
                     {
-                        nextNode = _subscriptionList.getHead().getNext();
+                        nextNode = _subscriptionList.getHead().findNext();
                     }
                 }
             }
@@ -642,7 +640,7 @@ public class SimpleAMQQueue implements A
                     Subscription sub = nextNode.getSubscription();
                     deliverToSubscription(sub, entry);
                 }
-                nextNode = nextNode.getNext();
+                nextNode = nextNode.findNext();
 
             }
         }

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Mon Sep 19 15:13:18 2011
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.registry;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -52,17 +54,18 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.osgi.framework.BundleContext;
 
 
 /**
@@ -89,8 +92,6 @@ public abstract class ApplicationRegistr
 
     protected SecurityManager _securityManager;
 
-    protected PrincipalDatabaseManager _databaseManager;
-
     protected PluginManager _pluginManager;
 
     protected ConfigurationManager _configurationManager;
@@ -111,6 +112,8 @@ public abstract class ApplicationRegistr
     private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
+    private BundleContext _bundleContext;
+
     static
     {
         Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -209,7 +212,13 @@ public abstract class ApplicationRegistr
 
     protected ApplicationRegistry(ServerConfiguration configuration)
     {
+        this(configuration, null);
+    }
+
+    protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext)
+    {
         _configuration = configuration;
+        _bundleContext = bundleContext;
     }
 
     public void configure() throws ConfigurationException
@@ -218,7 +227,7 @@ public abstract class ApplicationRegistr
 
         try
         {
-            _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory());
+            _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext);
         }
         catch (Exception e)
         {
@@ -253,11 +262,7 @@ public abstract class ApplicationRegistr
 
             _securityManager = new SecurityManager(_configuration, _pluginManager);
 
-            createDatabaseManager(_configuration);
-
-            _authenticationManager = new PrincipalDatabaseAuthenticationManager();
-
-            _databaseManager.initialiseManagement(_configuration);
+            _authenticationManager = createAuthenticationManager();
 
             _managedObjectRegistry.start();
         }
@@ -280,9 +285,51 @@ public abstract class ApplicationRegistr
         }
     }
 
-    protected void createDatabaseManager(ServerConfiguration configuration) throws Exception
+    /**
+     * Iterates across all discovered authentication manager factories, offering the security configuration to each.
+     * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
+     * 
+     * It is an error to configure more than one authentication manager, or to configure none.
+     *
+     * @return authentication manager
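+     * @throws ConfigurationException if no factory plugins are found, if no authentication manager is configured, or if more than one is configured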
+     */
+    protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
     {
-        _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
+        final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
+        final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
+        
+        if (factories.size() == 0)
+        {
+            throw new ConfigurationException("No authentication manager factory plugins found.  Check the desired authentication" +
+                    "manager plugin has been placed in the plugins directory.");
+        }
+        
+        AuthenticationManager authMgr = null;
+        
+        for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
+        {
+            final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
+            final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
+            if (tmp != null)
+            {
+                if (authMgr != null)
+                {
+                    throw new ConfigurationException("Cannot configure more than one authentication manager."
+                            + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
+                            + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
+                            + " from the classpath.");
+                }
+                authMgr = tmp;
+            }
+        }
+
+        if (authMgr == null)
+        {
+            throw new ConfigurationException("No authentication managers configured within the configure file.");
+        }
+        
+        return authMgr;
     }
 
     protected void initialiseVirtualHosts() throws Exception
@@ -422,10 +469,6 @@ public abstract class ApplicationRegistr
         //Shutdown virtualhosts
         close(_virtualHostRegistry);
 
-//      close(_accessManager);
-//
-//      close(_databaseManager);
-
         close(_authenticationManager);
 
         close(_managedObjectRegistry);
@@ -487,11 +530,6 @@ public abstract class ApplicationRegistr
         return _managedObjectRegistry;
     }
 
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
     public AuthenticationManager getAuthenticationManager()
     {
         return _authenticationManager;
@@ -539,7 +577,7 @@ public abstract class ApplicationRegistr
 
     public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
     {
-        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
+        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
         _virtualHostRegistry.registerVirtualHost(virtualHost);
         getBroker().addVirtualHost(virtualHost);
         return virtualHost;

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Mon Sep 19 15:13:18 2011
@@ -29,12 +29,18 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.management.JMXManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.osgi.framework.BundleContext;
 
 public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
 {
     public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
     {
-        super(new ServerConfiguration(configurationURL));
+        this(configurationURL, null);
+    }
+
+    public ConfigurationFileApplicationRegistry(File configurationURL, BundleContext bundleContext) throws ConfigurationException
+    {
+        super(new ServerConfiguration(configurationURL), bundleContext);
     }
 
     @Override

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Mon Sep 19 15:13:18 2011
@@ -33,7 +33,6 @@ import org.apache.qpid.server.logging.Ro
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.transport.QpidAcceptor;
@@ -63,8 +62,6 @@ public interface IApplicationRegistry ex
 
     ManagedObjectRegistry getManagedObjectRegistry();
 
-    PrincipalDatabaseManager getDatabaseManager();
-
     AuthenticationManager getAuthenticationManager();
 
     VirtualHostRegistry getVirtualHostRegistry();

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Mon Sep 19 15:13:18 2011
@@ -18,8 +18,19 @@
  */
 package org.apache.qpid.server.security;
 
-import static org.apache.qpid.server.security.access.ObjectType.*;
-import static org.apache.qpid.server.security.access.Operation.*;
+import static org.apache.qpid.server.security.access.ObjectType.EXCHANGE;
+import static org.apache.qpid.server.security.access.ObjectType.METHOD;
+import static org.apache.qpid.server.security.access.ObjectType.OBJECT;
+import static org.apache.qpid.server.security.access.ObjectType.QUEUE;
+import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST;
+import static org.apache.qpid.server.security.access.Operation.ACCESS;
+import static org.apache.qpid.server.security.access.Operation.BIND;
+import static org.apache.qpid.server.security.access.Operation.CONSUME;
+import static org.apache.qpid.server.security.access.Operation.CREATE;
+import static org.apache.qpid.server.security.access.Operation.DELETE;
+import static org.apache.qpid.server.security.access.Operation.PUBLISH;
+import static org.apache.qpid.server.security.access.Operation.PURGE;
+import static org.apache.qpid.server.security.access.Operation.UNBIND;
 
 import java.net.SocketAddress;
 import java.security.Principal;
@@ -29,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -37,11 +50,9 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ObjectProperties;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 
 /**
  * The security manager contains references to all loaded {@link SecurityPlugin}s and delegates security decisions to them based
@@ -55,7 +66,7 @@ public class SecurityManager
     private static final Logger _logger = Logger.getLogger(SecurityManager.class);
     
     /** Container for the {@link Principal} that is using this thread. */
-    private static final ThreadLocal<Principal> _principal = new ThreadLocal<Principal>();
+    private static final ThreadLocal<Subject> _subject = new ThreadLocal<Subject>();
     
     private PluginManager _pluginManager;
     private Map<String, SecurityPluginFactory> _pluginFactories = new HashMap<String, SecurityPluginFactory>();
@@ -126,19 +137,14 @@ public class SecurityManager
         configureHostPlugins(configuration);
     }
 
-    public static Principal getThreadPrincipal()
-    {
-        return _principal.get();
-    }
-
-    public static void setThreadPrincipal(Principal principal)
+    public static Subject getThreadSubject()
     {
-        _principal.set(principal);
+        return _subject.get();
     }
 
-    public static void setThreadPrincipal(String authId)
+    public static void setThreadSubject(final Subject subject)
     {
-        setThreadPrincipal(new UsernamePrincipal(authId));
+        _subject.set(subject);
     }
 
     public void configureHostPlugins(ConfigurationPlugin hostConfig) throws ConfigurationException

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Mon Sep 19 15:13:18 2011
@@ -149,9 +149,9 @@ public class ObjectProperties extends Ha
         {
             put(Property.OWNER, queue.getOwner());
         }
-        else if (queue.getPrincipalHolder() != null)
+        else if (queue.getAuthorizationHolder() != null)
         {
-            put(Property.OWNER, queue.getPrincipalHolder().getPrincipal().getName());
+            put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
         }
     }
     



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org