You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [22/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java Fri Oct 21 01:19:00 2011
@@ -18,56 +18,39 @@
*/
package org.apache.qpid.server.plugins;
-import static org.apache.felix.framework.util.FelixConstants.SYSTEMBUNDLE_ACTIVATORS_PROP;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_ACTION_PROPERY;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_DIR_PROPERY;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_INSTALL_VALUE;
-import static org.apache.felix.main.AutoProcessor.AUTO_DEPLOY_START_VALUE;
-import static org.apache.felix.main.AutoProcessor.process;
-import static org.osgi.framework.Constants.FRAMEWORK_STORAGE;
-import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN;
-import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT;
-import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES;
+import static org.apache.felix.framework.util.FelixConstants.*;
+import static org.apache.felix.main.AutoProcessor.*;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.felix.framework.Felix;
import org.apache.felix.framework.util.StringMap;
import org.apache.log4j.Logger;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.TopicConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SecurityPluginFactory;
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.access.plugins.DenyAll;
import org.apache.qpid.server.security.access.plugins.LegacyAccess;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-import org.apache.qpid.util.FileUtils;
import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleException;
-import org.osgi.framework.Version;
import org.osgi.framework.launch.Framework;
import org.osgi.util.tracker.ServiceTracker;
@@ -80,6 +63,7 @@ public class PluginManager implements Cl
private static final Logger _logger = Logger.getLogger(PluginManager.class);
private static final int FELIX_STOP_TIMEOUT = 30000;
+ private static final String QPID_VER_SUFFIX = "version=0.9,";
private Framework _felix;
@@ -88,61 +72,15 @@ public class PluginManager implements Cl
private ServiceTracker _configTracker = null;
private ServiceTracker _virtualHostTracker = null;
private ServiceTracker _policyTracker = null;
- private ServiceTracker _authenticationManagerTracker = null;
private Activator _activator;
- private final List<ServiceTracker> _trackers = new ArrayList<ServiceTracker>();
private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>();
private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>();
private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>();
private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>();
- private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>();
- /** The default name of the OSGI system package list. */
- private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties";
-
- /** The name of the override system property that holds the name of the OSGI system package list. */
- private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties";
-
- private static final String OSGI_SYSTEM_PACKAGES;
-
- static
- {
- final String filename = System.getProperty(FILE_PROPERTY);
- final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
- PluginManager.class.getClassLoader());
-
- try
- {
- Version qpidReleaseVersion;
- try
- {
- qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion());
- }
- catch (IllegalArgumentException iae)
- {
- qpidReleaseVersion = null;
- }
-
- final Properties p = new Properties();
- p.load(is);
-
- final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p);
-
- OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString();
-
- _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES);
- }
- catch (IOException e)
- {
- _logger.error("Error reading OSGI system package list", e);
- throw new ExceptionInInitializerError(e);
- }
- }
-
-
- public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception
+ public PluginManager(String pluginPath, String cachePath) throws Exception
{
// Store all non-OSGi plugins
// A little gross that we have to add them here, but not all the plugins are OSGIfied
@@ -159,8 +97,7 @@ public class PluginManager implements Cl
LegacyAccess.LegacyAccessConfiguration.FACTORY,
new SlowConsumerDetectionConfigurationFactory(),
new SlowConsumerDetectionPolicyConfigurationFactory(),
- new SlowConsumerDetectionQueueConfigurationFactory(),
- PrincipalDatabaseAuthenticationManager.PrincipalDatabaseAuthenticationManagerConfiguration.FACTORY))
+ new SlowConsumerDetectionQueueConfigurationFactory()))
{
_configPlugins.put(configFactory.getParentPaths(), configFactory);
}
@@ -175,109 +112,125 @@ public class PluginManager implements Cl
_vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory);
}
- for (AuthenticationManagerPluginFactory<? extends Plugin> pluginFactory : Arrays.asList(
- PrincipalDatabaseAuthenticationManager.FACTORY))
+ // Check the plugin directory path is set and exists
+ if (pluginPath == null)
{
- _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory);
+ return;
}
-
- if(bundleContext == null)
+ File pluginDir = new File(pluginPath);
+ if (!pluginDir.exists())
{
- // Check the plugin directory path is set and exist
- if (pluginPath == null)
- {
- _logger.info("No plugin path specified, no plugins will be loaded.");
- return;
- }
- File pluginDir = new File(pluginPath);
- if (!pluginDir.exists())
- {
- _logger.warn("Plugin dir : " + pluginDir + " does not exist.");
- return;
- }
-
- // Add the bundle provided service interface package and the core OSGi
- // packages to be exported from the class path via the system bundle.
-
- // Setup OSGi configuration property map
- final StringMap configMap = new StringMap(false);
- configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES);
-
- // No automatic shutdown hook
- configMap.put("felix.shutdown.hook", "false");
-
- // Add system activator
- List<BundleActivator> activators = new ArrayList<BundleActivator>();
- _activator = new Activator();
- activators.add(_activator);
- configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
-
- if (cachePath != null)
- {
- File cacheDir = new File(cachePath);
- if (!cacheDir.exists() && cacheDir.canWrite())
- {
- _logger.info("Creating plugin cache directory: " + cachePath);
- cacheDir.mkdir();
- }
-
- // Set plugin cache directory and empty it
- _logger.info("Cache bundles in directory " + cachePath);
- configMap.put(FRAMEWORK_STORAGE, cachePath);
- }
- configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
-
- // Set directory with plugins to auto-deploy
- _logger.info("Auto deploying bundles from directory " + pluginPath);
- configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
- configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+ return;
+ }
+
+ // Setup OSGi configuration property map
+ StringMap configMap = new StringMap(false);
+
+ // Add the bundle provided service interface package and the core OSGi
+ // packages to be exported from the class path via the system bundle.
+ configMap.put(FRAMEWORK_SYSTEMPACKAGES,
+ "org.osgi.framework; version=1.3.0," +
+ "org.osgi.service.packageadmin; version=1.2.0," +
+ "org.osgi.service.startlevel; version=1.0.0," +
+ "org.osgi.service.url; version=1.0.0," +
+ "org.osgi.util.tracker; version=1.0.0," +
+ "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
+ "org.apache.qpid; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.common; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
+ "org.apache.qpid.util; " + QPID_VER_SUFFIX +
+ "org.apache.commons.configuration; version=1.0.0," +
+ "org.apache.commons.lang; version=1.0.0," +
+ "org.apache.commons.lang.builder; version=1.0.0," +
+ "org.apache.commons.logging; version=1.0.0," +
+ "org.apache.log4j; version=1.2.12," +
+ "javax.management.openmbean; version=1.0.0," +
+ "javax.management; version=1.0.0"
+ );
+
+ // No automatic shutdown hook
+ configMap.put("felix.shutdown.hook", "false");
+
+ // Add system activator
+ List<BundleActivator> activators = new ArrayList<BundleActivator>();
+ _activator = new Activator();
+ activators.add(_activator);
+ configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
- // Start plugin manager
- _felix = new Felix(configMap);
- try
- {
- _logger.info("Starting plugin manager framework");
- _felix.init();
- process(configMap, _felix.getBundleContext());
- _felix.start();
- _logger.info("Started plugin manager framework");
- }
- catch (BundleException e)
+ if (cachePath != null)
+ {
+ File cacheDir = new File(cachePath);
+ if (!cacheDir.exists() && cacheDir.canWrite())
{
- throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+ _logger.info("Creating plugin cache directory: " + cachePath);
+ cacheDir.mkdir();
}
-
- bundleContext = _activator.getContext();
+
+ // Set plugin cache directory and empty it
+ _logger.info("Cache bundles in directory " + cachePath);
+ configMap.put(FRAMEWORK_STORAGE, cachePath);
+ }
+ configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+
+ // Set directory with plugins to auto-deploy
+ _logger.info("Auto deploying bundles from directory " + pluginPath);
+ configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
+ configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+
+ // Start plugin manager and trackers
+ _felix = new Felix(configMap);
+ try
+ {
+ _logger.info("Starting plugin manager...");
+ _felix.init();
+ process(configMap, _felix.getBundleContext());
+ _felix.start();
+ _logger.info("Started plugin manager");
}
- else
+ catch (BundleException e)
{
- _logger.info("Using the specified external BundleContext");
+ throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
}
-
- _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null);
+
+ // TODO save trackers in a map, keyed by class name
+
+ _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null);
_exchangeTracker.open();
- _trackers.add(_exchangeTracker);
- _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null);
+ _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null);
_securityTracker.open();
- _trackers.add(_securityTracker);
- _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null);
+ _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null);
_configTracker.open();
- _trackers.add(_configTracker);
- _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null);
+ _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null);
_virtualHostTracker.open();
- _trackers.add(_virtualHostTracker);
- _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null);
+ _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null);
_policyTracker.open();
- _trackers.add(_policyTracker);
-
- _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null);
- _authenticationManagerTracker.open();
- _trackers.add(_authenticationManagerTracker);
-
+
_logger.info("Opened service trackers");
}
@@ -348,26 +301,22 @@ public class PluginManager implements Cl
return getServices(_securityTracker, _securityPlugins);
}
- public Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> getAuthenticationManagerPlugins()
- {
- return getServices(_authenticationManagerTracker, _authenticationManagerPlugins);
- }
-
public void close()
{
- try
+ if (_felix != null)
{
- // Close all bundle trackers
- for(ServiceTracker tracker : _trackers)
+ try
{
- tracker.close();
+ // Close all bundle trackers
+ _exchangeTracker.close();
+ _securityTracker.close();
+ _configTracker.close();
+ _virtualHostTracker.close();
+ _policyTracker.close();
}
- }
- finally
- {
- if (_felix != null)
+ finally
{
- _logger.info("Stopping plugin manager framework");
+ _logger.info("Stopping plugin manager");
try
{
// FIXME should be stopAndWait() but hangs VM, need upgrade in felix
@@ -386,12 +335,7 @@ public class PluginManager implements Cl
{
// Ignore
}
- _logger.info("Stopped plugin manager framework");
- }
- else
- {
- _logger.info("Plugin manager was started with an external BundleContext, " +
- "skipping remaining shutdown tasks");
+ _logger.info("Stopped plugin manager");
}
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Oct 21 01:19:00 2011
@@ -20,35 +20,14 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.AMQException;
-public interface AMQConnectionModel extends StatisticsGatherer
+public interface AMQConnectionModel
{
- /**
- * get a unique id for this connection.
- *
- * @return a {@link UUID} representing the connection
- */
- public UUID getId();
-
- /**
- * Close the underlying Connection
- *
- * @param cause
- * @param message
- * @throws org.apache.qpid.AMQException
- */
- public void close(AMQConstant cause, String message) throws AMQException;
/**
* Close the given requested Session
- *
* @param session
* @param cause
* @param message
@@ -57,20 +36,4 @@ public interface AMQConnectionModel exte
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
public long getConnectionId();
-
- /**
- * Get a list of all sessions using this connection.
- *
- * @return a list of {@link AMQSessionModel}s
- */
- public List<AMQSessionModel> getSessionModels();
-
- /**
- * Return a {@link LogSubject} for the connection.
- */
- public LogSubject getLogSubject();
-
- public String getUserName();
-
- public boolean isSessionNameUnique(String name);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.server.protocol;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -32,16 +30,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
-import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -66,10 +66,12 @@ import org.apache.qpid.framing.MethodDis
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -88,22 +90,21 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkConnection;
-public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -133,7 +134,7 @@ public class AMQProtocolEngine implement
private Object _lastSent;
protected volatile boolean _closed;
-
+
// maximum number of channels this session should have
private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -145,46 +146,47 @@ public class AMQProtocolEngine implement
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
- private Subject _authorizedSubject;
+ private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
- private final long _sessionID;
+ // Create a simple ID that increments for every new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
+ private NetworkDriver _networkDriver;
+
private long _lastIoTime;
private long _writtenBytes;
private long _readBytes;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
- private ApplicationRegistry _registry;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
-
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _codecFactory = new AMQCodecFactory(true, this);
+ _networkDriver = driver;
- setNetworkConnection(network);
- _sessionID = connectionId;
+ _codecFactory = new AMQCodecFactory(true, this);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -193,21 +195,9 @@ public class AMQProtocolEngine implement
_configStore = virtualHostRegistry.getConfigStore();
_id = _configStore.createId();
- _actor.message(ConnectionMessages.OPEN(null, null, false, false));
- _registry = virtualHostRegistry.getApplicationRegistry();
- initialiseStatistics();
- }
-
- public void setNetworkConnection(NetworkConnection network)
- {
- setNetworkConnection(network, network.getSender());
- }
+ _actor.message(ConnectionMessages.OPEN(null, null, false, false));
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- _network = network;
- _sender = sender;
}
private AMQProtocolSessionMBean createMBean() throws JMException
@@ -246,18 +236,26 @@ public class AMQProtocolEngine implement
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- try
- {
- dataBlockReceived(dataBlock);
- }
- catch (Exception e)
+ public void run()
{
- _logger.error("Unexpected exception when processing datablock", e);
- closeProtocolSession();
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
}
- }
+ });
}
catch (Exception e)
{
@@ -335,11 +333,6 @@ public class AMQProtocolEngine implement
closeChannel(channelId);
throw e;
}
- catch (TransportException e)
- {
- closeChannel(channelId);
- throw e;
- }
}
finally
{
@@ -350,7 +343,7 @@ public class AMQProtocolEngine implement
private void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
// Log incomming protocol negotiation request
@@ -370,49 +363,15 @@ public class AMQProtocolEngine implement
null,
mechanisms.getBytes(),
locales.getBytes());
- _sender.send(asByteBuffer(responseBody.generateFrame(0)));
- _sender.flush();
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
- _sender.flush();
- }
- }
-
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
-
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
-
-
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
-
- buf.flip();
- return buf;
}
public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
@@ -467,19 +426,19 @@ public class AMQProtocolEngine implement
AMQConstant.CHANNEL_ERROR.getName().toString());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
+ closeConnection(channelId, ce, false);
}
}
catch (AMQConnectionException e)
{
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, e);
+ closeConnection(channelId, e, false);
}
catch (AMQSecurityException e)
{
AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
- closeConnection(channelId, ce);
+ closeConnection(channelId, ce, false);
}
}
catch (Exception e)
@@ -522,14 +481,19 @@ public class AMQProtocolEngine implement
*
* @param frame the frame to write
*/
- public synchronized void writeFrame(AMQDataBlock frame)
+ public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
- final ByteBuffer buf = asByteBuffer(frame);
+ final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- _sender.send(buf);
- _sender.flush();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
}
public AMQShortString getContextKey()
@@ -719,8 +683,8 @@ public class AMQProtocolEngine implement
{
if (delay > 0)
{
- _network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -761,7 +725,7 @@ public class AMQProtocolEngine implement
}
closeAllChannels();
-
+
getConfigStore().removeConfiguredObject(this);
if (_managedObject != null)
@@ -781,6 +745,7 @@ public class AMQProtocolEngine implement
_closed = true;
notifyAll();
}
+ _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CLOSE());
}
}
@@ -803,32 +768,27 @@ public class AMQProtocolEngine implement
}
}
- public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
{
- try
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e);
- }
-
- markChannelAwaitingCloseOk(channelId);
- closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ _logger.info("Closing connection due to: " + e);
}
- finally
+
+ markChannelAwaitingCloseOk(channelId);
+ closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+
+ if (closeProtocolSession)
{
closeProtocolSession();
}
-
-
}
public void closeProtocolSession()
{
- _network.close();
-
+ _networkDriver.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -837,15 +797,11 @@ public class AMQProtocolEngine implement
{
_logger.info(e.getMessage());
}
- catch (TransportException e)
- {
- _logger.info(e.getMessage());
- }
}
public String toString()
{
- return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName()) + ")"; // ")" is appended outside the conditional so the unauthenticated case prints "(?)" with balanced parentheses
}
public String dump()
@@ -867,11 +823,17 @@ public class AMQProtocolEngine implement
*/
public String getLocalFQDN()
{
- SocketAddress address = _network.getLocalAddress();
+ SocketAddress address = _networkDriver.getLocalAddress();
+ // we use the vmpipe address in some tests hence the need for this rather ugly test. The host
+ // information is used primarily by SASL.
if (address instanceof InetSocketAddress)
{
return ((InetSocketAddress) address).getHostName();
}
+ else if (address instanceof VmPipeAddress)
+ {
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
+ }
else
{
throw new IllegalArgumentException("Unsupported socket address class: " + address);
@@ -950,7 +912,7 @@ public class AMQProtocolEngine implement
public Object getClientIdentifier()
{
- return _network.getRemoteAddress();
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
@@ -963,7 +925,7 @@ public class AMQProtocolEngine implement
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
-
+
_configStore.addConfiguredObject(this);
try
@@ -992,33 +954,29 @@ public class AMQProtocolEngine implement
return _protocolOutputConverter;
}
- public void setAuthorizedSubject(final Subject authorizedSubject)
+ public void setAuthorizedID(Principal authorizedID)
{
- if (authorizedSubject == null)
- {
- throw new IllegalArgumentException("authorizedSubject cannot be null");
- }
- _authorizedSubject = authorizedSubject;
+ _authorizedID = authorizedID;
}
- public Subject getAuthorizedSubject()
+ public Principal getAuthorizedID()
{
- return _authorizedSubject;
+ return _authorizedID;
}
- public Principal getAuthorizedPrincipal()
+ public Principal getPrincipal()
{
- return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+ return _authorizedID;
}
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -1041,10 +999,6 @@ public class AMQProtocolEngine implement
{
_logger.error("Could not close protocol engine", e);
}
- catch (TransportException e)
- {
- _logger.error("Could not close protocol engine", e);
- }
}
public void readerIdle()
@@ -1052,9 +1006,14 @@ public class AMQProtocolEngine implement
// Nothing
}
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
public void writerIdle()
{
- _sender.send(asByteBuffer(HeartbeatBody.FRAME));
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
}
public void exception(Throwable throwable)
@@ -1062,7 +1021,7 @@ public class AMQProtocolEngine implement
if (throwable instanceof AMQProtocolHeaderException)
{
writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
- _sender.close();
+ _networkDriver.close();
_logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
@@ -1080,7 +1039,7 @@ public class AMQProtocolEngine implement
writeFrame(closeBody.generateFrame(0));
- _sender.close();
+ _networkDriver.close();
}
}
@@ -1119,6 +1078,19 @@ public class AMQProtocolEngine implement
return (_clientVersion == null) ? null : _clientVersion.toString();
}
+ public void closeIfLingeringClosedChannels()
+ {
+ for (Entry<Integer, Long> closing : _closingChannelsList.entrySet())
+ {
+ if (closing.getValue() + 30000 < System.currentTimeMillis())
+ {
+ // Entry value is taken to be the time the channel close was issued (per the log text below): over 30 seconds without a ChannelCloseOk means the client is dead, so kill the connection
+ _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+ closeProtocolSession();
+ }
+ }
+ }
+
public Boolean isIncoming()
{
return true;
@@ -1136,7 +1108,7 @@ public class AMQProtocolEngine implement
public String getAuthId()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedID().getName();
}
public Integer getRemotePID()
@@ -1198,7 +1170,7 @@ public class AMQProtocolEngine implement
{
return false;
}
-
+
public void mgmtClose()
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1291,6 +1263,7 @@ public class AMQProtocolEngine implement
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
{
+
closeChannel((Integer)session.getID());
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1300,110 +1273,6 @@ public class AMQProtocolEngine implement
new AMQShortString(message),
0,0);
- writeFrame(responseBody.generateFrame((Integer)session.getID()));
- }
-
- public void close(AMQConstant cause, String message) throws AMQException
- {
- closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getProtocolOutputConverter().getProtocolMajorVersion(),
- getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null));
- }
-
- public List<AMQSessionModel> getSessionModels()
- {
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (AMQChannel channel : getChannels())
- {
- sessions.add((AMQSessionModel) channel);
- }
- return sessions;
- }
-
- public LogSubject getLogSubject()
- {
- return _logSubject;
- }
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- _virtualHost.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- _virtualHost.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
- _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
- _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
- @Override
- public boolean isSessionNameUnique(String name)
- {
- return true;
- }
-
- @Override
- public String getUserName()
- {
- return getAuthorizedPrincipal().getName();
- }
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 21 01:19:00 2011
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol;
-import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.qpid.AMQException;
@@ -29,15 +28,16 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.security.Principal;
import java.util.List;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
{
long getSessionID();
@@ -163,10 +163,8 @@ public interface AMQProtocolSession exte
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
void closeSession() throws AMQException;
- void closeProtocolSession();
-
/** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
+ void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
/** @return a key that uniquely identifies this session */
@@ -207,7 +205,7 @@ public interface AMQProtocolSession exte
public ProtocolOutputConverter getProtocolOutputConverter();
- void setAuthorizedSubject(Subject authorizedSubject);
+ void setAuthorizedID(Principal authorizedID);
public java.net.SocketAddress getRemoteAddress();
@@ -233,5 +231,7 @@ public interface AMQProtocolSession exte
List<AMQChannel> getChannels();
+ void closeIfLingeringClosedChannels();
+
void mgmtCloseChannel(int channelId);
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Oct 21 01:19:00 2011
@@ -37,15 +37,25 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.Date;
-import java.util.List;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.Notification;
-import javax.management.ObjectName;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -56,20 +66,8 @@ import javax.management.openmbean.Simple
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -96,7 +94,8 @@ public class AMQProtocolSessionMBean ext
super(ManagedConnection.class, ManagedConnection.TYPE);
_protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
- _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
+ remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
+ _name = jmxEncode(new StringBuffer(remote), 0).toString();
init();
}
@@ -131,7 +130,7 @@ public class AMQProtocolSessionMBean ext
public String getAuthorizedId()
{
- return (_protocolSession.getAuthorizedPrincipal() != null ) ? _protocolSession.getAuthorizedPrincipal().getName() : null;
+ return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
}
public String getVersion()
@@ -176,7 +175,7 @@ public class AMQProtocolSessionMBean ext
public String getObjectInstanceName()
{
- return ObjectName.quote(_name);
+ return _name;
}
/**
@@ -340,78 +339,4 @@ public class AMQProtocolSessionMBean ext
_broadcaster.sendNotification(n);
}
- public void resetStatistics() throws Exception
- {
- _protocolSession.resetStatistics();
- }
-
- public double getPeakMessageDeliveryRate()
- {
- return _protocolSession.getMessageDeliveryStatistics().getPeak();
- }
-
- public double getPeakDataDeliveryRate()
- {
- return _protocolSession.getDataDeliveryStatistics().getPeak();
- }
-
- public double getMessageDeliveryRate()
- {
- return _protocolSession.getMessageDeliveryStatistics().getRate();
- }
-
- public double getDataDeliveryRate()
- {
- return _protocolSession.getDataDeliveryStatistics().getRate();
- }
-
- public long getTotalMessagesDelivered()
- {
- return _protocolSession.getMessageDeliveryStatistics().getTotal();
- }
-
- public long getTotalDataDelivered()
- {
- return _protocolSession.getDataDeliveryStatistics().getTotal();
- }
-
- public double getPeakMessageReceiptRate()
- {
- return _protocolSession.getMessageReceiptStatistics().getPeak();
- }
-
- public double getPeakDataReceiptRate()
- {
- return _protocolSession.getDataReceiptStatistics().getPeak();
- }
-
- public double getMessageReceiptRate()
- {
- return _protocolSession.getMessageReceiptStatistics().getRate();
- }
-
- public double getDataReceiptRate()
- {
- return _protocolSession.getDataReceiptStatistics().getRate();
- }
-
- public long getTotalMessagesReceived()
- {
- return _protocolSession.getMessageReceiptStatistics().getTotal();
- }
-
- public long getTotalDataReceived()
- {
- return _protocolSession.getDataReceiptStatistics().getTotal();
- }
-
- public boolean isStatisticsEnabled()
- {
- return _protocolSession.isStatisticsEnabled();
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _protocolSession.setStatisticsEnabled(enabled);
- }
-}
+} // End of MBean class
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 21 01:19:00 2011
@@ -20,35 +20,15 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
public interface AMQSessionModel
{
- public Object getID();
+ Object getID();
- public AMQConnectionModel getConnectionModel();
+ AMQConnectionModel getConnectionModel();
- public String getClientID();
-
- public void close() throws AMQException;
+ String getClientID();
- public LogSubject getLogSubject();
-
- /**
- * This method is called from the housekeeping thread to check the status of
- * transactions on this session and react appropriately.
- *
- * If a transaction is open for too long or idle for too long then a warning
- * is logged or the connection is closed, depending on the configuration. An open
- * transaction is one that has recent activity. The transaction age is counted
- * from the time the transaction was started. An idle transaction is one that
- * has had no activity, such as publishing or acknowledgeing messages.
- *
- * @param openWarn time in milliseconds before alerting on open transaction
- * @param openClose time in milliseconds before closing connection with open transaction
- * @param idleWarn time in milliseconds before alerting on idle transaction
- * @param idleClose time in milliseconds before closing connection with idle transaction
- */
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
+ LogSubject getLogSubject();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Oct 21 01:19:00 2011
@@ -22,53 +22,44 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
-public class MultiVersionProtocolEngine implements ServerProtocolEngine
+public class MultiVersionProtocolEngine implements ProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
- private final long _id;
- private Set<AmqpProtocolVersion> _supported;
+
+ private NetworkDriver _networkDriver;
+ private Set<VERSION> _supported;
private String _fqdn;
private IApplicationRegistry _appRegistry;
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
- private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
- Set<AmqpProtocolVersion> supported,
- NetworkConnection network,
- long id)
- {
- this(appRegistry,fqdn,supported,id);
- setNetworkConnection(network);
- }
+ private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
String fqdn,
- Set<AmqpProtocolVersion> supported,
- long id)
+ Set<VERSION> supported, NetworkDriver networkDriver)
{
- _id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
-
+ _networkDriver = networkDriver;
}
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _delegate.setNetworkDriver(driver);
+ }
public SocketAddress getRemoteAddress()
{
@@ -105,7 +96,6 @@ public class MultiVersionProtocolEngine
_delegate.readerIdle();
}
-
public void received(ByteBuffer msg)
{
_delegate.received(msg);
@@ -116,11 +106,6 @@ public class MultiVersionProtocolEngine
_delegate.exception(t);
}
- public long getConnectionId()
- {
- return _delegate.getConnectionId();
- }
-
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
private static final byte[] AMQP_0_8_HEADER =
@@ -145,7 +130,7 @@ public class MultiVersionProtocolEngine
(byte) 9
};
- private static final byte[] AMQP_0_9_1_HEADER =
+private static final byte[] AMQP_0_9_1_HEADER =
new byte[] { (byte) 'A',
(byte) 'M',
(byte) 'Q',
@@ -168,31 +153,19 @@ public class MultiVersionProtocolEngine
(byte) 10
};
- public void setNetworkConnection(NetworkConnection networkConnection)
- {
- setNetworkConnection(networkConnection, networkConnection.getSender());
- }
-
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- _network = network;
- _sender = sender;
- }
-
-
private static interface DelegateCreator
{
- AmqpProtocolVersion getVersion();
+ VERSION getVersion();
byte[] getHeaderIdentifier();
- ServerProtocolEngine getProtocolEngine();
+ ProtocolEngine getProtocolEngine();
}
private DelegateCreator creator_0_8 = new DelegateCreator()
{
- public AmqpProtocolVersion getVersion()
+ public VERSION getVersion()
{
- return AmqpProtocolVersion.v0_8;
+ return VERSION.v0_8;
}
public byte[] getHeaderIdentifier()
@@ -200,18 +173,18 @@ public class MultiVersionProtocolEngine
return AMQP_0_8_HEADER;
}
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
}
};
private DelegateCreator creator_0_9 = new DelegateCreator()
{
- public AmqpProtocolVersion getVersion()
+ public VERSION getVersion()
{
- return AmqpProtocolVersion.v0_9;
+ return VERSION.v0_9;
}
@@ -220,18 +193,18 @@ public class MultiVersionProtocolEngine
return AMQP_0_9_HEADER;
}
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
}
};
private DelegateCreator creator_0_9_1 = new DelegateCreator()
{
- public AmqpProtocolVersion getVersion()
+ public VERSION getVersion()
{
- return AmqpProtocolVersion.v0_9_1;
+ return VERSION.v0_9_1;
}
@@ -240,9 +213,9 @@ public class MultiVersionProtocolEngine
return AMQP_0_9_1_HEADER;
}
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
}
};
@@ -250,9 +223,9 @@ public class MultiVersionProtocolEngine
private DelegateCreator creator_0_10 = new DelegateCreator()
{
- public AmqpProtocolVersion getVersion()
+ public VERSION getVersion()
{
- return AmqpProtocolVersion.v0_10;
+ return VERSION.v0_10;
}
@@ -261,15 +234,15 @@ public class MultiVersionProtocolEngine
return AMQP_0_10_HEADER;
}
- public ServerProtocolEngine getProtocolEngine()
+ public ProtocolEngine getProtocolEngine()
{
final ConnectionDelegate connDelegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
- ServerConnection conn = new ServerConnection(_id);
+ ServerConnection conn = new ServerConnection();
conn.setConnectionDelegate(connDelegate);
- return new ProtocolEngine_0_10( conn, _network, _appRegistry);
+ return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
}
};
@@ -277,16 +250,21 @@ public class MultiVersionProtocolEngine
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
- private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
+ private class ClosedDelegateProtocolEngine implements ProtocolEngine
{
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
+ return _networkDriver.getLocalAddress();
}
public long getWrittenBytes()
@@ -323,30 +301,26 @@ public class MultiVersionProtocolEngine
{
}
+ }
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
+ private class SelfDelegateProtocolEngine implements ProtocolEngine
+ {
- }
+ private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- public long getConnectionId()
+ public void setNetworkDriver(NetworkDriver driver)
{
- return _id;
+ _networkDriver = driver;
}
- }
-
- private class SelfDelegateProtocolEngine implements ServerProtocolEngine
- {
- private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
+ return _networkDriver.getLocalAddress();
}
public long getWrittenBytes()
@@ -381,7 +355,7 @@ public class MultiVersionProtocolEngine
_header.get(headerBytes);
- ServerProtocolEngine newDelegate = null;
+ ProtocolEngine newDelegate = null;
byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -406,20 +380,17 @@ public class MultiVersionProtocolEngine
// If no delegate is found then send back the most recent support protocol version id
if(newDelegate == null)
{
- _sender.send(ByteBuffer.wrap(newestSupported));
- _sender.flush();
+ _networkDriver.send(ByteBuffer.wrap(newestSupported));
_delegate = new ClosedDelegateProtocolEngine();
-
- _network.close();
-
}
else
{
+ newDelegate.setNetworkDriver(_networkDriver);
+
_delegate = newDelegate;
_header.flip();
- _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{
@@ -431,11 +402,6 @@ public class MultiVersionProtocolEngine
}
- public long getConnectionId()
- {
- return _id;
- }
-
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -455,10 +421,5 @@ public class MultiVersionProtocolEngine
{
}
-
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
-
- }
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Oct 21 01:19:00 2011
@@ -20,38 +20,56 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
+
+import java.util.Set;
+import java.util.Arrays;
+import java.util.HashSet;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
- private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+ ;
+
+
+ public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
+
+ private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
- private final Set<AmqpProtocolVersion> _supported;
+ private final Set<VERSION> _supported;
+
- public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory()
{
- _appRegistry = ApplicationRegistry.getInstance();
- _fqdn = fqdn;
- _supported = supportedVersions;
+ this(1, "localhost", ALL_VERSIONS);
}
- public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<VERSION> versions)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+ this(1, fqdn, versions);
}
- public ServerProtocolEngine newProtocolEngine()
+
+ public MultiVersionProtocolEngineFactory(String fqdn)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
+ this(1, fqdn, ALL_VERSIONS);
}
+ public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<VERSION> supportedVersions)
+ {
+ _appRegistry = ApplicationRegistry.getInstance(instance);
+ _fqdn = fqdn;
+ _supported = supportedVersions;
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Fri Oct 21 01:19:00 2011
@@ -20,26 +20,25 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.util.UUID;
-public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
- private NetworkConnection _network;
+ private NetworkDriver _networkDriver;
private long _readBytes;
private long _writtenBytes;
private ServerConnection _connection;
@@ -48,22 +47,26 @@ public class ProtocolEngine_0_10 extend
private long _createTime = System.currentTimeMillis();
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network,
+ NetworkDriver networkDriver,
final IApplicationRegistry appRegistry)
{
super(new Assembler(conn));
_connection = conn;
_connection.setConnectionConfig(this);
-
+ _networkDriver = networkDriver;
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- if(network != null)
- {
- setNetworkConnection(network);
- }
-
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+ }
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
+ _connection.setSender(dis);
_connection.onOpen(new Runnable()
{
public void run()
@@ -74,30 +77,14 @@ public class ProtocolEngine_0_10 extend
}
- public void setNetworkConnection(NetworkConnection network)
- {
- setNetworkConnection(network, network.getSender());
- }
-
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- _network = network;
-
- _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
-
- // FIXME Two log messages to maintain compatibility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
- }
-
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
+ return _networkDriver.getLocalAddress();
}
public long getReadBytes()
@@ -147,7 +134,7 @@ public class ProtocolEngine_0_10 extend
public String getAuthId()
{
- return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
+ return _connection.getAuthorizationID();
}
public String getRemoteProcessName()
@@ -206,14 +193,9 @@ public class ProtocolEngine_0_10 extend
{
return false;
}
-
+
public void mgmtClose()
{
_connection.mgmtClose();
}
-
- public long getConnectionId()
- {
- return _connection.getConnectionId();
- }
}
Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -3,5 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1072051-1185907
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Fri Oct 21 01:19:00 2011
@@ -60,7 +60,7 @@ public class AMQPriorityQueue extends Si
{
// check that all subscriptions are not in advance of the entry
SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && entry.isAvailable())
+ while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
if(!subscription.isClosed())
@@ -70,7 +70,7 @@ public class AMQPriorityQueue extends Si
{
QueueEntry subnode = context._lastSeenEntry;
QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Oct 21 01:19:00 2011
@@ -21,18 +21,21 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfig;
+import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -69,8 +72,8 @@ public interface AMQQueue extends Managa
boolean isAutoDelete();
AMQShortString getOwner();
- AuthorizationHolder getAuthorizationHolder();
- void setAuthorizationHolder(AuthorizationHolder principalHolder);
+ PrincipalHolder getPrincipalHolder();
+ void setPrincipalHolder(PrincipalHolder principalHolder);
void setExclusiveOwningSession(AMQSessionModel owner);
AMQSessionModel getExclusiveOwningSession();
@@ -105,16 +108,23 @@ public interface AMQQueue extends Managa
boolean isDeleted();
+
int delete() throws AMQException;
+
void requeue(QueueEntry entry);
+ void requeue(QueueEntryImpl storeContext, Subscription subscription);
+
void dequeue(QueueEntry entry, Subscription sub);
void decrementUnackedMsgCount();
+
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+
+
void addQueueDeleteTask(final Task task);
void removeQueueDeleteTask(final Task task);
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Fri Oct 21 01:19:00 2011
@@ -43,7 +43,6 @@ import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
-import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.ArrayType;
@@ -98,7 +97,7 @@ public class AMQQueueMBean extends AMQMa
{
super(ManagedQueue.class, ManagedQueue.TYPE);
_queue = queue;
- _queueName = queue.getName();
+ _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
}
public ManagedObject getParentObject()
@@ -148,7 +147,7 @@ public class AMQQueueMBean extends AMQMa
public String getObjectInstanceName()
{
- return ObjectName.quote(_queueName);
+ return _queueName;
}
public String getName()
@@ -507,7 +506,7 @@ public class AMQQueueMBean extends AMQMa
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Oct 21 01:19:00 2011
@@ -96,9 +96,9 @@ public class IncomingMessage implements
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
if (SYNCHED_CLOCKS)
{
@@ -139,7 +139,7 @@ public class IncomingMessage implements
public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -193,8 +193,8 @@ public class IncomingMessage implements
public boolean isPersistent()
{
- return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
+ return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
@@ -263,7 +263,7 @@ public class IncomingMessage implements
int written = 0;
for(ContentChunk cb : _contentChunks)
{
- ByteBuffer data = ByteBuffer.wrap(cb.getData());
+ ByteBuffer data = cb.getData().buf();
if(offset+written >= pos && offset < pos + data.limit())
{
ByteBuffer src = data.duplicate();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org