You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/19 18:24:45 UTC
svn commit: r530474 [2/8] - in /incubator/qpid/trunk/qpid: ./
java/broker/bin/ java/broker/distribution/src/main/assembly/
java/broker/etc/ java/broker/lib/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/ser...
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Apr 19 09:24:30 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,14 +36,17 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -59,7 +62,7 @@
* Main entry point for AMQPD.
*
*/
-@SuppressWarnings({"AccessStaticViaInstance"})
+@SuppressWarnings({ "AccessStaticViaInstance" })
public class Main
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -74,9 +77,9 @@
protected static class InitException extends Exception
{
- InitException(String msg)
+ InitException(String msg, Throwable cause)
{
- super(msg);
+ super(msg, cause);
}
}
@@ -97,6 +100,7 @@
try
{
commandLine = new PosixParser().parse(options, args);
+
return true;
}
catch (ParseException e)
@@ -104,6 +108,7 @@
System.err.println("Error: " + e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Qpid", options, true);
+
return false;
}
}
@@ -112,17 +117,26 @@
{
Option help = new Option("h", "help", false, "print this message");
Option version = new Option("v", "version", false, "print the version information and exit");
- Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").
- withLongOpt("config").create("c");
- Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file").
- withLongOpt("port").create("p");
- Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file").
- withLongOpt("bind").create("b");
- Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " +
- "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
- withLongOpt("logconfig").create("l");
- Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " +
- "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+ .create("c");
+ Option port =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified port. Overrides any value in the config file")
+ .withLongOpt("port").create("p");
+ Option bind =
+ OptionBuilder.withArgName("bind").hasArg()
+ .withDescription("bind to the specified address. Overrides any value in the config file")
+ .withLongOpt("bind").create("b");
+ Option logconfig =
+ OptionBuilder.withArgName("logconfig").hasArg()
+ .withDescription("use the specified log4j xml configuration file. By "
+ + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+ Option logwatchconfig =
+ OptionBuilder.withArgName("logwatch").hasArg()
+ .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
options.addOption(help);
options.addOption(version);
@@ -150,7 +164,7 @@
boolean first = true;
for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
{
- if(first)
+ if (first)
{
first = false;
}
@@ -158,9 +172,11 @@
{
protocol.append(", ");
}
+
protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
}
+
System.out.println(ver + " (" + protocol + ")");
}
else
@@ -186,7 +202,6 @@
}
}
-
protected void startup() throws InitException, ConfigurationException, Exception
{
final String QpidHome = System.getProperty("QPID_HOME");
@@ -201,7 +216,7 @@
error = error + "\nNote: Qpid_HOME is not set.";
}
- throw new InitException(error);
+ throw new InitException(error, null);
}
else
{
@@ -226,8 +241,8 @@
_logger.info("Starting Qpid.AMQP broker");
- ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
- getConfiguredObject(ConnectorConfiguration.class);
+ ConnectorConfiguration connectorConfig =
+ ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
@@ -249,7 +264,7 @@
}
catch (NumberFormatException e)
{
- throw new InitException("Invalid port: " + portStr);
+ throw new InitException("Invalid port: " + portStr, e);
}
}
@@ -264,19 +279,21 @@
int totalVHosts = ((Collection) virtualHosts).size();
for (int vhost = 0; vhost < totalVHosts; vhost++)
{
- setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost));
+ setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost));
}
}
else
{
- setupVirtualHosts(configFile.getParent() , (String)virtualHosts);
+ setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
}
}
+
bind(port, connectorConfig);
}
- protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
+ protected void setupVirtualHosts(String configFileParent, String configFilePath)
+ throws ConfigurationException, AMQException, URLSyntaxException
{
String configVar = "${conf}";
@@ -285,7 +302,7 @@
configFilePath = configFileParent + configFilePath.substring(configVar.length());
}
- if (configFilePath.indexOf(".xml") != -1 )
+ if (configFilePath.indexOf(".xml") != -1)
{
VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
vHostConfig.performBindings();
@@ -298,11 +315,12 @@
String[] fileNames = virtualHostDir.list();
- for (int each=0; each < fileNames.length; each++)
+ for (int each = 0; each < fileNames.length; each++)
{
if (fileNames[each].endsWith(".xml"))
{
- VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]);
+ VirtualHostConfiguration vHostConfig =
+ new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
vHostConfig.performBindings();
}
}
@@ -319,7 +337,7 @@
try
{
- //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
+ // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
IoAcceptor acceptor = connectorConfig.createAcceptor();
SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
@@ -334,7 +352,7 @@
{
sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
-
+
if (!connectorConfig.enableSSL || !connectorConfig.sslOnly)
{
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
@@ -347,6 +365,7 @@
{
bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
+
acceptor.bind(bindAddress, handler, sconfig);
_logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
@@ -356,8 +375,7 @@
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
try
{
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort),
- handler, sconfig);
+ acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
_logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
}
catch (IOException e)
@@ -415,16 +433,17 @@
}
catch (NumberFormatException e)
{
- System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
- "a non-negative integer. Using default of zero (no watching configured");
+ System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+ + "a non-negative integer. Using default of zero (no watching configured");
}
+
if (logConfigFile.exists() && logConfigFile.canRead())
{
System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
if (logWatchTime > 0)
{
- System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
- logWatchTime + " seconds");
+ System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+ + logWatchTime + " seconds");
// log4j expects the watch interval in milliseconds
DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Thu Apr 19 09:24:30 2007
@@ -196,6 +196,7 @@
}
else
{
+ _logger.error("MESSAGE LOSS: Message should be sent on a Dead Letter Queue");
_logger.warn(msg);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -98,7 +98,7 @@
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
// if (!evt.getMethod().resend)
{
- message.message.reject(message.message.getDeliveredSubscription());
+ message.message.reject(message.message.getDeliveredSubscription(message.queue));
}
if (evt.getMethod().requeue)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -33,6 +33,7 @@
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.AccessRights;
import org.apache.log4j.Logger;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -75,23 +76,26 @@
if (virtualHost == null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName);
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
}
else
{
session.setVirtualHost(virtualHost);
- AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID());
+ AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID(), AccessRights.Rights.ANY);
switch (result.getStatus())
{
default:
case REFUSED:
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- "Access denied to vHost '" + virtualHostName + "' by "
- + result.getAuthorizer());
+ String error = "Any access denied to vHost '" + virtualHostName + "' by "
+ + result.getAuthorizer();
+
+ _logger.warn(error);
+
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error);
case GRANTED:
- _logger.info("Granted access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
+ _logger.info("Granted any access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
+ " by '" + result.getAuthorizer() + "'");
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -37,6 +37,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -106,7 +107,7 @@
ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
session.writeFrame(tune);
- session.setAuthorizedID(ss.getAuthorizationID());
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
disposeSaslServer(session);
break;
case CONTINUE:
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -37,6 +37,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -95,7 +96,7 @@
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
- session.setAuthorizedID(ss.getAuthorizationID());
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Apr 19 09:24:30 2007
@@ -64,7 +64,6 @@
private final AtomicInteger _counter = new AtomicInteger();
-
protected QueueDeclareHandler()
{
Configurator.configure(this);
@@ -92,12 +91,12 @@
synchronized (queueRegistry)
{
- if (((queue = queueRegistry.getQueue(body.queue)) == null) )
+ if (((queue = queueRegistry.getQueue(body.queue)) == null))
{
- if(body.passive)
+ if (body.passive)
{
- String msg = "Queue: " + body.queue + " not found.";
- throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
+ String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
{
@@ -112,13 +111,16 @@
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
queue.bind(body.queue, null, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange");
+ _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
}
}
}
- else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
+ else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+ + " as exclusive queue with same name "
+ + "declared on another client ID('"
+ + queue.getOwner() + "')");
}
AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -138,10 +140,10 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- queue.getConsumerCount(), // consumerCount
- queue.getMessageCount(), // messageCount
- body.queue); // queue
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ queue.getConsumerCount(), // consumerCount
+ queue.getMessageCount(), // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
session.writeFrame(response);
}
@@ -162,24 +164,22 @@
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
final AMQShortString queueName = queue.getName();
- if(body.exclusive && !body.durable)
+ if (body.exclusive && !body.durable)
{
final AMQProtocolSession.Task deleteQueueTask =
- new AMQProtocolSession.Task()
- {
-
- public void doTask(AMQProtocolSession session) throws AMQException
+ new AMQProtocolSession.Task()
{
- if(registry.getQueue(queueName) == queue)
+ public void doTask(AMQProtocolSession session) throws AMQException
{
- queue.delete();
+ if (registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
}
-
- }
- };
+ };
session.addSessionCloseTask(deleteQueueTask);
@@ -190,16 +190,14 @@
session.removeSessionCloseTask(deleteQueueTask);
}
});
-
-
- }
+ }// if exclusive and not durable
Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
if (virtualHostDefaultQueueConfiguration != null)
{
Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
}
-
+
return queue;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Thu Apr 19 09:24:30 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,29 +20,174 @@
*/
package org.apache.qpid.server.management;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.MBeanServerForwarder;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.AuthorizeCallback;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
+
+/**
+ * This class starts up an MBeanServer. If the out of the box JMX agent is being used then no security features are
+ * implemented. To use security features such as user authentication, turn off the jmx options in the "QPID_OPTS" env
+ * variable and use the JMXMP connector server. If the JMXMP connector is not available, then the standard JMXConnector
+ * will be used, which again doesn't have user authentication.
+ */
public class JMXManagedObjectRegistry implements ManagedObjectRegistry
{
private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class);
private final MBeanServer _mbeanServer;
+ private Registry _rmiRegistry;
+ private JMXServiceURL _jmxURL;
- public JMXManagedObjectRegistry()
+ public JMXManagedObjectRegistry() throws AMQException
{
_log.info("Initialising managed object registry using platform MBean server");
- // we use the platform MBean server currently but this must be changed or at least be configuurable
- _mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+ // Retrieve the config parameters
+ boolean platformServer = appRegistry.getConfiguration().getBoolean("management.platform-mbeanserver", true);
+
+ _mbeanServer =
+ platformServer ? ManagementFactory.getPlatformMBeanServer()
+ : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN);
+ }
+
+
+ public void start()
+ {
+ // Check if the "QPID_OPTS" is set to use Out of the Box JMXAgent
+ if (areOutOfTheBoxJMXOptionsSet())
+ {
+ _log.info("JMX: Using the out of the box JMX Agent");
+ return;
+ }
+
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+ boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", true);
+ int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999);
+
+ try
+ {
+ if (security)
+ {
+ // For SASL using JMXMP
+ _jmxURL = new JMXServiceURL("jmxmp", null, port);
+
+ Map env = new HashMap();
+ Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases();
+ PrincipalDatabase db = null;
+
+ for (Map.Entry<String, PrincipalDatabase> entry : map.entrySet())
+ {
+ if (entry.getValue() instanceof Base64MD5PasswordFilePrincipalDatabase)
+ {
+ db = entry.getValue();
+ break;
+ }
+ else if (entry.getValue() instanceof PlainPasswordFilePrincipalDatabase)
+ {
+ db = entry.getValue();
+ }
+ }
+
+ if (db instanceof Base64MD5PasswordFilePrincipalDatabase)
+ {
+ env.put("jmx.remote.profiles", "SASL/CRAM-MD5");
+ CRAMMD5HashedInitialiser initialiser = new CRAMMD5HashedInitialiser();
+ initialiser.initialise(db);
+ env.put("jmx.remote.sasl.callback.handler", initialiser.getCallbackHandler());
+ }
+ else if (db instanceof PlainPasswordFilePrincipalDatabase)
+ {
+ env.put("jmx.remote.profiles", "SASL/PLAIN");
+ env.put("jmx.remote.sasl.callback.handler", new UserCallbackHandler(db));
+ }
+
+ // Enable the SSL security and server authentication
+ /*
+ SslRMIClientSocketFactory csf = new SslRMIClientSocketFactory();
+ SslRMIServerSocketFactory ssf = new SslRMIServerSocketFactory();
+ env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf);
+ env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssf);
+ */
+
+ try
+ {
+ JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, env, _mbeanServer);
+ MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
+ cs.setMBeanServerForwarder(mbsf);
+ cs.start();
+ _log.info("JMX: Starting JMXConnector server with SASL");
+ }
+ catch (java.net.MalformedURLException urlException)
+ {
+ // When JMXMPConnector is not available
+ // java.net.MalformedURLException: Unsupported protocol: jmxmp
+ _log.info("JMX: Starting JMXConnector server");
+ startJMXConnectorServer(port);
+ }
+ }
+ else
+ {
+ startJMXConnectorServer(port);
+ }
+ }
+ catch (Exception ex)
+ {
+ _log.error("Error in initialising Managed Object Registry." + ex.getMessage());
+ ex.printStackTrace();
+ }
+ }
+
+ /**
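+ * Starts up an RMIRegistry at the configured port and attaches a JMXConnectorServer to it.
+ *
+ * @param port the port on which the RMI registry is created
+ *
+ * @throws IOException if the RMI registry or the JMXConnectorServer cannot be created or started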
+ */
+ private void startJMXConnectorServer(int port) throws IOException
+ {
+ startRMIRegistry(port);
+ _jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi");
+ JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, null, _mbeanServer);
+ cs.start();
}
public void registerObject(ManagedObject managedObject) throws JMException
{
- _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
+ _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
}
public void unregisterObject(ManagedObject managedObject) throws JMException
@@ -50,4 +195,105 @@
_mbeanServer.unregisterMBean(managedObject.getObjectName());
}
+ /**
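+ * Checks if the "QPID_OPTS" env variable is set to use the out of the box JMXAgent.
+ *
+ * @return true if the "com.sun.management.jmxremote" or "com.sun.management.jmxremote.port" system property is set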
+ */
+ private boolean areOutOfTheBoxJMXOptionsSet()
+ {
+ if (System.getProperty("com.sun.management.jmxremote") != null)
+ {
+ return true;
+ }
+
+ if (System.getProperty("com.sun.management.jmxremote.port") != null)
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Starts the RMI registry at the given port.
+ *
+ * @param port the port on which the RMI registry is created
+ *
+ * @throws RemoteException if the registry cannot be created
+ */
+ private void startRMIRegistry(int port) throws RemoteException
+ {
+ System.setProperty("java.rmi.server.randomIDs", "true");
+ _rmiRegistry = LocateRegistry.createRegistry(port);
+ }
+
+ // stops the RMIRegistry, if it was running and bound to a port
+ public void close() throws RemoteException
+ {
+ if (_rmiRegistry != null)
+ {
+ // Stopping the RMI registry
+ UnicastRemoteObject.unexportObject(_rmiRegistry, true);
+ }
+ }
+
+ /** This class is used for SASL enabled JMXConnector for performing user authentication. */
+ private class UserCallbackHandler implements CallbackHandler
+ {
+ private final PrincipalDatabase _principalDatabase;
+
+ protected UserCallbackHandler(PrincipalDatabase database)
+ {
+ _principalDatabase = database;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ // Retrieve callbacks
+ NameCallback ncb = null;
+ PasswordCallback pcb = null;
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ if (callbacks[i] instanceof NameCallback)
+ {
+ ncb = (NameCallback) callbacks[i];
+ }
+ else if (callbacks[i] instanceof PasswordCallback)
+ {
+ pcb = (PasswordCallback) callbacks[i];
+ }
+ else if (callbacks[i] instanceof AuthorizeCallback)
+ {
+ ((AuthorizeCallback) callbacks[i]).setAuthorized(true);
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(callbacks[i]);
+ }
+ }
+
+ boolean authorized = false;
+ // Process retrieval of password; can get password if username is available in NameCallback
+ if ((ncb != null) && (pcb != null))
+ {
+ String username = ncb.getDefaultName();
+ try
+ {
+ authorized = _principalDatabase.verifyPassword(username, new String(pcb.getPassword()));
+ }
+ catch (AccountNotFoundException e)
+ {
+ IOException ioe = new IOException("User not authorized. " + e);
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
+
+ if (!authorized)
+ {
+ throw new IOException("User not authorized.");
+ }
+ }
+ }
}
Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (from r526666, incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?view=diff&rev=530474&p1=incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java&r1=526666&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java&r2=530474
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Thu Apr 19 09:24:30 2007
@@ -44,21 +44,20 @@
import java.io.FileInputStream;
/**
- * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations.
- * This implements the logic for allowing the users to invoke MBean operations and implements the
- * restrictions for readOnly, readWrite and admin users.
+ * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. This implements
+ * the logic for allowing the users to invoke MBean operations and implements the restrictions for readOnly, readWrite
+ * and admin users.
*/
public class MBeanInvocationHandlerImpl implements InvocationHandler
{
private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class);
-
- private final static String ADMIN = "admin";
- private final static String READWRITE="readwrite";
- private final static String READONLY = "readonly";
+
+ public final static String ADMIN = "admin";
+ public final static String READWRITE = "readwrite";
+ public final static String READONLY = "readonly";
private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
- private static final String DEFAULT_PERMISSIONS_FILE = "etc" + File.separator + "jmxremote.access";
private MBeanServer mbs;
- private final static Properties _userRoles = new Properties();
+ private static Properties _userRoles = new Properties();
public static MBeanServerForwarder newProxyInstance()
{
@@ -119,7 +118,7 @@
{
throw new SecurityException("Access denied");
}
-
+
Principal principal = principals.iterator().next();
String identity = principal.getName();
@@ -140,21 +139,9 @@
}
// Initialises the user roles
- protected static void initialise() throws AMQException
+ public static void setAccessRights(Properties accessRights)
{
- final String QpidHome = System.getProperty("QPID_HOME");
- String fileName = QpidHome + File.separator + DEFAULT_PERMISSIONS_FILE;
- try
- {
- FileInputStream in = new FileInputStream(fileName);
- _userRoles.load(in);
- in.close();
- }
- catch (IOException ex)
- {
- _logger.error("Error in loading JMX User permissions." + ex.getMessage());
- //throw new AMQException("Error in loading JMX User permissions", ex);
- }
+ _userRoles = accessRights;
}
private boolean isAdmin(String userName)
@@ -200,11 +187,13 @@
{
String mbeanMethod = (args.length > 1) ? (String) args[1] : null;
if (mbeanMethod == null)
+ {
return false;
-
+ }
+
try
{
- MBeanInfo mbeanInfo = mbs.getMBeanInfo((ObjectName)args[0]);
+ MBeanInfo mbeanInfo = mbs.getMBeanInfo((ObjectName) args[0]);
if (mbeanInfo != null)
{
MBeanOperationInfo[] opInfos = mbeanInfo.getOperations();
@@ -219,7 +208,7 @@
}
catch (JMException ex)
{
- ex.printStackTrace();
+ ex.printStackTrace();
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java Thu Apr 19 09:24:30 2007
@@ -52,8 +52,7 @@
@MBeanOperation(name="createNewExchange", description="Creates a new Exchange", impact= MBeanOperationInfo.ACTION)
void createNewExchange(@MBeanOperationParameter(name="name", description="Name of the new exchange")String name,
@MBeanOperationParameter(name="ExchangeType", description="Type of the exchange")String type,
- @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable,
- @MBeanOperationParameter(name="passive", description="true of the Exchange should be passive")boolean passive)
+ @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable)
throws IOException, JMException;
/**
@@ -81,8 +80,7 @@
@MBeanOperation(name="createNewQueue", description="Create a new Queue on the Broker server", impact= MBeanOperationInfo.ACTION)
void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
@MBeanOperationParameter(name="owner", description="Owner name")String owner,
- @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
- @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete)
+ @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable)
throws IOException, JMException;
/**
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java Thu Apr 19 09:24:30 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.management;
import javax.management.JMException;
+import java.rmi.RemoteException;
/**
* Handles the registration (and unregistration and so on) of managed objects.
@@ -36,7 +37,11 @@
*/
public interface ManagedObjectRegistry
{
+ void start();
+
void registerObject(ManagedObject managedObject) throws JMException;
void unregisterObject(ManagedObject managedObject) throws JMException;
+
+ void close() throws RemoteException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java Thu Apr 19 09:24:30 2007
@@ -24,6 +24,8 @@
import org.apache.log4j.Logger;
+import java.rmi.RemoteException;
+
/**
* This managed object registry does not actually register MBeans. This can be used in tests when management is
* not required or when management has been disabled.
@@ -38,11 +40,21 @@
_log.info("Management is disabled");
}
+ public void start()
+ {
+ //no-op
+ }
+
public void registerObject(ManagedObject managedObject) throws JMException
{
}
public void unregisterObject(ManagedObject managedObject) throws JMException
{
+ }
+
+ public void close() throws RemoteException
+ {
+
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Apr 19 09:24:30 2007
@@ -28,6 +28,7 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.security.Principal;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -108,7 +109,7 @@
private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
private List<Integer> _closingChannelsList = new ArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
- private String _authorizedID;
+ private Principal _authorizedID;
public ManagedObject getManagedObject()
@@ -745,12 +746,12 @@
return _protocolOutputConverter;
}
- public void setAuthorizedID(String authorizedID)
+ public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
}
- public String getAuthorizedID()
+ public Principal getAuthorizedID()
{
return _authorizedID;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Apr 19 09:24:30 2007
@@ -31,6 +31,8 @@
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.security.Principal;
+
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
@@ -165,9 +167,9 @@
public ProtocolOutputConverter getProtocolOutputConverter();
- void setAuthorizedID(String authorizedID);
+ void setAuthorizedID(Principal authorizedID);
- /** @return a username string that was used to authorized this session */
- String getAuthorizedID();
+ /** @return the Principal that was used to authorize this session */
+ Principal getAuthorizedID();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Apr 19 09:24:30 2007
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,14 +37,15 @@
*/
package org.apache.qpid.server.protocol;
+import java.security.Principal;
import java.util.Date;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
import javax.management.NotCompliantMBeanException;
+import javax.management.Notification;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -56,15 +77,17 @@
{
private AMQMinaProtocolSession _session = null;
private String _name = null;
-
- //openmbean data types for representing the channel attributes
- private final static String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
- private final static String[] _indexNames = {_channelAtttibuteNames[0]};
- private final static OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
- private static CompositeType _channelType = null; // represents the data type for channel data
- private static TabularType _channelsType = null; // Data type for list of channels type
+
+ // openmbean data types for representing the channel attributes
+ private static final String[] _channelAtttibuteNames =
+ { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count" };
+ private static final String[] _indexNames = { _channelAtttibuteNames[0] };
+ private static final OpenType[] _channelAttributeTypes =
+ { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER };
+ private static CompositeType _channelType = null; // represents the data type for channel data
+ private static TabularType _channelsType = null; // Data type for list of channels type
private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
- new AMQShortString("Broker Management Console has closed the connection.");
+ new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
@@ -72,22 +95,21 @@
super(ManagedConnection.class, ManagedConnection.TYPE);
_session = session;
String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
+ remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
init();
}
-
static
{
try
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
- // It should never occur
- System.out.println(ex.getMessage());
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static initializer.", ex);
}
}
@@ -96,26 +118,27 @@
*/
private static void init() throws OpenDataException
{
- _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
+ _channelType =
+ new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, _channelAtttibuteNames,
+ _channelAttributeTypes);
_channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
public String getClientId()
{
- return _session.getContextKey() == null ? null : _session.getContextKey().toString();
+ return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
}
public String getAuthorizedId()
{
- return _session.getAuthorizedID();
+ return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
}
public String getVersion()
{
- return _session.getClientVersion() == null ? null : _session.getClientVersion().toString();
+ return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
}
-
+
public Date getLastIoTime()
{
return new Date(_session.getIOSession().getLastIoTime());
@@ -171,6 +194,7 @@
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
+
_session.commitTransactions(channel);
}
catch (AMQException ex)
@@ -194,6 +218,7 @@
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
+
_session.rollbackTransactions(channel);
}
catch (AMQException ex)
@@ -215,9 +240,12 @@
for (AMQChannel channel : list)
{
- Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
+ Object[] itemValues =
+ {
+ channel.getChannelId(), channel.isTransactional(),
(channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null,
- channel.getUnacknowledgedMessageMap().size()};
+ channel.getUnacknowledgedMessageMap().size()
+ };
CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
channelsList.put(channelData);
@@ -232,17 +260,16 @@
* @throws JMException
*/
public void closeConnection() throws JMException
- {
+ {
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
- _session.getProtocolMajorVersion(),
- _session.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
+ final AMQFrame response =
+ ConnectionCloseBody.createAMQFrame(0, _session.getProtocolMajorVersion(), _session.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
);
_session.writeFrame(response);
@@ -259,18 +286,19 @@
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
String name = MonitorNotification.class.getName();
String description = "Channel count has reached threshold value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
- return new MBeanNotificationInfo[]{info1};
+ return new MBeanNotificationInfo[] { info1 };
}
public void notifyClients(String notificationMsg)
{
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ Notification n =
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Thu Apr 19 09:24:30 2007
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Date;
+import java.security.Principal;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
@@ -67,16 +68,17 @@
/**
* Tells the total number of bytes written till now.
* @return number of bytes written.
- */
+ *
@MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
Long getWrittenBytes();
-
+ */
/**
* Tells the total number of bytes read till now.
* @return number of bytes read.
- */
+ *
@MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
Long getReadBytes();
+ */
/**
* Threshold high value for no of channels. This is useful in setting notifications or
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Apr 19 09:24:30 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -42,6 +43,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -78,19 +81,20 @@
private boolean _immediate;
private AtomicBoolean _taken = new AtomicBoolean(false);
-
private TransientMessageData _transientMessageData = new TransientMessageData();
private Subscription _takenBySubcription;
-
private Set<Subscription> _rejectedBy = null;
+ private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+ private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken()
+ public boolean isTaken(AMQQueue queue)
{
return _taken.get();
}
private final int hashcode = System.identityHashCode(this);
+
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
@@ -203,9 +207,10 @@
_transientMessageData.setMessagePublishInfo(info);
_taken = new AtomicBoolean(false);
+
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")");
}
}
@@ -318,8 +323,10 @@
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
+
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
+ _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -356,12 +363,13 @@
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
*/
public AMQMessage takeReference()
{
_referenceCount.incrementAndGet();
- return this;
+ return this;
}
/** Threadsafe. Increment the reference count on the message. */
@@ -378,9 +386,10 @@
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
+ * @param storeContext
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
- * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -451,7 +460,7 @@
}
- public boolean taken(Subscription sub)
+ public boolean taken(AMQQueue queue, Subscription sub)
{
if (_taken.getAndSet(true))
{
@@ -464,7 +473,7 @@
}
}
- public void release()
+ public void release(AMQQueue queue)
{
if (_log.isTraceEnabled())
{
@@ -600,7 +609,7 @@
for (AMQQueue q : destinationQueues)
{
//Increment the references to this message for each queue delivery.
- incrementReference();
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -824,11 +833,14 @@
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken + " by:" + _takenBySubcription;
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ _taken + " by :" + _takenBySubcription;
+
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
- public Subscription getDeliveredSubscription()
+ public Subscription getDeliveredSubscription(AMQQueue queue)
{
return _takenBySubcription;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Apr 19 09:24:30 2007
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,11 +37,11 @@
*/
package org.apache.qpid.server.queue;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
-import java.util.Date;
-import java.text.SimpleDateFormat;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -41,12 +61,14 @@
import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -73,15 +95,15 @@
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
- private final static String[] _msgAttributeNames = {"AMQ MessageId", "Header", "Size(bytes)", "Redelivered"};
- private static String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
+ private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+ private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
- private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
+ private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -95,7 +117,6 @@
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
-
public ManagedObject getParentObject()
{
return _queue.getVirtualHost().getManagedObject();
@@ -107,10 +128,10 @@
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
- // It should never occur
- System.out.println(ex.getMessage());
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static initializer.", ex);
}
}
@@ -119,19 +140,21 @@
*/
private static void init() throws OpenDataException
{
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType =
+ new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
+ _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
- _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messageDataType =
+ new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
@@ -213,7 +236,8 @@
public Long getMaximumQueueDepth()
{
long queueDepthInBytes = _queue.getMaximumQueueDepth();
- return queueDepthInBytes >> 10 ;
+
+ return queueDepthInBytes >> 10;
}
public void setMaximumQueueDepth(Long value)
@@ -227,7 +251,8 @@
public Long getQueueDepth() throws JMException
{
long queueBytesSize = _queue.getQueueDepth();
- return queueBytesSize >> 10 ;
+
+ return queueBytesSize >> 10;
}
/**
@@ -237,13 +262,13 @@
{
final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- for(NotificationCheck check : NotificationCheck.values())
+ for (NotificationCheck check : NotificationCheck.values())
{
- if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime)
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- if(check.notifyIfNecessary(msg, _queue, this))
+ if (check.notifyIfNecessary(msg, _queue, this))
{
_lastNotificationTimes[check.ordinal()] = currentTime;
}
@@ -260,9 +285,10 @@
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
notificationMsg = notification.name() + " " + notificationMsg;
-
- _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+
+ _lastNotification =
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(_lastNotification);
}
@@ -334,20 +360,25 @@
try
{
// Create header attributes list
- CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = null, encoding = null;
if (headerProperties != null)
{
AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString();
- encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
}
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
catch (AMQException e)
{
- throw new JMException("Error creating header attributes list: " + e);
+ JMException jme = new JMException("Error creating header attributes list: " + e);
+ jme.initCause(e);
+ throw jme;
}
}
@@ -358,8 +389,8 @@
{
if ((beginIndex > endIndex) || (beginIndex < 1))
{
- throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex +
- "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
+ throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
+ + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
List<AMQMessage> list = _queue.getMessagesOnTheQueue();
@@ -368,20 +399,22 @@
try
{
// Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
AMQMessage msg = list.get(i - 1);
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() };
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
}
catch (AMQException e)
{
- throw new JMException("Error creating message contents: " + e);
+ JMException jme = new JMException("Error creating message contents: " + e);
+ jme.initCause(e);
+ throw jme;
}
return _messageList;
@@ -400,11 +433,11 @@
list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
int delMode = headerProperties.getDeliveryMode();
- list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent"));
+ list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent"));
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
-
+
long longDate = headerProperties.getExpiration();
String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSExpiration = " + strDate);
@@ -425,27 +458,26 @@
*/
public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
{
- if (fromMessageId > toMessageId || (fromMessageId < 1))
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
- throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
+ throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
}
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
}
-
/**
* returns Notifications sent by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
String name = MonitorNotification.class.getName();
String description = "Either Message count or Queue depth or Message size has reached threshold high value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
- return new MBeanNotificationInfo[]{info1};
+ return new MBeanNotificationInfo[] { info1 };
}
} // End of AMQQueueMBean class
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Apr 19 09:24:30 2007
@@ -210,6 +210,7 @@
/**
* Returns all the messages in the Queue
+ *
* @return List of messages
*/
public List<AMQMessage> getMessages()
@@ -222,14 +223,16 @@
list.add(message);
}
_lock.unlock();
-
+
return list;
}
/**
* Returns messages within the range of given messageIds
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return
*/
public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
@@ -242,7 +245,7 @@
long maxMessageCount = toMessageId - fromMessageId + 1;
_lock.lock();
-
+
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
for (AMQMessage message : _messages)
@@ -399,7 +402,7 @@
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
_lock.lock();
-
+
AMQMessage message = _messages.poll();
if (message != null)
{
@@ -432,9 +435,7 @@
return count;
}
- /**
- This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
- */
+ /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -444,8 +445,12 @@
{
AMQMessage message = messages.peek();
- //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
- while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
+ //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
+ while (message != null
+ && (
+ ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+ || sub == null)
+ && message.taken(_queue, sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
@@ -506,7 +511,7 @@
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -526,7 +531,7 @@
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -562,7 +567,7 @@
}
catch (AMQException e)
{
- message.release();
+ message.release(_queue);
_log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -723,7 +728,7 @@
_log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(s);
+ msg.taken(_queue, s);
//Deliver the message
s.send(msg, _queue);
}
@@ -737,7 +742,7 @@
}
}
- if (!msg.isTaken())
+ if (!msg.isTaken(_queue))
{
if (_log.isInfoEnabled())
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Apr 19 09:24:30 2007
@@ -558,7 +558,7 @@
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release();
+ resent.release(_queue);
_queue.subscriberHasPendingResend(false, this, resent);
try