You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [22/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java Fri Oct 21 01:19:00 2011
@@ -18,56 +18,39 @@
  */
 package org.apache.qpid.server.plugins;
 
-import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE;
-import static org.apache.felix.main.AutoProcessor.process;
-import static org.osgi.framework.Constants.FRAMEWORK_STORAGE;
-import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN;
-import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT;
-import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES;
+import static org.apache.felix.framework.util.FelixConstants.*;
+import static org.apache.felix.main.AutoProcessor.*;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.felix.framework.Felix;
 import org.apache.felix.framework.util.StringMap;
 import org.apache.log4j.Logger;
 import org.apache.qpid.common.Closeable;
-import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.server.configuration.TopicConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory;
 import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
 import org.apache.qpid.server.exchange.ExchangeType;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.SecurityPluginFactory;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
 import org.apache.qpid.server.security.access.plugins.DenyAll;
 import org.apache.qpid.server.security.access.plugins.LegacyAccess;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
 import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy;
 import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-import org.apache.qpid.util.FileUtils;
 import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleException;
-import org.osgi.framework.Version;
 import org.osgi.framework.launch.Framework;
 import org.osgi.util.tracker.ServiceTracker;
 
@@ -80,6 +63,7 @@ public class PluginManager implements Cl
     private static final Logger _logger = Logger.getLogger(PluginManager.class);
 
     private static final int FELIX_STOP_TIMEOUT = 30000;
+    private static final String QPID_VER_SUFFIX = "version=0.9,";
 
     private Framework _felix;
 
@@ -88,61 +72,15 @@ public class PluginManager implements Cl
     private ServiceTracker _configTracker = null;
     private ServiceTracker _virtualHostTracker = null;
     private ServiceTracker _policyTracker = null;
-    private ServiceTracker _authenticationManagerTracker = null;
 
     private Activator _activator;
 
-    private final List<ServiceTracker> _trackers = new ArrayList<ServiceTracker>();
     private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>();
     private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>();
     private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>();
     private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>();
-    private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>();
 
-    /** The default name of the OSGI system package list. */
-    private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties";
-    
-    /** The name of the override system property that holds the name of the OSGI system package list. */
-    private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties";
-    
-    private static final String OSGI_SYSTEM_PACKAGES;
-    
-    static 
-    {
-        final String filename = System.getProperty(FILE_PROPERTY);
-        final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
-                    PluginManager.class.getClassLoader());
-        
-        try
-        {
-            Version qpidReleaseVersion;
-            try
-            {
-                qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion());
-            }
-            catch (IllegalArgumentException iae)
-            {
-                qpidReleaseVersion = null;
-            }
-            
-            final Properties p  = new Properties();
-            p.load(is);
-            
-            final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p);
-            
-            OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString();
-            
-            _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES);
-        }
-        catch (IOException e)
-        {
-            _logger.error("Error reading OSGI system package list", e);
-            throw new ExceptionInInitializerError(e);
-        }
-    }
-    
-    
-    public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception
+    public PluginManager(String pluginPath, String cachePath) throws Exception
     {
         // Store all non-OSGi plugins
         // A little gross that we have to add them here, but not all the plugins are OSGIfied
@@ -159,8 +97,7 @@ public class PluginManager implements Cl
                 LegacyAccess.LegacyAccessConfiguration.FACTORY,
                 new SlowConsumerDetectionConfigurationFactory(),
                 new SlowConsumerDetectionPolicyConfigurationFactory(),
-                new SlowConsumerDetectionQueueConfigurationFactory(),
-                PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY))
+                new SlowConsumerDetectionQueueConfigurationFactory()))
         {
             _configPlugins.put(configFactory.getParentPaths(), configFactory);
         }
@@ -175,109 +112,125 @@ public class PluginManager implements Cl
             _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory);
         }
 
-        for (AuthenticationManagerPluginFactory<? extends Plugin> pluginFactory : Arrays.asList(
-                PrincipalDatabaseAuthenticationManager.FACTORY))
+        // Check the plugin directory path is set and exist
+        if (pluginPath == null)
         {
-            _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory);
+            return;
         }
-
-        if(bundleContext == null)
+        File pluginDir = new File(pluginPath);
+        if (!pluginDir.exists())
         {
-            // Check the plugin directory path is set and exist
-            if (pluginPath == null)
-            {
-                _logger.info("No plugin path specified, no plugins will be loaded.");
-                return;
-            }
-            File pluginDir = new File(pluginPath);
-            if (!pluginDir.exists())
-            {
-                _logger.warn("Plugin dir : "  + pluginDir + " does not exist.");
-                return;
-            }
-
-            // Add the bundle provided service interface package and the core OSGi
-            // packages to be exported from the class path via the system bundle.
-
-            // Setup OSGi configuration property map
-            final StringMap configMap = new StringMap(false);
-            configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES);
-
-            // No automatic shutdown hook
-            configMap.put("felix.shutdown.hook", "false");
-
-            // Add system activator
-            List<BundleActivator> activators = new ArrayList<BundleActivator>();
-            _activator = new Activator();
-            activators.add(_activator);
-            configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
-
-            if (cachePath != null)
-            {
-                File cacheDir = new File(cachePath);
-                if (!cacheDir.exists() && cacheDir.canWrite())
-                {
-                    _logger.info("Creating plugin cache directory: " + cachePath);
-                    cacheDir.mkdir();
-                }
-
-                // Set plugin cache directory and empty it
-                _logger.info("Cache bundles in directory " + cachePath);
-                configMap.put(FRAMEWORK_STORAGE, cachePath);
-            }
-            configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
-
-            // Set directory with plugins to auto-deploy
-            _logger.info("Auto deploying bundles from directory " + pluginPath);
-            configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
-            configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+            return;
+        } 
+
+        // Setup OSGi configuration propery map
+        StringMap configMap = new StringMap(false);
+
+        // Add the bundle provided service interface package and the core OSGi
+        // packages to be exported from the class path via the system bundle.
+        configMap.put(FRAMEWORK_SYSTEMPACKAGES,
+                "org.osgi.framework; version=1.3.0," +
+                "org.osgi.service.packageadmin; version=1.2.0," +
+                "org.osgi.service.startlevel; version=1.0.0," +
+                "org.osgi.service.url; version=1.0.0," +
+                "org.osgi.util.tracker; version=1.0.0," +
+                "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
+                "org.apache.qpid; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.common; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
+                "org.apache.qpid.util; " + QPID_VER_SUFFIX +
+                "org.apache.commons.configuration; version=1.0.0," +
+                "org.apache.commons.lang; version=1.0.0," +
+                "org.apache.commons.lang.builder; version=1.0.0," +
+                "org.apache.commons.logging; version=1.0.0," +
+                "org.apache.log4j; version=1.2.12," +
+                "javax.management.openmbean; version=1.0.0," +
+                "javax.management; version=1.0.0"
+            );
+        
+        // No automatic shutdown hook
+        configMap.put("felix.shutdown.hook", "false");
+        
+        // Add system activator
+        List<BundleActivator> activators = new ArrayList<BundleActivator>();
+        _activator = new Activator();
+        activators.add(_activator);
+        configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
 
