You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [22/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Fri Oct 21 14:42:12 2011
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am
  *
  */
 
-
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.HeaderPropertiesConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,11 +35,13 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-    private static final ProtocolVersionMethodConverter
-            PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
 
 
     public static Factory getInstanceFactory()
@@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl
             int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
 
 
-            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-            java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
 
-            int writtenSize = 0;
+            int writtenSize = capacity;
 
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
 
-            writtenSize += message.getContent(buf, writtenSize);
-            buf.flip();
-            AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
 
             CompositeAMQBodyBlock
                     compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl
 
             while(writtenSize < bodySize)
             {
-                buf = java.nio.ByteBuffer.allocate(capacity);
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
 
-                writtenSize += message.getContent(buf, writtenSize);
-                buf.flip();
-                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+                writeFrame(new AMQFrame(channelId, body));
             }
         }
     }
 
+    private class MessageContentSourceBody implements AMQBody
+    {
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public void writePayload(DataOutputStream buffer) throws IOException
+        {
+            byte[] data = new byte[_length];
+
+            _message.getContent(ByteBuffer.wrap(data), _offset);
+
+            buffer.write(data);
+        }
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+
     private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
     {
 
@@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl
                 return _underlyingBody.getSize();
             }
 
-            public void writePayload(ByteBuffer buffer)
+            public void writePayload(DataOutputStream buffer) throws IOException
             {
                 if(_underlyingBody == null)
                 {
@@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl
             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
         }
 
-        public void writePayload(ByteBuffer buffer)
+        public void writePayload(DataOutputStream buffer) throws IOException
         {
             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
         }
@@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl
             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
         }
 
-        public void writePayload(ByteBuffer buffer)
+        public void writePayload(DataOutputStream buffer) throws IOException
         {
             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
         }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Fri Oct 21 14:42:12 2011
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am
  *
  */
 
-
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.HeaderPropertiesConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -33,17 +30,16 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
-    private static final ProtocolVersionMethodConverter
-            PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
 
     public static Factory getInstanceFactory()
     {
@@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl
             int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
 
 
-            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-            java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
 
-            int writtenSize = 0;
+            int writtenSize = capacity;
 
-
-            writtenSize += message.getContent(buf, writtenSize);
-            buf.flip();
-            AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
 
             CompositeAMQBodyBlock
                     compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl
 
             while(writtenSize < bodySize)
             {
-                buf = java.nio.ByteBuffer.allocate(capacity);
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
 
-                writtenSize += message.getContent(buf, writtenSize);
-                buf.flip();
-                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+                writeFrame(new AMQFrame(channelId, body));
             }
         }
     }
 
+    private class MessageContentSourceBody implements AMQBody
+    {
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public void writePayload(DataOutputStream buffer) throws IOException
+        {
+            byte[] data = new byte[_length];
+
+            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+            buffer.write(data);
+        }
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
     {
 
@@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl
                 return _underlyingBody.getSize();
             }
 
-            public void writePayload(ByteBuffer buffer)
+            public void writePayload(DataOutputStream buffer) throws IOException
             {
                 if(_underlyingBody == null)
                 {
@@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl
             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
         }
 
-        public void writePayload(ByteBuffer buffer)
+        public void writePayload(DataOutputStream buffer) throws IOException
         {
             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
         }
@@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl
             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
         }
 
-        public void writePayload(ByteBuffer buffer)
+        public void writePayload(DataOutputStream buffer) throws IOException
         {
             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
         }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java Fri Oct 21 14:42:12 2011
@@ -27,5 +27,5 @@ public interface Plugin
     /**
      * Provide Configuration to this plugin
      */
-    public void configure(ConfigurationPlugin config);
+    public void configure(ConfigurationPlugin config) throws ConfigurationException;
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java Fri Oct 21 14:42:12 2011
@@ -18,39 +18,56 @@
  */
 package org.apache.qpid.server.plugins;
 
-import static org.apache.felix.framework.util.FelixConstants.*;
-import static org.apache.felix.main.AutoProcessor.*;
+import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE;
+import static org.apache.felix.main.AutoProcessor.process;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT;
+import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.felix.framework.Felix;
 import org.apache.felix.framework.util.StringMap;
 import org.apache.log4j.Logger;
 import org.apache.qpid.common.Closeable;
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.server.configuration.TopicConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.exchange.ExchangeType;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.SecurityPluginFactory;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
 import org.apache.qpid.server.security.access.plugins.DenyAll;
 import org.apache.qpid.server.security.access.plugins.LegacyAccess;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
+import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
 import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy;
 import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
+import org.apache.qpid.util.FileUtils;
 import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleException;
+import org.osgi.framework.Version;
 import org.osgi.framework.launch.Framework;
 import org.osgi.util.tracker.ServiceTracker;
 
@@ -63,7 +80,6 @@ public class PluginManager implements Cl
     private static final Logger _logger = Logger.getLogger(PluginManager.class);
 
     private static final int FELIX_STOP_TIMEOUT = 30000;
-    private static final String QPID_VER_SUFFIX = "version=0.9,";
 
     private Framework _felix;
 
@@ -72,15 +88,61 @@ public class PluginManager implements Cl
     private ServiceTracker _configTracker = null;
     private ServiceTracker _virtualHostTracker = null;
     private ServiceTracker _policyTracker = null;
+    private ServiceTracker _authenticationManagerTracker = null;
 
     private Activator _activator;
 
+    private final List<ServiceTracker> _trackers = new ArrayList<ServiceTracker>();
     private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>();
     private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>();
     private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>();
     private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>();
+    private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>();
 
-    public PluginManager(String pluginPath, String cachePath) throws Exception
+    /** The default name of the OSGI system package list. */
+    private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties";
+    
+    /** The name of the override system property that holds the name of the OSGI system package list. */
+    private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties";
+    
+    private static final String OSGI_SYSTEM_PACKAGES;
+    
+    static 
+    {
+        final String filename = System.getProperty(FILE_PROPERTY);
+        final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
+                    PluginManager.class.getClassLoader());
+        
+        try
+        {
+            Version qpidReleaseVersion;
+            try
+            {
+                qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion());
+            }
+            catch (IllegalArgumentException iae)
+            {
+                qpidReleaseVersion = null;
+            }
+            
+            final Properties p  = new Properties();
+            p.load(is);
+            
+            final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p);
+            
+            OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString();
+            
+            _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES);
+        }
+        catch (IOException e)
+        {
+            _logger.error("Error reading OSGI system package list", e);
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+    
+    
+    public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception
     {
         // Store all non-OSGi plugins
         // A little gross that we have to add them here, but not all the plugins are OSGIfied
@@ -97,7 +159,8 @@ public class PluginManager implements Cl
                 LegacyAccess.LegacyAccessConfiguration.FACTORY,
                 new SlowConsumerDetectionConfigurationFactory(),
                 new SlowConsumerDetectionPolicyConfigurationFactory(),
-                new SlowConsumerDetectionQueueConfigurationFactory()))
+                new SlowConsumerDetectionQueueConfigurationFactory(),
+                PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY))
         {
             _configPlugins.put(configFactory.getParentPaths(), configFactory);
         }
