You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [22/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Fri Oct 21 14:42:12 2011
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am
*
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,11 +35,13 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -121,15 +120,12 @@ public class ProtocolOutputConverterImpl
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +133,55 @@ public class ProtocolOutputConverterImpl
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +257,7 @@ public class ProtocolOutputConverterImpl
return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +382,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +410,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Fri Oct 21 14:42:12 2011
@@ -20,9 +20,6 @@ package org.apache.qpid.server.output.am
*
*/
-
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -33,17 +30,16 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
- private static final ProtocolVersionMethodConverter
- PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
@@ -121,15 +117,11 @@ public class ProtocolOutputConverterImpl
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- int writtenSize = 0;
+ int writtenSize = capacity;
-
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
CompositeAMQBodyBlock
compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
@@ -137,15 +129,54 @@ public class ProtocolOutputConverterImpl
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ writeFrame(new AMQFrame(channelId, body));
}
}
}
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
@@ -221,7 +252,7 @@ public class ProtocolOutputConverterImpl
return _underlyingBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if(_underlyingBody == null)
{
@@ -346,7 +377,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
}
@@ -374,7 +405,7 @@ public class ProtocolOutputConverterImpl
return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/Plugin.java Fri Oct 21 14:42:12 2011
@@ -27,5 +27,5 @@ public interface Plugin
/**
* Provide Configuration to this plugin
*/
- public void configure(ConfigurationPlugin config);
+ public void configure(ConfigurationPlugin config) throws ConfigurationException;
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java Fri Oct 21 14:42:12 2011
@@ -18,39 +18,56 @@
*/
package org.apache.qpid.server.plugins;
-import static org.apache.felix.framework.util.FelixConstants.*;
-import static org.apache.felix.main.AutoProcessor.*;
+import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE;
+import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE;
+import static org.apache.felix.main.AutoProcessor.process;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN;
+import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT;
+import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.felix.framework.Felix;
import org.apache.felix.framework.util.StringMap;
import org.apache.log4j.Logger;
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.TopicConfiguration;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SecurityPluginFactory;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.access.plugins.DenyAll;
import org.apache.qpid.server.security.access.plugins.LegacyAccess;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
+import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
+import org.apache.qpid.util.FileUtils;
import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleException;
+import org.osgi.framework.Version;
import org.osgi.framework.launch.Framework;
import org.osgi.util.tracker.ServiceTracker;
@@ -63,7 +80,6 @@ public class PluginManager implements Cl
private static final Logger _logger = Logger.getLogger(PluginManager.class);
private static final int FELIX_STOP_TIMEOUT = 30000;
- private static final String QPID_VER_SUFFIX = "version=0.9,";
private Framework _felix;
@@ -72,15 +88,61 @@ public class PluginManager implements Cl
private ServiceTracker _configTracker = null;
private ServiceTracker _virtualHostTracker = null;
private ServiceTracker _policyTracker = null;
+ private ServiceTracker _authenticationManagerTracker = null;
private Activator _activator;
+ private final List<ServiceTracker> _trackers = new ArrayList<ServiceTracker>();
private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>();
private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>();
private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>();
private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>();
+ private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>();
- public PluginManager(String pluginPath, String cachePath) throws Exception
+ /** The default name of the OSGI system package list. */
+ private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties";
+
+ /** The name of the override system property that holds the name of the OSGI system package list. */
+ private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties";
+
+ private static final String OSGI_SYSTEM_PACKAGES;
+
+ static
+ {
+ final String filename = System.getProperty(FILE_PROPERTY);
+ final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
+ PluginManager.class.getClassLoader());
+
+ try
+ {
+ Version qpidReleaseVersion;
+ try
+ {
+ qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ qpidReleaseVersion = null;
+ }
+
+ final Properties p = new Properties();
+ p.load(is);
+
+ final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p);
+
+ OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString();
+
+ _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES);
+ }
+ catch (IOException e)
+ {
+ _logger.error("Error reading OSGI system package list", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+
+ public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception
{
// Store all non-OSGi plugins
// A little gross that we have to add them here, but not all the plugins are OSGIfied
@@ -97,7 +159,8 @@ public class PluginManager implements Cl
LegacyAccess.LegacyAccessConfiguration.FACTORY,
new SlowConsumerDetectionConfigurationFactory(),
new SlowConsumerDetectionPolicyConfigurationFactory(),
- new SlowConsumerDetectionQueueConfigurationFactory()))
+ new SlowConsumerDetectionQueueConfigurationFactory(),
+ PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY))
{
_configPlugins.put(configFactory.getParentPaths(), configFactory);
}
@@ -112,125 +175,109 @@ public class PluginManager implements Cl
_vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory);
}
- // Check the plugin directory path is set and exist
- if (pluginPath == null)
+ for (AuthenticationManagerPluginFactory<? extends Plugin> pluginFactory : Arrays.asList(
+ PrincipalDatabaseAuthenticationManager.FACTORY))
{
- return;
+ _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory);
}
- File pluginDir = new File(pluginPath);
- if (!pluginDir.exists())
- {
- return;
- }
-
- // Setup OSGi configuration propery map
- StringMap configMap = new StringMap(false);
-
- // Add the bundle provided service interface package and the core OSGi
- // packages to be exported from the class path via the system bundle.
- configMap.put(FRAMEWORK_SYSTEMPACKAGES,
- "org.osgi.framework; version=1.3.0," +
- "org.osgi.service.packageadmin; version=1.2.0," +
- "org.osgi.service.startlevel; version=1.0.0," +
- "org.osgi.service.url; version=1.0.0," +
- "org.osgi.util.tracker; version=1.0.0," +
- "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
- "org.apache.qpid; " + QPID_VER_SUFFIX +
- "org.apache.qpid.common; " + QPID_VER_SUFFIX +
- "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
- "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
- "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
- "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.util; " + QPID_VER_SUFFIX +
- "org.apache.commons.configuration; version=1.0.0," +
- "org.apache.commons.lang; version=1.0.0," +
- "org.apache.commons.lang.builder; version=1.0.0," +
- "org.apache.commons.logging; version=1.0.0," +
- "org.apache.log4j; version=1.2.12," +
- "javax.management.openmbean; version=1.0.0," +
- "javax.management; version=1.0.0"
- );
-
- // No automatic shutdown hook
- configMap.put("felix.shutdown.hook", "false");
-
- // Add system activator
- List<BundleActivator> activators = new ArrayList<BundleActivator>();
- _activator = new Activator();
- activators.add(_activator);
- configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
- if (cachePath != null)
+ if(bundleContext == null)
{
- File cacheDir = new File(cachePath);
- if (!cacheDir.exists() && cacheDir.canWrite())
+ // Check the plugin directory path is set and exist
+ if (pluginPath == null)
{
- _logger.info("Creating plugin cache directory: " + cachePath);
- cacheDir.mkdir();
+ _logger.info("No plugin path specified, no plugins will be loaded.");
+ return;
}
-
- // Set plugin cache directory and empty it
- _logger.info("Cache bundles in directory " + cachePath);
- configMap.put(FRAMEWORK_STORAGE, cachePath);
- }
- configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
-
- // Set directory with plugins to auto-deploy
- _logger.info("Auto deploying bundles from directory " + pluginPath);
- configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
- configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
-
- // Start plugin manager and trackers
- _felix = new Felix(configMap);
- try
- {
- _logger.info("Starting plugin manager...");
- _felix.init();
- process(configMap, _felix.getBundleContext());
- _felix.start();
- _logger.info("Started plugin manager");
+ File pluginDir = new File(pluginPath);
+ if (!pluginDir.exists())
+ {
+ _logger.warn("Plugin dir : " + pluginDir + " does not exist.");
+ return;
+ }
+
+ // Add the bundle provided service interface package and the core OSGi
+ // packages to be exported from the class path via the system bundle.
+
+ // Setup OSGi configuration property map
+ final StringMap configMap = new StringMap(false);
+ configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES);
+
+ // No automatic shutdown hook
+ configMap.put("felix.shutdown.hook", "false");
+
+ // Add system activator
+ List<BundleActivator> activators = new ArrayList<BundleActivator>();
+ _activator = new Activator();
+ activators.add(_activator);
+ configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
+
+ if (cachePath != null)
+ {
+ File cacheDir = new File(cachePath);
+ if (!cacheDir.exists() && cacheDir.canWrite())
+ {
+ _logger.info("Creating plugin cache directory: " + cachePath);
+ cacheDir.mkdir();
+ }
+
+ // Set plugin cache directory and empty it
+ _logger.info("Cache bundles in directory " + cachePath);
+ configMap.put(FRAMEWORK_STORAGE, cachePath);
+ }
+ configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+
+ // Set directory with plugins to auto-deploy
+ _logger.info("Auto deploying bundles from directory " + pluginPath);
+ configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
+ configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+
+ // Start plugin manager
+ _felix = new Felix(configMap);
+ try
+ {
+ _logger.info("Starting plugin manager framework");
+ _felix.init();
+ process(configMap, _felix.getBundleContext());
+ _felix.start();
+ _logger.info("Started plugin manager framework");
+ }
+ catch (BundleException e)
+ {
+ throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+ }
+
+ bundleContext = _activator.getContext();
}
- catch (BundleException e)
+ else
{
- throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+ _logger.info("Using the specified external BundleContext");
}
-
- // TODO save trackers in a map, keyed by class name
-
- _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null);
+
+ _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null);
_exchangeTracker.open();
+ _trackers.add(_exchangeTracker);
- _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null);
+ _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null);
_securityTracker.open();
+ _trackers.add(_securityTracker);
- _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null);
+ _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null);
_configTracker.open();
+ _trackers.add(_configTracker);
- _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null);
+ _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null);
_virtualHostTracker.open();
+ _trackers.add(_virtualHostTracker);
- _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null);
+ _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null);
_policyTracker.open();
-
+ _trackers.add(_policyTracker);
+
+ _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null);
+ _authenticationManagerTracker.open();
+ _trackers.add(_authenticationManagerTracker);
+
_logger.info("Opened service trackers");
}
@@ -301,22 +348,26 @@ public class PluginManager implements Cl
return getServices(_securityTracker, _securityPlugins);
}
+ public Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> getAuthenticationManagerPlugins()
+ {
+ return getServices(_authenticationManagerTracker, _authenticationManagerPlugins);
+ }
+
public void close()
{
- if (_felix != null)
+ try
{
- try
+ // Close all bundle trackers
+ for(ServiceTracker tracker : _trackers)
{
- // Close all bundle trackers
- _exchangeTracker.close();
- _securityTracker.close();
- _configTracker.close();
- _virtualHostTracker.close();
- _policyTracker.close();
+ tracker.close();
}
- finally
+ }
+ finally
+ {
+ if (_felix != null)
{
- _logger.info("Stopping plugin manager");
+ _logger.info("Stopping plugin manager framework");
try
{
// FIXME should be stopAndWait() but hangs VM, need upgrade in felix
@@ -335,7 +386,12 @@ public class PluginManager implements Cl
{
// Ignore
}
- _logger.info("Stopped plugin manager");
+ _logger.info("Stopped plugin manager framework");
+ }
+ else
+ {
+ _logger.info("Plugin manager was started with an external BundleContext, " +
+ "skipping remaining shutdown tasks");
}
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Oct 21 14:42:12 2011
@@ -20,14 +20,35 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.AMQConstant;
+import java.util.List;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.stats.StatisticsGatherer;
-public interface AMQConnectionModel
+public interface AMQConnectionModel extends StatisticsGatherer
{
+ /**
+ * get a unique id for this connection.
+ *
+ * @return a {@link UUID} representing the connection
+ */
+ public UUID getId();
+
+ /**
+ * Close the underlying Connection
+ *
+ * @param cause
+ * @param message
+ * @throws org.apache.qpid.AMQException
+ */
+ public void close(AMQConstant cause, String message) throws AMQException;
/**
* Close the given requested Session
+ *
* @param session
* @param cause
* @param message
@@ -36,4 +57,20 @@ public interface AMQConnectionModel
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
public long getConnectionId();
+
+ /**
+ * Get a list of all sessions using this connection.
+ *
+ * @return a list of {@link AMQSessionModel}s
+ */
+ public List<AMQSessionModel> getSessionModels();
+
+ /**
+ * Return a {@link LogSubject} for the connection.
+ */
+ public LogSubject getLogSubject();
+
+ public String getUserName();
+
+ public boolean isSessionNameUnique(String name);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -30,18 +32,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -66,12 +66,10 @@ import org.apache.qpid.framing.MethodDis
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -90,21 +88,22 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkConnection;
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
- private static final AtomicLong idGenerator = new AtomicLong(0);
-
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -134,7 +133,7 @@ public class AMQProtocolEngine implement
private Object _lastSent;
protected volatile boolean _closed;
-
+
// maximum number of channels this session should have
private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -146,47 +145,46 @@ public class AMQProtocolEngine implement
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
- private Principal _authorizedID;
+ private Subject _authorizedSubject;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
- // Create a simple ID that increments for ever new Session
- private final long _sessionID = idGenerator.getAndIncrement();
+ private final long _sessionID;
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
- private NetworkDriver _networkDriver;
-
private long _lastIoTime;
private long _writtenBytes;
private long _readBytes;
- private Job _readJob;
- private Job _writeJob;
- private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _networkDriver = driver;
-
_codecFactory = new AMQCodecFactory(true, this);
- _poolReference.acquireExecutorService();
- _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
- _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+
+ setNetworkConnection(network);
+ _sessionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -195,9 +193,21 @@ public class AMQProtocolEngine implement
_configStore = virtualHostRegistry.getConfigStore();
_id = _configStore.createId();
-
_actor.message(ConnectionMessages.OPEN(null, null, false, false));
+ _registry = virtualHostRegistry.getApplicationRegistry();
+ initialiseStatistics();
+ }
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
}
private AMQProtocolSessionMBean createMBean() throws JMException
@@ -236,26 +246,18 @@ public class AMQProtocolEngine implement
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- public void run()
+ try
{
- // Decode buffer
-
- for (AMQDataBlock dataBlock : dataBlocks)
- {
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch (Exception e)
- {
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
- }
- }
+ dataBlockReceived(dataBlock);
}
- });
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
}
catch (Exception e)
{
@@ -333,6 +335,11 @@ public class AMQProtocolEngine implement
closeChannel(channelId);
throw e;
}
+ catch (TransportException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
finally
{
@@ -343,7 +350,7 @@ public class AMQProtocolEngine implement
private void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
// Log incomming protocol negotiation request
@@ -363,15 +370,49 @@ public class AMQProtocolEngine implement
null,
mechanisms.getBytes(),
locales.getBytes());
- _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
+ _sender.send(asByteBuffer(responseBody.generateFrame(0)));
+ _sender.flush();
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
+ _sender.flush();
+ }
+ }
+
+ private ByteBuffer asByteBuffer(AMQDataBlock block)
+ {
+ final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+ try
+ {
+ block.writePayload(new DataOutputStream(new OutputStream()
+ {
+
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ buf.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
+
+ buf.flip();
+ return buf;
}
public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
@@ -426,19 +467,19 @@ public class AMQProtocolEngine implement
AMQConstant.CHANNEL_ERROR.getName().toString());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce, false);
+ closeConnection(channelId, ce);
}
}
catch (AMQConnectionException e)
{
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e, false);
+ closeConnection(channelId, e);
}
catch (AMQSecurityException e)
{
AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce, false);
+ closeConnection(channelId, ce);
}
}
catch (Exception e)
@@ -481,19 +522,14 @@ public class AMQProtocolEngine implement
*
* @param frame the frame to write
*/
- public void writeFrame(AMQDataBlock frame)
+ public synchronized void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
- final ByteBuffer buf = frame.toNioByteBuffer();
+ final ByteBuffer buf = asByteBuffer(frame);
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
- {
- public void run()
- {
- _networkDriver.send(buf);
- }
- });
+ _sender.send(buf);
+ _sender.flush();
}
public AMQShortString getContextKey()
@@ -683,8 +719,8 @@ public class AMQProtocolEngine implement
{
if (delay > 0)
{
- _networkDriver.setMaxWriteIdle(delay);
- _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _network.setMaxWriteIdle(delay);
+ _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -725,7 +761,7 @@ public class AMQProtocolEngine implement
}
closeAllChannels();
-
+
getConfigStore().removeConfiguredObject(this);
if (_managedObject != null)
@@ -745,7 +781,6 @@ public class AMQProtocolEngine implement
_closed = true;
notifyAll();
}
- _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
}
@@ -768,27 +803,32 @@ public class AMQProtocolEngine implement
}
}
- public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+ public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
- if (_logger.isInfoEnabled())
+ try
{
- _logger.info("Closing connection due to: " + e);
- }
-
- markChannelAwaitingCloseOk(channelId);
- closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e);
+ }
- if (closeProtocolSession)
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+ }
+ finally
{
closeProtocolSession();
}
+
+
}
public void closeProtocolSession()
{
- _networkDriver.close();
+ _network.close();
+
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -797,11 +837,15 @@ public class AMQProtocolEngine implement
{
_logger.info(e.getMessage());
}
+ catch (TransportException e)
+ {
+ _logger.info(e.getMessage());
+ }
}
public String toString()
{
- return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
}
public String dump()
@@ -823,17 +867,11 @@ public class AMQProtocolEngine implement
*/
public String getLocalFQDN()
{
- SocketAddress address = _networkDriver.getLocalAddress();
- // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
- // information is used by SASL primary.
+ SocketAddress address = _network.getLocalAddress();
if (address instanceof InetSocketAddress)
{
return ((InetSocketAddress) address).getHostName();
}
- else if (address instanceof VmPipeAddress)
- {
- return "vmpipe:" + ((VmPipeAddress) address).getPort();
- }
else
{
throw new IllegalArgumentException("Unsupported socket address class: " + address);
@@ -912,7 +950,7 @@ public class AMQProtocolEngine implement
public Object getClientIdentifier()
{
- return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
+ return _network.getRemoteAddress();
}
public VirtualHost getVirtualHost()
@@ -925,7 +963,7 @@ public class AMQProtocolEngine implement
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
-
+
_configStore.addConfiguredObject(this);
try
@@ -954,29 +992,33 @@ public class AMQProtocolEngine implement
return _protocolOutputConverter;
}
- public void setAuthorizedID(Principal authorizedID)
+ public void setAuthorizedSubject(final Subject authorizedSubject)
{
- _authorizedID = authorizedID;
+ if (authorizedSubject == null)
+ {
+ throw new IllegalArgumentException("authorizedSubject cannot be null");
+ }
+ _authorizedSubject = authorizedSubject;
}
- public Principal getAuthorizedID()
+ public Subject getAuthorizedSubject()
{
- return _authorizedID;
+ return _authorizedSubject;
}
- public Principal getPrincipal()
+ public Principal getAuthorizedPrincipal()
{
- return _authorizedID;
+ return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
}
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -999,6 +1041,10 @@ public class AMQProtocolEngine implement
{
_logger.error("Could not close protocol engine", e);
}
+ catch (TransportException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
public void readerIdle()
@@ -1006,14 +1052,9 @@ public class AMQProtocolEngine implement
// Nothing
}
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- }
-
public void writerIdle()
{
- _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
+ _sender.send(asByteBuffer(HeartbeatBody.FRAME));
}
public void exception(Throwable throwable)
@@ -1021,7 +1062,7 @@ public class AMQProtocolEngine implement
if (throwable instanceof AMQProtocolHeaderException)
{
writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
- _networkDriver.close();
+ _sender.close();
_logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
@@ -1039,7 +1080,7 @@ public class AMQProtocolEngine implement
writeFrame(closeBody.generateFrame(0));
- _networkDriver.close();
+ _sender.close();
}
}
@@ -1078,19 +1119,6 @@ public class AMQProtocolEngine implement
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- public void closeIfLingeringClosedChannels()
- {
- for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
- {
- if (id.getValue() + 30000 > System.currentTimeMillis())
- {
- // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
- _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
- closeProtocolSession();
- }
- }
- }
-
public Boolean isIncoming()
{
return true;
@@ -1108,7 +1136,7 @@ public class AMQProtocolEngine implement
public String getAuthId()
{
- return getAuthorizedID().getName();
+ return getAuthorizedPrincipal().getName();
}
public Integer getRemotePID()
@@ -1170,7 +1198,7 @@ public class AMQProtocolEngine implement
{
return false;
}
-
+
public void mgmtClose()
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1263,7 +1291,6 @@ public class AMQProtocolEngine implement
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
-
closeChannel((Integer)session.getID());
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1273,6 +1300,110 @@ public class AMQProtocolEngine implement
new AMQShortString(message),
0,0);
- writeFrame(responseBody.generateFrame((Integer)session.getID()));
- }
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
+
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getProtocolOutputConverter().getProtocolMajorVersion(),
+ getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null));
+ }
+
+ public List<AMQSessionModel> getSessionModels()
+ {
+ List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ for (AMQChannel channel : getChannels())
+ {
+ sessions.add((AMQSessionModel) channel);
+ }
+ return sessions;
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
+ _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
+
+ @Override
+ public boolean isSessionNameUnique(String name)
+ {
+ return true;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return getAuthorizedPrincipal().getName();
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.qpid.AMQException;
@@ -28,16 +29,15 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.security.Principal;
import java.util.List;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
{
long getSessionID();
@@ -163,8 +163,10 @@ public interface AMQProtocolSession exte
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
void closeSession() throws AMQException;
+ void closeProtocolSession();
+
/** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+ void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
/** @return a key that uniquely identifies this session */
@@ -205,7 +207,7 @@ public interface AMQProtocolSession exte
public ProtocolOutputConverter getProtocolOutputConverter();
- void setAuthorizedID(Principal authorizedID);
+ void setAuthorizedSubject(Subject authorizedSubject);
public java.net.SocketAddress getRemoteAddress();
@@ -231,7 +233,5 @@ public interface AMQProtocolSession exte
List<AMQChannel> getChannels();
- void closeIfLingeringClosedChannels();
-
void mgmtCloseChannel(int channelId);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Oct 21 14:42:12 2011
@@ -37,25 +37,15 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.Notification;
+import javax.management.ObjectName;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -66,8 +56,20 @@ import javax.management.openmbean.Simple
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import java.util.Date;
-import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -94,8 +96,7 @@ public class AMQProtocolSessionMBean ext
super(ManagedConnection.class, ManagedConnection.TYPE);
_protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
- _name = jmxEncode(new StringBuffer(remote), 0).toString();
+ _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
init();
}
@@ -130,7 +131,7 @@ public class AMQProtocolSessionMBean ext
public String getAuthorizedId()
{
- return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
+ return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null;
}
public String getVersion()
@@ -175,7 +176,7 @@ public class AMQProtocolSessionMBean ext
public String getObjectInstanceName()
{
- return _name;
+ return ObjectName.quote(_name);
}
/**
@@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean ext
_broadcaster.sendNotification(n);
}
-} // End of MBean class
+ public void resetStatistics() throws Exception
+ {
+ _protocolSession.resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return _protocolSession.getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _protocolSession.isStatisticsEnabled();
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _protocolSession.setStatisticsEnabled(enabled);
+ }
+}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 21 14:42:12 2011
@@ -20,15 +20,35 @@
*/
package org.apache.qpid.server.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
public interface AMQSessionModel
{
- Object getID();
+ public Object getID();
- AMQConnectionModel getConnectionModel();
+ public AMQConnectionModel getConnectionModel();
- String getClientID();
+ public String getClientID();
+
+ public void close() throws AMQException;
- LogSubject getLogSubject();
+ public LogSubject getLogSubject();
+
+ /**
+ * This method is called from the housekeeping thread to check the status of
+ * transactions on this session and react appropriately.
+ *
+ * If a transaction is open for too long or idle for too long then a warning
+ * is logged or the connection is closed, depending on the configuration. An open
+ * transaction is one that has recent activity. The transaction age is counted
+ * from the time the transaction was started. An idle transaction is one that
+ * has had no activity, such as publishing or acknowledgeing messages.
+ *
+ * @param openWarn time in milliseconds before alerting on open transaction
+ * @param openClose time in milliseconds before closing connection with open transaction
+ * @param idleWarn time in milliseconds before alerting on idle transaction
+ * @param idleClose time in milliseconds before closing connection with idle transaction
+ */
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 21 14:42:12 2011
@@ -22,45 +22,54 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
+ private final long _id;
-
- private NetworkDriver _networkDriver;
- private Set<VERSION> _supported;
+ private Set<AmqpProtocolVersion> _supported;
private String _fqdn;
private IApplicationRegistry _appRegistry;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
+
+ private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+ String fqdn,
+ Set<AmqpProtocolVersion> supported,
+ NetworkConnection network,
+ long id)
+ {
+ this(appRegistry,fqdn,supported,id);
+ setNetworkConnection(network);
+ }
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
String fqdn,
- Set<VERSION> supported, NetworkDriver networkDriver)
+ Set<AmqpProtocolVersion> supported,
+ long id)
{
+ _id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
- _networkDriver = networkDriver;
- }
- public void setNetworkDriver(NetworkDriver driver)
- {
- _delegate.setNetworkDriver(driver);
}
+
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -96,6 +105,7 @@ public class MultiVersionProtocolEngine
_delegate.readerIdle();
}
+
public void received(ByteBuffer msg)
{
_delegate.received(msg);
@@ -106,6 +116,11 @@ public class MultiVersionProtocolEngine
_delegate.exception(t);
}
+ public long getConnectionId()
+ {
+ return _delegate.getConnectionId();
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
private static final byte[] AMQP_0_8_HEADER =
@@ -130,7 +145,7 @@ public class MultiVersionProtocolEngine
(byte) 9
};
-private static final byte[] AMQP_0_9_1_HEADER =
+ private static final byte[] AMQP_0_9_1_HEADER =
new byte[] { (byte) 'A',
(byte) 'M',
(byte) 'Q',
@@ -153,19 +168,31 @@ private static final byte[] AMQP_0_9_1_H
(byte) 10
};
+ public void setNetworkConnection(NetworkConnection networkConnection)
+ {
+ setNetworkConnection(networkConnection, networkConnection.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+ }
+
+
private static interface DelegateCreator
{
- VERSION getVersion();
+ AmqpProtocolVersion getVersion();
byte[] getHeaderIdentifier();
- ProtocolEngine getProtocolEngine();
+ ServerProtocolEngine getProtocolEngine();
}
private DelegateCreator creator_0_8 = new DelegateCreator()
{
- public VERSION getVersion()
+ public AmqpProtocolVersion getVersion()
{
- return VERSION.v0_8;
+ return AmqpProtocolVersion.v0_8;
}
public byte[] getHeaderIdentifier()
@@ -173,18 +200,18 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_8_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
private DelegateCreator creator_0_9 = new DelegateCreator()
{
- public VERSION getVersion()
+ public AmqpProtocolVersion getVersion()
{
- return VERSION.v0_9;
+ return AmqpProtocolVersion.v0_9;
}
@@ -193,18 +220,18 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_9_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
private DelegateCreator creator_0_9_1 = new DelegateCreator()
{
- public VERSION getVersion()
+ public AmqpProtocolVersion getVersion()
{
- return VERSION.v0_9_1;
+ return AmqpProtocolVersion.v0_9_1;
}
@@ -213,9 +240,9 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_9_1_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -223,9 +250,9 @@ private static final byte[] AMQP_0_9_1_H
private DelegateCreator creator_0_10 = new DelegateCreator()
{
- public VERSION getVersion()
+ public AmqpProtocolVersion getVersion()
{
- return VERSION.v0_10;
+ return AmqpProtocolVersion.v0_10;
}
@@ -234,15 +261,15 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_10_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
final ConnectionDelegate connDelegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
- ServerConnection conn = new ServerConnection();
+ ServerConnection conn = new ServerConnection(_id);
conn.setConnectionDelegate(connDelegate);
- return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
+ return new ProtocolEngine_0_10( conn, _network, _appRegistry);
}
};
@@ -250,21 +277,16 @@ private static final byte[] AMQP_0_9_1_H
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
- private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- }
-
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getWrittenBytes()
@@ -301,26 +323,30 @@ private static final byte[] AMQP_0_9_1_H
{
}
- }
- private class SelfDelegateProtocolEngine implements ProtocolEngine
- {
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
- private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+ }
- public void setNetworkDriver(NetworkDriver driver)
+ public long getConnectionId()
{
- _networkDriver = driver;
+ return _id;
}
+ }
+
+ private class SelfDelegateProtocolEngine implements ServerProtocolEngine
+ {
+ private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getWrittenBytes()
@@ -355,7 +381,7 @@ private static final byte[] AMQP_0_9_1_H
_header.get(headerBytes);
- ProtocolEngine newDelegate = null;
+ ServerProtocolEngine newDelegate = null;
byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -380,17 +406,20 @@ private static final byte[] AMQP_0_9_1_H
// If no delegate is found then send back the most recent support protocol version id
if(newDelegate == null)
{
- _networkDriver.send(ByteBuffer.wrap(newestSupported));
+ _sender.send(ByteBuffer.wrap(newestSupported));
+ _sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
+
+ _network.close();
+
}
else
{
- newDelegate.setNetworkDriver(_networkDriver);
-
_delegate = newDelegate;
_header.flip();
+ _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{
@@ -402,6 +431,11 @@ private static final byte[] AMQP_0_9_1_H
}
+ public long getConnectionId()
+ {
+ return _id;
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -421,5 +455,10 @@ private static final byte[] AMQP_0_9_1_H
{
}
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 21 14:42:12 2011
@@ -20,56 +20,38 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-
-import java.util.Set;
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
- ;
-
-
- public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
-
- private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
- private final Set<VERSION> _supported;
-
+ private final Set<AmqpProtocolVersion> _supported;
- public MultiVersionProtocolEngineFactory()
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
{
- this(1, "localhost", ALL_VERSIONS);
- }
-
- public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
- {
- this(1, fqdn, versions);
+ _appRegistry = ApplicationRegistry.getInstance();
+ _fqdn = fqdn;
+ _supported = supportedVersions;
}
-
- public MultiVersionProtocolEngineFactory(String fqdn)
+ public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
{
- this(1, fqdn, ALL_VERSIONS);
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
}
- public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+ public ServerProtocolEngine newProtocolEngine()
{
- _appRegistry = ApplicationRegistry.getInstance(instance);
- _fqdn = fqdn;
- _supported = supportedVersions;
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
}
-
- public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
- {
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
- }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org