-            // Start plugin manager
-            _felix = new Felix(configMap);
-            try
-            {
-                _logger.info("Starting plugin manager framework");
-                _felix.init();
-                process(configMap, _felix.getBundleContext());
-                _felix.start();
-                _logger.info("Started plugin manager framework");
-            }
-            catch (BundleException e)
+        if (cachePath != null)
+        {
+            File cacheDir = new File(cachePath);
+            if (!cacheDir.exists() && cacheDir.canWrite())
             {
-                throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+                _logger.info("Creating plugin cache directory: " + cachePath);
+                cacheDir.mkdir();
             }
-
-            bundleContext = _activator.getContext();
+            
+            // Set plugin cache directory and empty it
+            _logger.info("Cache bundles in directory " + cachePath);
+            configMap.put(FRAMEWORK_STORAGE, cachePath);
+        }
+        configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+        
+        // Set directory with plugins to auto-deploy
+        _logger.info("Auto deploying bundles from directory " + pluginPath);
+        configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
+        configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);        
+        
+        // Start plugin manager and trackers
+        _felix = new Felix(configMap);
+        try
+        {
+            _logger.info("Starting plugin manager...");
+            _felix.init();
+	        process(configMap, _felix.getBundleContext());
+            _felix.start();
+            _logger.info("Started plugin manager");
         }
-        else
+        catch (BundleException e)
         {
-            _logger.info("Using the specified external BundleContext");
+            throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
         }
-
-        _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null);
+        
+        // TODO save trackers in a map, keyed by class name
+        
+        _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null);
         _exchangeTracker.open();
-        _trackers.add(_exchangeTracker);
 
-        _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null);
+        _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null);
         _securityTracker.open();
-        _trackers.add(_securityTracker);
 
-        _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null);
+        _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null);
         _configTracker.open();
-        _trackers.add(_configTracker);
 
-        _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null);
+        _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null);
         _virtualHostTracker.open();
-        _trackers.add(_virtualHostTracker);
  
-        _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null);
+        _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null);
         _policyTracker.open();
-        _trackers.add(_policyTracker);
-
-        _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null);
-        _authenticationManagerTracker.open();
-        _trackers.add(_authenticationManagerTracker);
-
+        
         _logger.info("Opened service trackers");
     }
 
@@ -348,26 +301,22 @@ public class PluginManager implements Cl
         return getServices(_securityTracker, _securityPlugins);
     }
 