@@ -112,125 +175,109 @@ public class PluginManager implements Cl
             _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory);
         }
 
-        // Check the plugin directory path is set and exist
-        if (pluginPath == null)
+        for (AuthenticationManagerPluginFactory<? extends Plugin> pluginFactory : Arrays.asList(
+                PrincipalDatabaseAuthenticationManager.FACTORY))
         {
-            return;
+            _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory);
         }
-        File pluginDir = new File(pluginPath);
-        if (!pluginDir.exists())
-        {
-            return;
-        } 
-
-        // Setup OSGi configuration propery map
-        StringMap configMap = new StringMap(false);
-
-        // Add the bundle provided service interface package and the core OSGi
-        // packages to be exported from the class path via the system bundle.
-        configMap.put(FRAMEWORK_SYSTEMPACKAGES,
-                "org.osgi.framework; version=1.3.0," +
-                "org.osgi.service.packageadmin; version=1.2.0," +
-                "org.osgi.service.startlevel; version=1.0.0," +
-                "org.osgi.service.url; version=1.0.0," +
-                "org.osgi.util.tracker; version=1.0.0," +
-                "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
-                "org.apache.qpid; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.common; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
-                "org.apache.qpid.util; " + QPID_VER_SUFFIX +
-                "org.apache.commons.configuration; version=1.0.0," +
-                "org.apache.commons.lang; version=1.0.0," +
-                "org.apache.commons.lang.builder; version=1.0.0," +
-                "org.apache.commons.logging; version=1.0.0," +
-                "org.apache.log4j; version=1.2.12," +
-                "javax.management.openmbean; version=1.0.0," +
-                "javax.management; version=1.0.0"
-            );
-        
-        // No automatic shutdown hook
-        configMap.put("felix.shutdown.hook", "false");
-        
-        // Add system activator
-        List<BundleActivator> activators = new ArrayList<BundleActivator>();
-        _activator = new Activator();
-        activators.add(_activator);
-        configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
 
-        if (cachePath != null)
+        if(bundleContext == null)
         {
-            File cacheDir = new File(cachePath);
-            if (!cacheDir.exists() && cacheDir.canWrite())
+            // Check the plugin directory path is set and exist
+            if (pluginPath == null)
             {
-                _logger.info("Creating plugin cache directory: " + cachePath);
-                cacheDir.mkdir();
+                _logger.info("No plugin path specified, no plugins will be loaded.");
+                return;
             }
-            
-            // Set plugin cache directory and empty it
-            _logger.info("Cache bundles in directory " + cachePath);
-            configMap.put(FRAMEWORK_STORAGE, cachePath);
-        }
-        configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
-        
-        // Set directory with plugins to auto-deploy
-        _logger.info("Auto deploying bundles from directory " + pluginPath);
-        configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
-        configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);        
-        
-        // Start plugin manager and trackers
-        _felix = new Felix(configMap);
-        try
-        {
-            _logger.info("Starting plugin manager...");
-            _felix.init();
-	        process(configMap, _felix.getBundleContext());
-            _felix.start();
-            _logger.info("Started plugin manager");
+            File pluginDir = new File(pluginPath);
+            if (!pluginDir.exists())
+            {
+                _logger.warn("Plugin dir : "  + pluginDir + " does not exist.");
+                return;
+            }
+
+            // Add the bundle provided service interface package and the core OSGi
+            // packages to be exported from the class path via the system bundle.
+
+            // Setup OSGi configuration property map
+            final StringMap configMap = new StringMap(false);
+            configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES);
+
+            // No automatic shutdown hook
+            configMap.put("felix.shutdown.hook", "false");
+
+            // Add system activator
+            List<BundleActivator> activators = new ArrayList<BundleActivator>();
+            _activator = new Activator();
+            activators.add(_activator);
+            configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
+
+            if (cachePath != null)
+            {
+                File cacheDir = new File(cachePath);
+                if (!cacheDir.exists() && cacheDir.canWrite())
+                {
+                    _logger.info("Creating plugin cache directory: " + cachePath);
+                    cacheDir.mkdir();
+                }
+
+                // Set plugin cache directory and empty it
+                _logger.info("Cache bundles in directory " + cachePath);
+                configMap.put(FRAMEWORK_STORAGE, cachePath);
+            }
+            configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+
+            // Set directory with plugins to auto-deploy
+            _logger.info("Auto deploying bundles from directory " + pluginPath);
+            configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
+            configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+
+            // Start plugin manager
+            _felix = new Felix(configMap);
+            try
+            {
+                _logger.info("Starting plugin manager framework");
+                _felix.init();
+                process(configMap, _felix.getBundleContext());
+                _felix.start();
+                _logger.info("Started plugin manager framework");
+            }
+            catch (BundleException e)
+            {
+                throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+            }
+
+            bundleContext = _activator.getContext();
         }