-    public Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> getAuthenticationManagerPlugins()
-    {
-        return getServices(_authenticationManagerTracker, _authenticationManagerPlugins);
-    }
-
     public void close()
     {
-        try
+        if (_felix != null)
         {
-            // Close all bundle trackers
-            for(ServiceTracker tracker : _trackers)
+            try
             {
-                tracker.close();
+                // Close all bundle trackers
+                _exchangeTracker.close();
+                _securityTracker.close();
+                _configTracker.close();
+                _virtualHostTracker.close();
+                _policyTracker.close();
             }
-        }
-        finally
-        {
-            if (_felix != null)
+            finally
             {
-                _logger.info("Stopping plugin manager framework");
+                _logger.info("Stopping plugin manager");
                 try
                 {
                     // FIXME should be stopAndWait() but hangs VM, need upgrade in felix
@@ -386,12 +335,7 @@ public class PluginManager implements Cl
                 {
                     // Ignore
                 }
-                _logger.info("Stopped plugin manager framework");
-            }
-            else
-            {
-                _logger.info("Plugin manager was started with an external BundleContext, " +
-                             "skipping remaining shutdown tasks");
+                _logger.info("Stopped plugin manager");
             }
         }
     }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Oct 21 01:19:00 2011
@@ -20,35 +20,14 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.AMQException;
 
-public interface AMQConnectionModel extends StatisticsGatherer
+public interface AMQConnectionModel
 {
-    /**
-     * get a unique id for this connection.
-     * 
-     * @return a {@link UUID} representing the connection
-     */
-    public UUID getId();
-    
-    /**
-     * Close the underlying Connection
-     * 
-     * @param cause
-     * @param message
-     * @throws org.apache.qpid.AMQException
-     */
-    public void close(AMQConstant cause, String message) throws AMQException;
 
     /**
      * Close the given requested Session
-     * 
      * @param session
      * @param cause
      * @param message
@@ -57,20 +36,4 @@ public interface AMQConnectionModel exte
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
 
     public long getConnectionId();
-    
-    /**
-     * Get a list of all sessions using this connection.
-     * 
-     * @return a list of {@link AMQSessionModel}s
-     */
-    public List<AMQSessionModel> getSessionModels();
-
-    /**
-     * Return a {@link LogSubject} for the connection.
-     */
-    public LogSubject getLogSubject();
-
-    public String getUserName();
-
-    public boolean isSessionNameUnique(String name);
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -32,16 +30,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.JMException;
-import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
@@ -66,10 +66,12 @@ import org.apache.qpid.framing.MethodDis
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -88,22 +90,21 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkConnection;
 
-public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
+    private static final AtomicLong idGenerator = new AtomicLong(0);
+
     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -133,7 +134,7 @@ public class AMQProtocolEngine implement
     private Object _lastSent;
 
     protected volatile boolean _closed;
-
+    
     // maximum number of channels this session should have
     private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
 
@@ -145,46 +146,47 @@ public class AMQProtocolEngine implement
 
     private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
-    private Subject _authorizedSubject;
+    private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
 
-    private final long _sessionID;
+    // Create a simple ID that increments for ever new Session
+    private final long _sessionID = idGenerator.getAndIncrement();
 
     private AMQPConnectionActor _actor;
     private LogSubject _logSubject;
 
+    private NetworkDriver _networkDriver;
+
     private long _lastIoTime;
 
     private long _writtenBytes;
     private long _readBytes;
 
+    private Job _readJob;
+    private Job _writeJob;
 
+    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
     private final UUID _id;
     private final ConfigStore _configStore;
     private long _createTime = System.currentTimeMillis();
 
-    private ApplicationRegistry _registry;
-    private boolean _statisticsEnabled = false;
-    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
-
-    private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
-
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
-    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
+    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
-        _codecFactory = new AMQCodecFactory(true, this);
+        _networkDriver = driver;
 
-        setNetworkConnection(network);
-        _sessionID = connectionId;
+        _codecFactory = new AMQCodecFactory(true, this);
+        _poolReference.acquireExecutorService();
+        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
 
@@ -193,21 +195,9 @@ public class AMQProtocolEngine implement
         _configStore = virtualHostRegistry.getConfigStore();
         _id = _configStore.createId();
 
-        _actor.message(ConnectionMessages.OPEN(null, null, false, false));
 
-        _registry = virtualHostRegistry.getApplicationRegistry();
-        initialiseStatistics();
-    }
-
-    public void setNetworkConnection(NetworkConnection network)
-    {
-        setNetworkConnection(network, network.getSender());
-    }
+        _actor.message(ConnectionMessages.OPEN(null, null, false, false));
 
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-    {
-        _network = network;
-        _sender = sender;
     }
 
     private AMQProtocolSessionMBean createMBean() throws JMException
@@ -246,18 +236,26 @@ public class AMQProtocolEngine implement
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            for (AMQDataBlock dataBlock : dataBlocks)
+            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
             {
-                try
-                {
-                    dataBlockReceived(dataBlock);
-                }
-                catch (Exception e)
+                public void run()
                 {
-                    _logger.error("Unexpected exception when processing datablock", e);
-                    closeProtocolSession();
+                    // Decode buffer
+
+                    for (AMQDataBlock dataBlock : dataBlocks)
+                    {
+                        try
+                        {
+                            dataBlockReceived(dataBlock);
+                        }
+                        catch (Exception e)
+                        {
+                            _logger.error("Unexpected exception when processing datablock", e);
+                            closeProtocolSession();
+                        }
+                    }
                 }
-            }
+            });
         }
         catch (Exception e)
         {
@@ -335,11 +333,6 @@ public class AMQProtocolEngine implement
                 closeChannel(channelId);
                 throw e;
             }
-            catch (TransportException e)
-            {
-                closeChannel(channelId);
-                throw e;
-            }
         }
         finally
         {
@@ -350,7 +343,7 @@ public class AMQProtocolEngine implement
     private void protocolInitiationReceived(ProtocolInitiation pi)
     {
         // this ensures the codec never checks for a PI message again
-        (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+        ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
         try
         {
             // Log incomming protocol negotiation request
@@ -370,49 +363,15 @@ public class AMQProtocolEngine implement
                                                                                        null,
                                                                                        mechanisms.getBytes(),
                                                                                        locales.getBytes());
-            _sender.send(asByteBuffer(responseBody.generateFrame(0)));
-            _sender.flush();
+            _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
 
         }
         catch (AMQException e)
         {
             _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
 
-            _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
-            _sender.flush();
-        }
-    }
-
-    private ByteBuffer asByteBuffer(AMQDataBlock block)
-    {
-        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
-
-        try
-        {
-            block.writePayload(new DataOutputStream(new OutputStream()
-            {
-
-
-                @Override
-                public void write(int b) throws IOException
-                {
-                    buf.put((byte) b);
-                }
-
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException
-                {
-                    buf.put(b, off, len);
-                }
-            }));
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
+            _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
         }
-
-        buf.flip();
-        return buf;
     }
 
     public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
@@ -467,19 +426,19 @@ public class AMQProtocolEngine implement
                                                                    AMQConstant.CHANNEL_ERROR.getName().toString());
 
                     _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                    closeConnection(channelId, ce);
+                    closeConnection(channelId, ce, false);
                 }
             }
             catch (AMQConnectionException e)
             {
                 _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, e);
+                closeConnection(channelId, e, false);
             }
             catch (AMQSecurityException e)
             {
                 AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
                 _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, ce);
+                closeConnection(channelId, ce, false);
             }
         }
         catch (Exception e)
@@ -522,14 +481,19 @@ public class AMQProtocolEngine implement
      *
      * @param frame the frame to write
      */
-    public synchronized void writeFrame(AMQDataBlock frame)
+    public void writeFrame(AMQDataBlock frame)
     {
         _lastSent = frame;
-        final ByteBuffer buf = asByteBuffer(frame);
+        final ByteBuffer buf = frame.toNioByteBuffer();
         _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
-        _sender.send(buf);
-        _sender.flush();
+        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+        {
+            public void run()
+            {
+                _networkDriver.send(buf);
+            }
+        });
     }
 
     public AMQShortString getContextKey()
@@ -719,8 +683,8 @@ public class AMQProtocolEngine implement
     {
         if (delay > 0)
         {
-            _network.setMaxWriteIdle(delay);
-            _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+            _networkDriver.setMaxWriteIdle(delay);
+            _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
         }
     }
 
@@ -761,7 +725,7 @@ public class AMQProtocolEngine implement
                 }
 
                 closeAllChannels();
-
+                
                 getConfigStore().removeConfiguredObject(this);
 
                 if (_managedObject != null)
@@ -781,6 +745,7 @@ public class AMQProtocolEngine implement
                     _closed = true;
                     notifyAll();
                 }
+                _poolReference.releaseExecutorService();
                 CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
             }
         }
@@ -803,32 +768,27 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+    public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
     {
-        try
+        if (_logger.isInfoEnabled())
         {
-            if (_logger.isInfoEnabled())
-            {
-                _logger.info("Closing connection due to: " + e);
-            }
-
-            markChannelAwaitingCloseOk(channelId);
-            closeSession();
-            _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-            writeFrame(e.getCloseFrame(channelId));
+            _logger.info("Closing connection due to: " + e);
         }
-        finally
+
+        markChannelAwaitingCloseOk(channelId);
+        closeSession();
+        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        writeFrame(e.getCloseFrame(channelId));
+
+        if (closeProtocolSession)
         {
             closeProtocolSession();
         }
-
-
     }
 
     public void closeProtocolSession()
     {
-        _network.close();
-
+        _networkDriver.close();
         try
         {
             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -837,15 +797,11 @@ public class AMQProtocolEngine implement
         {
             _logger.info(e.getMessage());
         }
-        catch (TransportException e)
-        {
-            _logger.info(e.getMessage());
-        }
     }
 
     public String toString()
     {
-        return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
+        return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
     }
 
     public String dump()
@@ -867,11 +823,17 @@ public class AMQProtocolEngine implement
      */
     public String getLocalFQDN()
     {
-        SocketAddress address = _network.getLocalAddress();
+        SocketAddress address = _networkDriver.getLocalAddress();
+        // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
+        // information is used by SASL primary.
         if (address instanceof InetSocketAddress)
         {
             return ((InetSocketAddress) address).getHostName();
         }
+        else if (address instanceof VmPipeAddress)
+        {
+            return "vmpipe:" + ((VmPipeAddress) address).getPort();
+        }
         else
         {
             throw new IllegalArgumentException("Unsupported socket address class: " + address);
@@ -950,7 +912,7 @@ public class AMQProtocolEngine implement
 
     public Object getClientIdentifier()
     {
-        return _network.getRemoteAddress();
+        return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
     }
 
     public VirtualHost getVirtualHost()
@@ -963,7 +925,7 @@ public class AMQProtocolEngine implement
         _virtualHost = virtualHost;
 
         _virtualHost.getConnectionRegistry().registerConnection(this);
-
+        
         _configStore.addConfiguredObject(this);
 
         try
@@ -992,33 +954,29 @@ public class AMQProtocolEngine implement
         return _protocolOutputConverter;
     }
 
-    public void setAuthorizedSubject(final Subject authorizedSubject)
+    public void setAuthorizedID(Principal authorizedID)
     {
-        if (authorizedSubject == null)
-        {
-            throw new IllegalArgumentException("authorizedSubject cannot be null");
-        }
-        _authorizedSubject = authorizedSubject;
+        _authorizedID = authorizedID;
     }
 
-    public Subject getAuthorizedSubject()
+    public Principal getAuthorizedID()
     {
-        return _authorizedSubject;
+        return _authorizedID;
     }
 
-    public Principal getAuthorizedPrincipal()
+    public Principal getPrincipal()
     {
-        return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+        return _authorizedID;
     }
 
     public SocketAddress getRemoteAddress()
     {
-        return _network.getRemoteAddress();
+        return _networkDriver.getRemoteAddress();
     }
 
     public SocketAddress getLocalAddress()
     {
-        return _network.getLocalAddress();
+        return _networkDriver.getLocalAddress();
     }
 
     public MethodRegistry getMethodRegistry()
@@ -1041,10 +999,6 @@ public class AMQProtocolEngine implement
         {
            _logger.error("Could not close protocol engine", e);
         }
-        catch (TransportException e)
-        {
-           _logger.error("Could not close protocol engine", e);
-        }
     }
 
     public void readerIdle()
@@ -1052,9 +1006,14 @@ public class AMQProtocolEngine implement
         // Nothing
     }
 
+    public void setNetworkDriver(NetworkDriver driver)
+    {
+        _networkDriver = driver;
+    }
+
     public void writerIdle()
     {
-        _sender.send(asByteBuffer(HeartbeatBody.FRAME));
+        _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
     }
 
     public void exception(Throwable throwable)
@@ -1062,7 +1021,7 @@ public class AMQProtocolEngine implement
         if (throwable instanceof AMQProtocolHeaderException)
         {
             writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-            _sender.close();
+            _networkDriver.close();
 
             _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
         }
@@ -1080,7 +1039,7 @@ public class AMQProtocolEngine implement
 
             writeFrame(closeBody.generateFrame(0));
 
-            _sender.close();
+            _networkDriver.close();
         }
     }
 
@@ -1119,6 +1078,19 @@ public class AMQProtocolEngine implement
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
 