-        catch (BundleException e)
+        else
         {
-            throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+            _logger.info("Using the specified external BundleContext");
         }
-        
-        // TODO save trackers in a map, keyed by class name
-        
-        _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null);
+
+        _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null);
         _exchangeTracker.open();
+        _trackers.add(_exchangeTracker);
 
-        _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null);
+        _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null);
         _securityTracker.open();
+        _trackers.add(_securityTracker);
 
-        _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null);
+        _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null);
         _configTracker.open();
+        _trackers.add(_configTracker);
 
-        _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null);
+        _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null);
         _virtualHostTracker.open();
+        _trackers.add(_virtualHostTracker);
  
-        _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null);
+        _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null);
         _policyTracker.open();
-        
+        _trackers.add(_policyTracker);
+
+        _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null);
+        _authenticationManagerTracker.open();
+        _trackers.add(_authenticationManagerTracker);
+
         _logger.info("Opened service trackers");
     }
 
@@ -301,22 +348,26 @@ public class PluginManager implements Cl
         return getServices(_securityTracker, _securityPlugins);
     }
 
+    public Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> getAuthenticationManagerPlugins()
+    {
+        return getServices(_authenticationManagerTracker, _authenticationManagerPlugins);
+    }
+
     public void close()
     {
-        if (_felix != null)
+        try
         {
-            try
+            // Close all bundle trackers
+            for(ServiceTracker tracker : _trackers)
             {
-                // Close all bundle trackers
-                _exchangeTracker.close();
-                _securityTracker.close();
-                _configTracker.close();
-                _virtualHostTracker.close();
-                _policyTracker.close();
+                tracker.close();
             }
-            finally
+        }
+        finally
+        {
+            if (_felix != null)
             {
-                _logger.info("Stopping plugin manager");
+                _logger.info("Stopping plugin manager framework");
                 try
                 {
                     // FIXME should be stopAndWait() but hangs VM, need upgrade in felix
@@ -335,7 +386,12 @@ public class PluginManager implements Cl
                 {
                     // Ignore
                 }
-                _logger.info("Stopped plugin manager");
+                _logger.info("Stopped plugin manager framework");
+            }
+            else
+            {
+                _logger.info("Plugin manager was started with an external BundleContext, " +
+                             "skipping remaining shutdown tasks");
             }
         }
     }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Oct 21 14:42:12 2011
@@ -20,14 +20,35 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.protocol.AMQConstant;
+import java.util.List;
+import java.util.UUID;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.stats.StatisticsGatherer;
 
-public interface AMQConnectionModel
+public interface AMQConnectionModel extends StatisticsGatherer
 {
+    /**
+     * get a unique id for this connection.
+     * 
+     * @return a {@link UUID} representing the connection
+     */
+    public UUID getId();
+    
+    /**
+     * Close the underlying Connection
+     * 
+     * @param cause
+     * @param message
+     * @throws org.apache.qpid.AMQException
+     */
+    public void close(AMQConstant cause, String message) throws AMQException;
 
     /**
      * Close the given requested Session
+     * 
      * @param session
      * @param cause
      * @param message
@@ -36,4 +57,20 @@ public interface AMQConnectionModel
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
 
     public long getConnectionId();
+    
+    /**
+     * Get a list of all sessions using this connection.
+     * 
+     * @return a list of {@link AMQSessionModel}s
+     */
+    public List<AMQSessionModel> getSessionModels();
+
+    /**
+     * Return a {@link LogSubject} for the connection.
+     */
+    public LogSubject getLogSubject();
+
+    public String getUserName();
+
+    public boolean isSessionNameUnique(String name);
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -30,18 +32,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.JMException;
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
@@ -66,12 +66,10 @@ import org.apache.qpid.framing.MethodDis
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -90,21 +88,22 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkConnection;
 
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-
     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -134,7 +133,7 @@ public class AMQProtocolEngine implement
     private Object _lastSent;
 
     protected volatile boolean _closed;
-    
+
     // maximum number of channels this session should have
     private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
 
@@ -146,47 +145,46 @@ public class AMQProtocolEngine implement
 
     private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
-    private Principal _authorizedID;
+    private Subject _authorizedSubject;
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
 
-    // Create a simple ID that increments for ever new Session
-    private final long _sessionID = idGenerator.getAndIncrement();
+    private final long _sessionID;
 
     private AMQPConnectionActor _actor;
     private LogSubject _logSubject;
 
-    private NetworkDriver _networkDriver;
-
     private long _lastIoTime;
 
     private long _writtenBytes;
     private long _readBytes;
 
-    private Job _readJob;
-    private Job _writeJob;
 
-    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
     private final UUID _id;
     private final ConfigStore _configStore;
     private long _createTime = System.currentTimeMillis();
 
+    private ApplicationRegistry _registry;
+    private boolean _statisticsEnabled = false;
+    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+    private NetworkConnection _network;
+    private Sender<ByteBuffer> _sender;
+
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
-    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
+    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
-        _networkDriver = driver;
-
         _codecFactory = new AMQCodecFactory(true, this);
-        _poolReference.acquireExecutorService();
-        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
-        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+
+        setNetworkConnection(network);
+        _sessionID = connectionId;
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
 
@@ -195,9 +193,21 @@ public class AMQProtocolEngine implement
         _configStore = virtualHostRegistry.getConfigStore();
         _id = _configStore.createId();
 
-
         _actor.message(ConnectionMessages.OPEN(null, null, false, false));
 
+        _registry = virtualHostRegistry.getApplicationRegistry();
+        initialiseStatistics();
+    }
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        setNetworkConnection(network, network.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
+        _network = network;
+        _sender = sender;
     }
 
     private AMQProtocolSessionMBean createMBean() throws JMException
@@ -236,26 +246,18 @@ public class AMQProtocolEngine implement
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+            for (AMQDataBlock dataBlock : dataBlocks)
             {
-                public void run()
+                try
                 {
-                    // Decode buffer
-
-                    for (AMQDataBlock dataBlock : dataBlocks)
-                    {
-                        try
-                        {
-                            dataBlockReceived(dataBlock);
-                        }
-                        catch (Exception e)
-                        {
-                            _logger.error("Unexpected exception when processing datablock", e);
-                            closeProtocolSession();
-                        }
-                    }
+                    dataBlockReceived(dataBlock);
                 }
-            });
+                catch (Exception e)
+                {
+                    _logger.error("Unexpected exception when processing datablock", e);
+                    closeProtocolSession();
+                }
+            }
         }
         catch (Exception e)
         {
@@ -333,6 +335,11 @@ public class AMQProtocolEngine implement
                 closeChannel(channelId);
                 throw e;
             }