+    public void closeIfLingeringClosedChannels()
+    {
+        for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+        {
+            if (id.getValue() + 30000 > System.currentTimeMillis())
+            {
+                // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
+                _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+                closeProtocolSession();
+            }
+        }
+    }
+
     public Boolean isIncoming()
     {
         return true;
@@ -1136,7 +1108,7 @@ public class AMQProtocolEngine implement
 
     public String getAuthId()
     {
-        return getAuthorizedPrincipal().getName();
+        return getAuthorizedID().getName();
     }
 
     public Integer getRemotePID()
@@ -1198,7 +1170,7 @@ public class AMQProtocolEngine implement
     {
         return false;
     }
-
+    
     public void mgmtClose()
     {
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1291,6 +1263,7 @@ public class AMQProtocolEngine implement
 
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
+
         closeChannel((Integer)session.getID());
 
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1300,110 +1273,6 @@ public class AMQProtocolEngine implement
                         new AMQShortString(message),
                         0,0);
 
-        writeFrame(responseBody.generateFrame((Integer)session.getID()));
-    }
-
-    public void close(AMQConstant cause, String message) throws AMQException
-    {
-        closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
-		                getProtocolOutputConverter().getProtocolMajorVersion(),
-		                getProtocolOutputConverter().getProtocolMinorVersion(),
-		                (Throwable) null));
-    }
-
-    public List<AMQSessionModel> getSessionModels()
-    {
-		List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
-		for (AMQChannel channel : getChannels())
-		{
-		    sessions.add((AMQSessionModel) channel);
-		}
-		return sessions;
-    }
-
-    public LogSubject getLogSubject()
-    {
-        return _logSubject;
-    }
-
-    public void registerMessageDelivered(long messageSize)
-    {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
-        _virtualHost.registerMessageDelivered(messageSize);
-    }
-
-    public void registerMessageReceived(long messageSize, long timestamp)
-    {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
-        _virtualHost.registerMessageReceived(messageSize, timestamp);
-    }
-
-    public StatisticsCounter getMessageReceiptStatistics()
-    {
-        return _messagesReceived;
-    }
-
-    public StatisticsCounter getDataReceiptStatistics()
-    {
-        return _dataReceived;
-    }
-
-    public StatisticsCounter getMessageDeliveryStatistics()
-    {
-        return _messagesDelivered;
-    }
-
-    public StatisticsCounter getDataDeliveryStatistics()
-    {
-        return _dataDelivered;
-    }
-
-    public void resetStatistics()
-    {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-    }
-
-    public void initialiseStatistics()
-    {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
-        _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
-        _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
-    }
-
-    public boolean isStatisticsEnabled()
-    {
-        return _statisticsEnabled;
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _statisticsEnabled = enabled;
-    }
-
-    @Override
-    public boolean isSessionNameUnique(String name)
-    {
-        return true;
-    }
-
-    @Override
-    public String getUserName()
-    {
-        return getAuthorizedPrincipal().getName();
-    }
+        writeFrame(responseBody.generateFrame((Integer)session.getID()));       
+    }       
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 21 01:19:00 2011
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol;
 
-import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.AMQException;
@@ -29,15 +28,16 @@ import org.apache.qpid.framing.*;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.security.Principal;
 import java.util.List;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
 {
     long getSessionID();
 
@@ -163,10 +163,8 @@ public interface AMQProtocolSession exte
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     void closeSession() throws AMQException;
 
-    void closeProtocolSession();
-
     /** This must be called to close the session in order to free up any resources managed by the session. */
-    void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
+    void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
 
 
     /** @return a key that uniquely identifies this session */
@@ -207,7 +205,7 @@ public interface AMQProtocolSession exte
 
     public ProtocolOutputConverter getProtocolOutputConverter();
 
-    void setAuthorizedSubject(Subject authorizedSubject);
+    void setAuthorizedID(Principal authorizedID);
 
     public java.net.SocketAddress getRemoteAddress();
 
@@ -233,5 +231,7 @@ public interface AMQProtocolSession exte
 
     List<AMQChannel> getChannels();
 
+    void closeIfLingeringClosedChannels();
+
     void mgmtCloseChannel(int channelId);
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Oct 21 01:19:00 2011
@@ -37,15 +37,25 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.util.Date;
-import java.util.List;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.NotCompliantMBeanException;
 import javax.management.Notification;
-import javax.management.ObjectName;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -56,20 +66,8 @@ import javax.management.openmbean.Simple
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
 
 /**
  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -96,7 +94,8 @@ public class AMQProtocolSessionMBean ext
         super(ManagedConnection.class, ManagedConnection.TYPE);
         _protocolSession = amqProtocolSession;
         String remote = getRemoteAddress();
-        _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
+        remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
+        _name = jmxEncode(new StringBuffer(remote), 0).toString();
         init();
     }
 
@@ -131,7 +130,7 @@ public class AMQProtocolSessionMBean ext
 
     public String getAuthorizedId()
     {
-        return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null;
+        return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
     }
 
     public String getVersion()
@@ -176,7 +175,7 @@ public class AMQProtocolSessionMBean ext
 
     public String getObjectInstanceName()
     {
-        return ObjectName.quote(_name);
+        return _name;
     }
 
     /**
@@ -340,78 +339,4 @@ public class AMQProtocolSessionMBean ext
         _broadcaster.sendNotification(n);
     }
 
-    public void resetStatistics() throws Exception
-    {
-        _protocolSession.resetStatistics();
-    }
-
-    public double getPeakMessageDeliveryRate()
-    {
-        return _protocolSession.getMessageDeliveryStatistics().getPeak();
-    }
-
-    public double getPeakDataDeliveryRate()
-    {
-        return _protocolSession.getDataDeliveryStatistics().getPeak();
-    }
-
-    public double getMessageDeliveryRate()
-    {
-        return _protocolSession.getMessageDeliveryStatistics().getRate();
-    }
-
-    public double getDataDeliveryRate()
-    {
-        return _protocolSession.getDataDeliveryStatistics().getRate();
-    }
-
-    public long getTotalMessagesDelivered()
-    {
-        return _protocolSession.getMessageDeliveryStatistics().getTotal();
-    }
-
-    public long getTotalDataDelivered()
-    {
-        return _protocolSession.getDataDeliveryStatistics().getTotal();
-    }
-
-    public double getPeakMessageReceiptRate()
-    {
-        return _protocolSession.getMessageReceiptStatistics().getPeak();
-    }
-
-    public double getPeakDataReceiptRate()
-    {
-        return _protocolSession.getDataReceiptStatistics().getPeak();
-    }
-
-    public double getMessageReceiptRate()
-    {
-        return _protocolSession.getMessageReceiptStatistics().getRate();
-    }
-
-    public double getDataReceiptRate()
-    {
-        return _protocolSession.getDataReceiptStatistics().getRate();
-    }
-
-    public long getTotalMessagesReceived()
-    {
-        return _protocolSession.getMessageReceiptStatistics().getTotal();
-    }
-
-    public long getTotalDataReceived()
-    {
-        return _protocolSession.getDataReceiptStatistics().getTotal();
-    }
-
-    public boolean isStatisticsEnabled()
-    {
-        return _protocolSession.isStatisticsEnabled();
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _protocolSession.setStatisticsEnabled(enabled);
-    }
-}
+} // End of MBean class

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 21 01:19:00 2011
@@ -20,35 +20,15 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
 
 public interface AMQSessionModel
 {
-    public Object getID();
+    Object getID();
 
-    public AMQConnectionModel getConnectionModel();
+    AMQConnectionModel getConnectionModel();
 
-    public String getClientID();
-    
-    public void close() throws AMQException;
+    String getClientID();
 
-    public LogSubject getLogSubject();
-    
-    /**
-     * This method is called from the housekeeping thread to check the status of
-     * transactions on this session and react appropriately.
-     * 
-     * If a transaction is open for too long or idle for too long then a warning
-     * is logged or the connection is closed, depending on the configuration. An open
-     * transaction is one that has recent activity. The transaction age is counted
-     * from the time the transaction was started. An idle transaction is one that 
-     * has had no activity, such as publishing or acknowledgeing messages.
-     * 
-     * @param openWarn time in milliseconds before alerting on open transaction
-     * @param openClose time in milliseconds before closing connection with open transaction
-     * @param idleWarn time in milliseconds before alerting on idle transaction
-     * @param idleClose time in milliseconds before closing connection with idle transaction
-     */
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
+    LogSubject getLogSubject();
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 21 01:19:00 2011
@@ -22,53 +22,44 @@ package org.apache.qpid.server.protocol;
 
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
-public class MultiVersionProtocolEngine implements ServerProtocolEngine
+public class MultiVersionProtocolEngine implements ProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
 
-    private final long _id;
 
-    private Set<AmqpProtocolVersion> _supported;
+
+    private NetworkDriver _networkDriver;
+    private Set<VERSION> _supported;
     private String _fqdn;
     private IApplicationRegistry _appRegistry;
-    private NetworkConnection _network;
-    private Sender<ByteBuffer> _sender;
-
-    private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
-    public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
-                                      String fqdn,
-                                      Set<AmqpProtocolVersion> supported,
-                                      NetworkConnection network,
-                                      long id)
-    {
-        this(appRegistry,fqdn,supported,id);
-        setNetworkConnection(network);
-    }
+    private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
     public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
                                       String fqdn,
-                                      Set<AmqpProtocolVersion> supported,
-                                      long id)
+                                      Set<VERSION> supported, NetworkDriver networkDriver)
     {
-        _id = id;
         _appRegistry = appRegistry;
         _fqdn = fqdn;
         _supported = supported;
-
+        _networkDriver = networkDriver;
     }
 
+    public void setNetworkDriver(NetworkDriver driver)
+    {
+        _delegate.setNetworkDriver(driver);
+    }
 
     public SocketAddress getRemoteAddress()
     {
@@ -105,7 +96,6 @@ public class MultiVersionProtocolEngine 
         _delegate.readerIdle();
     }
 
-
     public void received(ByteBuffer msg)
     {
         _delegate.received(msg);
@@ -116,11 +106,6 @@ public class MultiVersionProtocolEngine 
         _delegate.exception(t);
     }
 
-    public long getConnectionId()
-    {
-        return _delegate.getConnectionId();
-    }
-
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
     private static final byte[] AMQP_0_8_HEADER =
@@ -145,7 +130,7 @@ public class MultiVersionProtocolEngine 
                          (byte) 9
             };
 
-    private static final byte[] AMQP_0_9_1_HEADER =
+private static final byte[] AMQP_0_9_1_HEADER =
             new byte[] { (byte) 'A',
                          (byte) 'M',
                          (byte) 'Q',
@@ -168,31 +153,19 @@ public class MultiVersionProtocolEngine 
                          (byte) 10
             };
 
-    public void setNetworkConnection(NetworkConnection networkConnection)
-    {
-        setNetworkConnection(networkConnection, networkConnection.getSender());
-    }
-
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-    {
-        _network = network;
-        _sender = sender;
-    }
-
-
     private static interface DelegateCreator
     {
-        AmqpProtocolVersion getVersion();
+        VERSION getVersion();
         byte[] getHeaderIdentifier();
-        ServerProtocolEngine getProtocolEngine();
+        ProtocolEngine getProtocolEngine();
     }
 
     private DelegateCreator creator_0_8 = new DelegateCreator()
     {
 
-        public AmqpProtocolVersion getVersion()
+        public VERSION getVersion()
         {
-            return AmqpProtocolVersion.v0_8;
+            return VERSION.v0_8;
         }
 
         public byte[] getHeaderIdentifier()
@@ -200,18 +173,18 @@ public class MultiVersionProtocolEngine 
             return AMQP_0_8_HEADER;
         }
 
-        public ServerProtocolEngine getProtocolEngine()
+        public ProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
         }
     };
 
     private DelegateCreator creator_0_9 = new DelegateCreator()
     {
 
-        public AmqpProtocolVersion getVersion()
+        public VERSION getVersion()
         {
-            return AmqpProtocolVersion.v0_9;
+            return VERSION.v0_9;
         }
 
 
@@ -220,18 +193,18 @@ public class MultiVersionProtocolEngine 
             return AMQP_0_9_HEADER;
         }
 
-        public ServerProtocolEngine getProtocolEngine()
+        public ProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
         }
     };
 
     private DelegateCreator creator_0_9_1 = new DelegateCreator()
     {
 
-        public AmqpProtocolVersion getVersion()
+        public VERSION getVersion()
         {
-            return AmqpProtocolVersion.v0_9_1;
+            return VERSION.v0_9_1;
         }
 
 
@@ -240,9 +213,9 @@ public class MultiVersionProtocolEngine 
             return AMQP_0_9_1_HEADER;
         }
 