+            catch (TransportException e)
+            {
+                closeChannel(channelId);
+                throw e;
+            }
         }
         finally
         {
@@ -343,7 +350,7 @@ public class AMQProtocolEngine implement
     private void protocolInitiationReceived(ProtocolInitiation pi)
     {
         // this ensures the codec never checks for a PI message again
-        ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+        (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
         try
         {
             // Log incomming protocol negotiation request
@@ -363,15 +370,49 @@ public class AMQProtocolEngine implement
                                                                                        null,
                                                                                        mechanisms.getBytes(),
                                                                                        locales.getBytes());
-            _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
+            _sender.send(asByteBuffer(responseBody.generateFrame(0)));
+            _sender.flush();
 
         }
         catch (AMQException e)
         {
             _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
 
-            _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+            _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
+            _sender.flush();
+        }
+    }
+
+    private ByteBuffer asByteBuffer(AMQDataBlock block)
+    {
+        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+        try
+        {
+            block.writePayload(new DataOutputStream(new OutputStream()
+            {
+
+
+                @Override
+                public void write(int b) throws IOException
+                {
+                    buf.put((byte) b);
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException
+                {
+                    buf.put(b, off, len);
+                }
+            }));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
+
+        buf.flip();
+        return buf;
     }
 
     public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
@@ -426,19 +467,19 @@ public class AMQProtocolEngine implement
                                                                    AMQConstant.CHANNEL_ERROR.getName().toString());
 
                     _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                    closeConnection(channelId, ce, false);
+                    closeConnection(channelId, ce);
                 }
             }
             catch (AMQConnectionException e)
             {
                 _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, e, false);
+                closeConnection(channelId, e);
             }
             catch (AMQSecurityException e)
             {
                 AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
                 _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, ce, false);
+                closeConnection(channelId, ce);
             }
         }
         catch (Exception e)
@@ -481,19 +522,14 @@ public class AMQProtocolEngine implement
      *
      * @param frame the frame to write
      */
-    public void writeFrame(AMQDataBlock frame)
+    public synchronized void writeFrame(AMQDataBlock frame)
     {
         _lastSent = frame;
-        final ByteBuffer buf = frame.toNioByteBuffer();
+        final ByteBuffer buf = asByteBuffer(frame);
         _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
-        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
-        {
-            public void run()
-            {
-                _networkDriver.send(buf);
-            }
-        });
+        _sender.send(buf);
+        _sender.flush();
     }
 
     public AMQShortString getContextKey()
@@ -683,8 +719,8 @@ public class AMQProtocolEngine implement
     {
         if (delay > 0)
         {
-            _networkDriver.setMaxWriteIdle(delay);
-            _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+            _network.setMaxWriteIdle(delay);
+            _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
         }
     }
 
@@ -725,7 +761,7 @@ public class AMQProtocolEngine implement
                 }
 
                 closeAllChannels();
-                
+
                 getConfigStore().removeConfiguredObject(this);
 
                 if (_managedObject != null)
@@ -745,7 +781,6 @@ public class AMQProtocolEngine implement
                     _closed = true;
                     notifyAll();
                 }
-                _poolReference.releaseExecutorService();
                 CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
             }
         }