-        public ServerProtocolEngine getProtocolEngine()
+        public ProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
         }
     };
 
@@ -250,9 +223,9 @@ public class MultiVersionProtocolEngine 
     private DelegateCreator creator_0_10 = new DelegateCreator()
     {
 
-        public AmqpProtocolVersion getVersion()
+        public VERSION getVersion()
         {
-            return AmqpProtocolVersion.v0_10;
+            return VERSION.v0_10;
         }
 
 
@@ -261,15 +234,15 @@ public class MultiVersionProtocolEngine 
             return AMQP_0_10_HEADER;
         }
 
-        public ServerProtocolEngine getProtocolEngine()
+        public ProtocolEngine getProtocolEngine()
         {
             final ConnectionDelegate connDelegate =
                     new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
 
-            ServerConnection conn = new ServerConnection(_id);
+            ServerConnection conn = new ServerConnection();
             conn.setConnectionDelegate(connDelegate);
 
-            return new ProtocolEngine_0_10( conn, _network, _appRegistry);
+            return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
         }
     };
 
@@ -277,16 +250,21 @@ public class MultiVersionProtocolEngine 
             new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
 
 
-    private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
+    private class ClosedDelegateProtocolEngine implements ProtocolEngine
     {
+        public void setNetworkDriver(NetworkDriver driver)
+        {
+            _networkDriver = driver;
+        }
+
         public SocketAddress getRemoteAddress()
         {
-            return _network.getRemoteAddress();
+            return _networkDriver.getRemoteAddress();
         }
 
         public SocketAddress getLocalAddress()
         {
-            return _network.getLocalAddress();
+            return _networkDriver.getLocalAddress();
         }
 
         public long getWrittenBytes()
@@ -323,30 +301,26 @@ public class MultiVersionProtocolEngine 
         {
 
         }
+    }
 
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-        {
+    private class SelfDelegateProtocolEngine implements ProtocolEngine
+    {
 
-        }
+        private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
 
-        public long getConnectionId()
+        public void setNetworkDriver(NetworkDriver driver)
         {
-            return _id;
+            _networkDriver = driver;
         }
-    }
-
-    private class SelfDelegateProtocolEngine implements ServerProtocolEngine
-    {
-        private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
 
         public SocketAddress getRemoteAddress()
         {
-            return _network.getRemoteAddress();
+            return _networkDriver.getRemoteAddress();
         }
 
         public SocketAddress getLocalAddress()
         {
-            return _network.getLocalAddress();
+            return _networkDriver.getLocalAddress();
         }
 
         public long getWrittenBytes()
@@ -381,7 +355,7 @@ public class MultiVersionProtocolEngine 
                 _header.get(headerBytes);
 
 
-                ServerProtocolEngine newDelegate = null;
+                ProtocolEngine newDelegate = null;
                 byte[] newestSupported = null;
 
                 for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -406,20 +380,17 @@ public class MultiVersionProtocolEngine 
                 // If no delegate is found then send back the most recent support protocol version id
                 if(newDelegate == null)
                 {
-                    _sender.send(ByteBuffer.wrap(newestSupported));
-                    _sender.flush();
+                    _networkDriver.send(ByteBuffer.wrap(newestSupported));
 
                     _delegate = new ClosedDelegateProtocolEngine();
-
-                    _network.close();
-
                 }
                 else
                 {
+                    newDelegate.setNetworkDriver(_networkDriver);
+
                     _delegate = newDelegate;
 
                     _header.flip();
-                    _delegate.setNetworkConnection(_network, _sender);
                     _delegate.received(_header);
                     if(msg.hasRemaining())
                     {
@@ -431,11 +402,6 @@ public class MultiVersionProtocolEngine 
 
         }
 
-        public long getConnectionId()
-        {
-            return _id;
-        }
-
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);
@@ -455,10 +421,5 @@ public class MultiVersionProtocolEngine 
         {
 
         }
-
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-        {
-
-        }
     }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 21 01:19:00 2011
@@ -20,38 +20,56 @@
 */
 package org.apache.qpid.server.protocol;
 
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
+
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
 
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
-    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+    ;
+
+
+    public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
+
+    private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
 
     private final IApplicationRegistry _appRegistry;
     private final String _fqdn;
-    private final Set<AmqpProtocolVersion> _supported;
+    private final Set<VERSION> _supported;
+
 
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+    public MultiVersionProtocolEngineFactory()
     {
-        _appRegistry = ApplicationRegistry.getInstance();
-        _fqdn = fqdn;
-        _supported = supportedVersions;
+        this(1, "localhost", ALL_VERSIONS);
     }
 
-    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
+    public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+        this(1, fqdn, versions);
     }
 
-    public ServerProtocolEngine newProtocolEngine()
+
+    public MultiVersionProtocolEngineFactory(String fqdn)
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
+        this(1, fqdn, ALL_VERSIONS);
     }
 
+    public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+    {
+        _appRegistry = ApplicationRegistry.getInstance(instance);
+        _fqdn = fqdn;
+        _supported = supportedVersions;
+    }
+
+
+    public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+    {
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Fri Oct 21 01:19:00 2011
@@ -20,26 +20,25 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.server.configuration.*;
 import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.util.UUID;
 
-public class ProtocolEngine_0_10  extends InputHandler implements ServerProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine, ConnectionConfig
 {
     public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
 
-    private NetworkConnection _network;
+    private NetworkDriver _networkDriver;
     private long _readBytes;
     private long _writtenBytes;
     private ServerConnection _connection;
@@ -48,22 +47,26 @@ public class ProtocolEngine_0_10  extend
     private long _createTime = System.currentTimeMillis();
 
     public ProtocolEngine_0_10(ServerConnection conn,
-                               NetworkConnection network,
+                               NetworkDriver networkDriver,
                                final IApplicationRegistry appRegistry)
     {
         super(new Assembler(conn));
         _connection = conn;
         _connection.setConnectionConfig(this);
-
+        _networkDriver = networkDriver;
         _id = appRegistry.getConfigStore().createId();
         _appRegistry = appRegistry;
 
-        if(network != null)
-        {
-            setNetworkConnection(network);
-        }
-
+        // FIXME Two log messages to maintain compatinbility with earlier protocol versions
+        _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+        _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+    }
 
+    public void setNetworkDriver(NetworkDriver driver)
+    {
+        _networkDriver = driver;
+        Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
+        _connection.setSender(dis);
         _connection.onOpen(new Runnable()
         {
             public void run()
@@ -74,30 +77,14 @@ public class ProtocolEngine_0_10  extend
 
     }
 
-    public void setNetworkConnection(NetworkConnection network)
-    {
-        setNetworkConnection(network, network.getSender());
-    }
-
-    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-    {
-        _network = network;
-
-        _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
-
-        // FIXME Two log messages to maintain compatibility with earlier protocol versions
-        _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
-        _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
-    }
-
     public SocketAddress getRemoteAddress()
     {
-        return _network.getRemoteAddress();
+        return _networkDriver.getRemoteAddress();
     }
 
     public SocketAddress getLocalAddress()
     {
-        return _network.getLocalAddress();
+        return _networkDriver.getLocalAddress();
     }
 
     public long getReadBytes()
@@ -147,7 +134,7 @@ public class ProtocolEngine_0_10  extend
 
     public String getAuthId()
     {
-        return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
+        return _connection.getAuthorizationID();
     }
 
     public String getRemoteProcessName()
@@ -206,14 +193,9 @@ public class ProtocolEngine_0_10  extend
     {
         return false;
     }
-
+    
     public void mgmtClose()
     {
         _connection.mgmtClose();
     }
-
-    public long getConnectionId()
-    {
-        return _connection.getConnectionId();
-    }
 }

Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -3,5 +3,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1072051-1185907
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Fri Oct 21 01:19:00 2011
@@ -60,7 +60,7 @@ public class AMQPriorityQueue extends Si
     {
         // check that all subscriptions are not in advance of the entry
         SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
-        while(subIter.advance() && entry.isAvailable())
+        while(subIter.advance() && !entry.isAcquired())
         {
             final Subscription subscription = subIter.getNode().getSubscription();
             if(!subscription.isClosed())
@@ -70,7 +70,7 @@ public class AMQPriorityQueue extends Si
                 {
                     QueueEntry subnode = context._lastSeenEntry;
                     QueueEntry released = context._releasedEntry;
-                    while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
+                    while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
                         {

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Oct 21 01:19:00 2011
@@ -21,18 +21,21 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfig;
+import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -69,8 +72,8 @@ public interface AMQQueue extends Managa
     boolean isAutoDelete();
 
     AMQShortString getOwner();
-    AuthorizationHolder getAuthorizationHolder();
-    void setAuthorizationHolder(AuthorizationHolder principalHolder);
+    PrincipalHolder getPrincipalHolder();
+    void setPrincipalHolder(PrincipalHolder principalHolder);
 
     void setExclusiveOwningSession(AMQSessionModel owner);
     AMQSessionModel getExclusiveOwningSession();
@@ -105,16 +108,23 @@ public interface AMQQueue extends Managa
 
     boolean isDeleted();
 
+
     int delete() throws AMQException;
 
+
     void requeue(QueueEntry entry);
 
+    void requeue(QueueEntryImpl storeContext, Subscription subscription);
+
     void dequeue(QueueEntry entry, Subscription sub);
 
     void decrementUnackedMsgCount();
 
+
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
+
+
     void addQueueDeleteTask(final Task task);
     void removeQueueDeleteTask(final Task task);
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Fri Oct 21 01:19:00 2011
@@ -43,7 +43,6 @@ import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.Notification;
-import javax.management.ObjectName;
 import javax.management.OperationsException;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.ArrayType;
@@ -98,7 +97,7 @@ public class AMQQueueMBean extends AMQMa
     {
         super(ManagedQueue.class, ManagedQueue.TYPE);
         _queue = queue;
-        _queueName = queue.getName();
+        _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
     }
 
     public ManagedObject getParentObject()
@@ -148,7 +147,7 @@ public class AMQQueueMBean extends AMQMa
 
     public String getObjectInstanceName()
     {
-        return ObjectName.quote(_queueName);
+        return _queueName;
     }
 
     public String getName()
@@ -507,7 +506,7 @@ public class AMQQueueMBean extends AMQMa
     private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
     {
         List<String> list = new ArrayList<String>();
-        BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
+        BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
         list.add("reply-to = " + headerProperties.getReplyToAsString());
         list.add("propertyFlags = " + headerProperties.getPropertyFlags());
         list.add("ApplicationID = " + headerProperties.getAppIdAsString());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Oct 21 01:19:00 2011
@@ -96,9 +96,9 @@ public class IncomingMessage implements 
     public void setExpiration()
     {
             long expiration =
-                    ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
+                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
             long timestamp =
-                    ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
+                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
 
             if (SYNCHED_CLOCKS)
             {
@@ -139,7 +139,7 @@ public class IncomingMessage implements 
     public int addContentBodyFrame(final ContentChunk contentChunk)
             throws AMQException
     {
-        _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
+        _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
 
@@ -193,8 +193,8 @@ public class IncomingMessage implements 
 
     public boolean isPersistent()
     {
-        return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
+        return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
                                                              BasicContentHeaderProperties.PERSISTENT;
     }
 
@@ -263,7 +263,7 @@ public class IncomingMessage implements 
         int written = 0;
         for(ContentChunk cb : _contentChunks)
         {
-            ByteBuffer data = ByteBuffer.wrap(cb.getData());
+            ByteBuffer data = cb.getData().buf();
             if(offset+written >= pos && offset < pos + data.limit())
             {
                 ByteBuffer src = data.duplicate();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org