@@ -768,27 +803,32 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+    public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
     {
-        if (_logger.isInfoEnabled())
+        try
         {
-            _logger.info("Closing connection due to: " + e);
-        }
-
-        markChannelAwaitingCloseOk(channelId);
-        closeSession();
-        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-        writeFrame(e.getCloseFrame(channelId));
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Closing connection due to: " + e);
+            }
 
-        if (closeProtocolSession)
+            markChannelAwaitingCloseOk(channelId);
+            closeSession();
+            _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+            writeFrame(e.getCloseFrame(channelId));
+        }
+        finally
         {
             closeProtocolSession();
         }
+
+
     }
 
     public void closeProtocolSession()
     {
-        _networkDriver.close();
+        _network.close();
+
         try
         {
             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -797,11 +837,15 @@ public class AMQProtocolEngine implement
         {
             _logger.info(e.getMessage());
         }
+        catch (TransportException e)
+        {
+            _logger.info(e.getMessage());
+        }
     }
 
     public String toString()
     {
-        return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+        return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
     }
 
     public String dump()
@@ -823,17 +867,11 @@ public class AMQProtocolEngine implement
      */
     public String getLocalFQDN()
     {
-        SocketAddress address = _networkDriver.getLocalAddress();
-        // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
-        // information is used by SASL primary.
+        SocketAddress address = _network.getLocalAddress();
         if (address instanceof InetSocketAddress)
         {
             return ((InetSocketAddress) address).getHostName();
         }
-        else if (address instanceof VmPipeAddress)
-        {
-            return "vmpipe:" + ((VmPipeAddress) address).getPort();
-        }
         else
         {
             throw new IllegalArgumentException("Unsupported socket address class: " + address);
@@ -912,7 +950,7 @@ public class AMQProtocolEngine implement
 
     public Object getClientIdentifier()
     {
-        return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
+        return _network.getRemoteAddress();
     }
 
     public VirtualHost getVirtualHost()
@@ -925,7 +963,7 @@ public class AMQProtocolEngine implement
         _virtualHost = virtualHost;
 
         _virtualHost.getConnectionRegistry().registerConnection(this);
-        
+
         _configStore.addConfiguredObject(this);
 
         try
@@ -954,29 +992,33 @@ public class AMQProtocolEngine implement
         return _protocolOutputConverter;
     }
 
-    public void setAuthorizedID(Principal authorizedID)
+    public void setAuthorizedSubject(final Subject authorizedSubject)
     {
-        _authorizedID = authorizedID;
+        if (authorizedSubject == null)
+        {
+            throw new IllegalArgumentException("authorizedSubject cannot be null");
+        }
+        _authorizedSubject = authorizedSubject;
     }
 
-    public Principal getAuthorizedID()
+    public Subject getAuthorizedSubject()
     {
-        return _authorizedID;
+        return _authorizedSubject;
     }
 
-    public Principal getPrincipal()
+    public Principal getAuthorizedPrincipal()
     {
-        return _authorizedID;
+        return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
     }
 
     public SocketAddress getRemoteAddress()
     {
-        return _networkDriver.getRemoteAddress();
+        return _network.getRemoteAddress();
     }
 
     public SocketAddress getLocalAddress()
     {
-        return _networkDriver.getLocalAddress();
+        return _network.getLocalAddress();
     }
 
     public MethodRegistry getMethodRegistry()
@@ -999,6 +1041,10 @@ public class AMQProtocolEngine implement
         {
            _logger.error("Could not close protocol engine", e);
         }
+        catch (TransportException e)
+        {
+           _logger.error("Could not close protocol engine", e);
+        }
     }
 
     public void readerIdle()
@@ -1006,14 +1052,9 @@ public class AMQProtocolEngine implement
         // Nothing
     }
 
-    public void setNetworkDriver(NetworkDriver driver)
-    {
-        _networkDriver = driver;
-    }
-
     public void writerIdle()
     {
-        _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
+        _sender.send(asByteBuffer(HeartbeatBody.FRAME));
     }
 
     public void exception(Throwable throwable)
@@ -1021,7 +1062,7 @@ public class AMQProtocolEngine implement
         if (throwable instanceof AMQProtocolHeaderException)
         {
             writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-            _networkDriver.close();
+            _sender.close();
 
             _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
         }
@@ -1039,7 +1080,7 @@ public class AMQProtocolEngine implement
 
             writeFrame(closeBody.generateFrame(0));
 
-            _networkDriver.close();
+            _sender.close();
         }
     }
 
@@ -1078,19 +1119,6 @@ public class AMQProtocolEngine implement
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
 
-    public void closeIfLingeringClosedChannels()
-    {
-        for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
-        {
-            if (id.getValue() + 30000 > System.currentTimeMillis())
-            {
-                // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
-                _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
-                closeProtocolSession();
-            }
-        }
-    }
-
     public Boolean isIncoming()
     {
         return true;
@@ -1108,7 +1136,7 @@ public class AMQProtocolEngine implement
 
     public String getAuthId()
     {
-        return getAuthorizedID().getName();
+        return getAuthorizedPrincipal().getName();
     }
 
     public Integer getRemotePID()
@@ -1170,7 +1198,7 @@ public class AMQProtocolEngine implement
     {
         return false;
     }
-    
+
     public void mgmtClose()
     {
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1263,7 +1291,6 @@ public class AMQProtocolEngine implement
 
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
-
         closeChannel((Integer)session.getID());
 
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1273,6 +1300,110 @@ public class AMQProtocolEngine implement
                         new AMQShortString(message),
                         0,0);
 
-        writeFrame(responseBody.generateFrame((Integer)session.getID()));       
-    }       
+        writeFrame(responseBody.generateFrame((Integer)session.getID()));
+    }
+
+    public void close(AMQConstant cause, String message) throws AMQException
+    {
+        closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+		                getProtocolOutputConverter().getProtocolMajorVersion(),
+		                getProtocolOutputConverter().getProtocolMinorVersion(),
+		                (Throwable) null));
+    }
+
+    public List<AMQSessionModel> getSessionModels()
+    {
+		List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+		for (AMQChannel channel : getChannels())
+		{
+		    sessions.add((AMQSessionModel) channel);
+		}
+		return sessions;
+    }
+
+    public LogSubject getLogSubject()
+    {
+        return _logSubject;
+    }
+
+    public void registerMessageDelivered(long messageSize)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesDelivered.registerEvent(1L);
+            _dataDelivered.registerEvent(messageSize);
+        }
+        _virtualHost.registerMessageDelivered(messageSize);
+    }
+
+    public void registerMessageReceived(long messageSize, long timestamp)
+    {
+        if (isStatisticsEnabled())
+        {
+            _messagesReceived.registerEvent(1L, timestamp);
+            _dataReceived.registerEvent(messageSize, timestamp);
+        }
+        _virtualHost.registerMessageReceived(messageSize, timestamp);
+    }
+
+    public StatisticsCounter getMessageReceiptStatistics()
+    {
+        return _messagesReceived;
+    }
+
+    public StatisticsCounter getDataReceiptStatistics()
+    {
+        return _dataReceived;
+    }
+
+    public StatisticsCounter getMessageDeliveryStatistics()
+    {
+        return _messagesDelivered;
+    }
+
+    public StatisticsCounter getDataDeliveryStatistics()
+    {
+        return _dataDelivered;
+    }
+
+    public void resetStatistics()
+    {
+        _messagesDelivered.reset();
+        _dataDelivered.reset();
+        _messagesReceived.reset();
+        _dataReceived.reset();
+    }
+
+    public void initialiseStatistics()
+    {
+        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+                _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
+        _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
+        _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
+        _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _statisticsEnabled = enabled;
+    }
+
+    @Override
+    public boolean isSessionNameUnique(String name)
+    {
+        return true;
+    }
+
+    @Override
+    public String getUserName()
+    {
+        return getAuthorizedPrincipal().getName();
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol;
 
+import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.AMQException;
@@ -28,16 +29,15 @@ import org.apache.qpid.framing.*;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.security.Principal;
 import java.util.List;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
 {
     long getSessionID();
 
@@ -163,8 +163,10 @@ public interface AMQProtocolSession exte
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     void closeSession() throws AMQException;
 
+    void closeProtocolSession();
+
     /** This must be called to close the session in order to free up any resources managed by the session. */
-    void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+    void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
 
 
     /** @return a key that uniquely identifies this session */
@@ -205,7 +207,7 @@ public interface AMQProtocolSession exte
 
     public ProtocolOutputConverter getProtocolOutputConverter();
 
-    void setAuthorizedID(Principal authorizedID);
+    void setAuthorizedSubject(Subject authorizedSubject);
 
     public java.net.SocketAddress getRemoteAddress();
 
@@ -231,7 +233,5 @@ public interface AMQProtocolSession exte
 
     List<AMQChannel> getChannels();
 
-    void closeIfLingeringClosedChannels();
-
     void mgmtCloseChannel(int channelId);
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Oct 21 14:42:12 2011
@@ -37,25 +37,15 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.NotCompliantMBeanException;
 import javax.management.Notification;
+import javax.management.ObjectName;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -66,8 +56,20 @@ import javax.management.openmbean.Simple
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
-import java.util.Date;
-import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
 
 /**
  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -94,8 +96,7 @@ public class AMQProtocolSessionMBean ext
         super(ManagedConnection.class, ManagedConnection.TYPE);
         _protocolSession = amqProtocolSession;
         String remote = getRemoteAddress();
-        remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
-        _name = jmxEncode(new StringBuffer(remote), 0).toString();
+        _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
         init();
     }
 
@@ -130,7 +131,7 @@ public class AMQProtocolSessionMBean ext
 
     public String getAuthorizedId()
     {
-        return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
+        return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null;
     }
 
     public String getVersion()
@@ -175,7 +176,7 @@ public class AMQProtocolSessionMBean ext
 
     public String getObjectInstanceName()
     {
-        return _name;
+        return ObjectName.quote(_name);
     }
 
     /**
@@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean ext
         _broadcaster.sendNotification(n);
     }
 
-} // End of MBean class
+    public void resetStatistics() throws Exception
+    {
+        _protocolSession.resetStatistics();
+    }
+
+    public double getPeakMessageDeliveryRate()
+    {
+        return _protocolSession.getMessageDeliveryStatistics().getPeak();
+    }
+
+    public double getPeakDataDeliveryRate()
+    {
+        return _protocolSession.getDataDeliveryStatistics().getPeak();
+    }
+
+    public double getMessageDeliveryRate()
+    {
+        return _protocolSession.getMessageDeliveryStatistics().getRate();
+    }
+
+    public double getDataDeliveryRate()
+    {
+        return _protocolSession.getDataDeliveryStatistics().getRate();
+    }
+
+    public long getTotalMessagesDelivered()
+    {
+        return _protocolSession.getMessageDeliveryStatistics().getTotal();
+    }
+
+    public long getTotalDataDelivered()
+    {
+        return _protocolSession.getDataDeliveryStatistics().getTotal();
+    }
+
+    public double getPeakMessageReceiptRate()
+    {
+        return _protocolSession.getMessageReceiptStatistics().getPeak();
+    }
+
+    public double getPeakDataReceiptRate()
+    {
+        return _protocolSession.getDataReceiptStatistics().getPeak();
+    }
+
+    public double getMessageReceiptRate()
+    {
+        return _protocolSession.getMessageReceiptStatistics().getRate();
+    }
+
+    public double getDataReceiptRate()
+    {
+        return _protocolSession.getDataReceiptStatistics().getRate();
+    }
+
+    public long getTotalMessagesReceived()
+    {
+        return _protocolSession.getMessageReceiptStatistics().getTotal();
+    }
+
+    public long getTotalDataReceived()
+    {
+        return _protocolSession.getDataReceiptStatistics().getTotal();
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return _protocolSession.isStatisticsEnabled();
+    }
+
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _protocolSession.setStatisticsEnabled(enabled);
+    }
+}

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 21 14:42:12 2011
@@ -20,15 +20,35 @@
  */
 package org.apache.qpid.server.protocol;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
 
 public interface AMQSessionModel
 {
-    Object getID();
+    public Object getID();
 
-    AMQConnectionModel getConnectionModel();
+    public AMQConnectionModel getConnectionModel();
 
-    String getClientID();
+    public String getClientID();
+    
+    public void close() throws AMQException;
 
-    LogSubject getLogSubject();
+    public LogSubject getLogSubject();
+    
+    /**
+     * This method is called from the housekeeping thread to check the status of
+     * transactions on this session and react appropriately.
+     * 
+     * If a transaction is open for too long or idle for too long then a warning
+     * is logged or the connection is closed, depending on the configuration. An open
+     * transaction is one that has recent activity. The transaction age is counted
+     * from the time the transaction was started. An idle transaction is one that 
+     * has had no activity, such as publishing or acknowledgeing messages.
+     * 
+     * @param openWarn time in milliseconds before alerting on open transaction
+     * @param openClose time in milliseconds before closing connection with open transaction
+     * @param idleWarn time in milliseconds before alerting on idle transaction
+     * @param idleClose time in milliseconds before closing connection with idle transaction
+     */
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 21 14:42:12 2011
@@ -22,45 +22,54 @@ package org.apache.qpid.server.protocol;
 
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
 
+    private final long _id;
 
-
-    private NetworkDriver _networkDriver;
-    private Set<VERSION> _supported;
+    private Set<AmqpProtocolVersion> _supported;
     private String _fqdn;
     private IApplicationRegistry _appRegistry;
+    private NetworkConnection _network;
+    private Sender<ByteBuffer> _sender;
+
+    private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
-    private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+    public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+                                      String fqdn,
+                                      Set<AmqpProtocolVersion> supported,
+                                      NetworkConnection network,
+                                      long id)
+    {
+        this(appRegistry,fqdn,supported,id);
+        setNetworkConnection(network);
+    }
 
     public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
                                       String fqdn,
-                                      Set<VERSION> supported, NetworkDriver networkDriver)
+                                      Set<AmqpProtocolVersion> supported,
+                                      long id)
     {
+        _id = id;
         _appRegistry = appRegistry;
         _fqdn = fqdn;
         _supported = supported;
-        _networkDriver = networkDriver;
-    }
 
-    public void setNetworkDriver(NetworkDriver driver)
-    {
-        _delegate.setNetworkDriver(driver);
     }
 
+
     public SocketAddress getRemoteAddress()
     {
         return _delegate.getRemoteAddress();
@@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine 
         _delegate.readerIdle();
     }
 
+
     public void received(ByteBuffer msg)
     {
         _delegate.received(msg);
@@ -106,6 +116,11 @@ public class MultiVersionProtocolEngine 
         _delegate.exception(t);
     }
 
+    public long getConnectionId()
+    {
+        return _delegate.getConnectionId();
+    }
+
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
     private static final byte[] AMQP_0_8_HEADER =
@@ -130,7 +145,7 @@ public class MultiVersionProtocolEngine 
                          (byte) 9
             };
 
-private static final byte[] AMQP_0_9_1_HEADER =
+    private static final byte[] AMQP_0_9_1_HEADER =
             new byte[] { (byte) 'A',
                          (byte) 'M',
                          (byte) 'Q',
@@ -153,19 +168,31 @@ private static final byte[] AMQP_0_9_1_H
                          (byte) 10
             };
 
+    public void setNetworkConnection(NetworkConnection networkConnection)
+    {
+        setNetworkConnection(networkConnection, networkConnection.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
+        _network = network;
+        _sender = sender;
+    }
+
+
     private static interface DelegateCreator
     {
-        VERSION getVersion();
+        AmqpProtocolVersion getVersion();
         byte[] getHeaderIdentifier();
-        ProtocolEngine getProtocolEngine();
+        ServerProtocolEngine getProtocolEngine();
     }
 
     private DelegateCreator creator_0_8 = new DelegateCreator()
     {
 
-        public VERSION getVersion()
+        public AmqpProtocolVersion getVersion()
         {
-            return VERSION.v0_8;
+            return AmqpProtocolVersion.v0_8;
         }
 
         public byte[] getHeaderIdentifier()
@@ -173,18 +200,18 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_8_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
     private DelegateCreator creator_0_9 = new DelegateCreator()
     {
 
-        public VERSION getVersion()
+        public AmqpProtocolVersion getVersion()
         {
-            return VERSION.v0_9;
+            return AmqpProtocolVersion.v0_9;
         }
 
 
@@ -193,18 +220,18 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_9_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
     private DelegateCreator creator_0_9_1 = new DelegateCreator()
     {
 
-        public VERSION getVersion()
+        public AmqpProtocolVersion getVersion()
         {
-            return VERSION.v0_9_1;
+            return AmqpProtocolVersion.v0_9_1;
         }
 
 
@@ -213,9 +240,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_9_1_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -223,9 +250,9 @@ private static final byte[] AMQP_0_9_1_H
     private DelegateCreator creator_0_10 = new DelegateCreator()
     {
 
-        public VERSION getVersion()
+        public AmqpProtocolVersion getVersion()
         {
-            return VERSION.v0_10;
+            return AmqpProtocolVersion.v0_10;
         }
 
 
@@ -234,15 +261,15 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_10_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
             final ConnectionDelegate connDelegate =
                     new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
 
-            ServerConnection conn = new ServerConnection();
+            ServerConnection conn = new ServerConnection(_id);
             conn.setConnectionDelegate(connDelegate);
 
-            return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
+            return new ProtocolEngine_0_10( conn, _network, _appRegistry);
         }
     };
 
@@ -250,21 +277,16 @@ private static final byte[] AMQP_0_9_1_H
             new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
 
 
-    private class ClosedDelegateProtocolEngine implements ProtocolEngine
+    private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
     {
-        public void setNetworkDriver(NetworkDriver driver)
-        {
-            _networkDriver = driver;
-        }
-
         public SocketAddress getRemoteAddress()
         {
-            return _networkDriver.getRemoteAddress();
+            return _network.getRemoteAddress();
         }
 
         public SocketAddress getLocalAddress()
         {
-            return _networkDriver.getLocalAddress();
+            return _network.getLocalAddress();
         }
 
         public long getWrittenBytes()
@@ -301,26 +323,30 @@ private static final byte[] AMQP_0_9_1_H
         {
 
         }
-    }
 
-    private class SelfDelegateProtocolEngine implements ProtocolEngine
-    {
+        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        {
 
-        private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+        }
 
-        public void setNetworkDriver(NetworkDriver driver)
+        public long getConnectionId()
         {
-            _networkDriver = driver;
+            return _id;
         }
+    }
+
+    private class SelfDelegateProtocolEngine implements ServerProtocolEngine
+    {
+        private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
 
         public SocketAddress getRemoteAddress()
         {
-            return _networkDriver.getRemoteAddress();
+            return _network.getRemoteAddress();
         }
 
         public SocketAddress getLocalAddress()
         {
-            return _networkDriver.getLocalAddress();
+            return _network.getLocalAddress();
         }
 
         public long getWrittenBytes()
@@ -355,7 +381,7 @@ private static final byte[] AMQP_0_9_1_H
                 _header.get(headerBytes);
 
 
-                ProtocolEngine newDelegate = null;
+                ServerProtocolEngine newDelegate = null;
                 byte[] newestSupported = null;
 
                 for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -380,17 +406,20 @@ private static final byte[] AMQP_0_9_1_H
                 // If no delegate is found then send back the most recent support protocol version id
                 if(newDelegate == null)
                 {
-                    _networkDriver.send(ByteBuffer.wrap(newestSupported));
+                    _sender.send(ByteBuffer.wrap(newestSupported));
+                    _sender.flush();
 
                     _delegate = new ClosedDelegateProtocolEngine();
+
+                    _network.close();
+
                 }
                 else
                 {
-                    newDelegate.setNetworkDriver(_networkDriver);
-
                     _delegate = newDelegate;
 
                     _header.flip();
+                    _delegate.setNetworkConnection(_network, _sender);
                     _delegate.received(_header);
                     if(msg.hasRemaining())
                     {
@@ -402,6 +431,11 @@ private static final byte[] AMQP_0_9_1_H
 
         }
 
+        public long getConnectionId()
+        {
+            return _id;
+        }
+
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);
@@ -421,5 +455,10 @@ private static final byte[] AMQP_0_9_1_H
         {
 
         }
+
+        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+        {
+
+        }
     }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 21 14:42:12 2011
@@ -20,56 +20,38 @@
 */
 package org.apache.qpid.server.protocol;
 
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-
-import java.util.Set;
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
-    ;
-
-
-    public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
-
-    private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
     private final IApplicationRegistry _appRegistry;
     private final String _fqdn;
-    private final Set<VERSION> _supported;
-
+    private final Set<AmqpProtocolVersion> _supported;
 
-    public MultiVersionProtocolEngineFactory()
+    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
     {
-        this(1, "localhost", ALL_VERSIONS);
-    }
-
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
-    {
-        this(1, fqdn, versions);
+        _appRegistry = ApplicationRegistry.getInstance();
+        _fqdn = fqdn;
+        _supported = supportedVersions;
     }
 
-
-    public MultiVersionProtocolEngineFactory(String fqdn)
+    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
     {
-        this(1, fqdn, ALL_VERSIONS);
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
     }
 
-    public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+    public ServerProtocolEngine newProtocolEngine()
     {
-        _appRegistry = ApplicationRegistry.getInstance(instance);
-        _fqdn = fqdn;
-        _supported = supportedVersions;
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
     }
 
-
-    public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
-    {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
-    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org