You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/19 18:24:45 UTC

svn commit: r530474 [2/8] - in /incubator/qpid/trunk/qpid: ./ java/broker/bin/ java/broker/distribution/src/main/assembly/ java/broker/etc/ java/broker/lib/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/ser...

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Apr 19 09:24:30 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,14 +36,17 @@
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.configuration.ConfigurationException;
+
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Logger;
 import org.apache.log4j.xml.DOMConfigurator;
+
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -59,7 +62,7 @@
  * Main entry point for AMQPD.
  *
  */
-@SuppressWarnings({"AccessStaticViaInstance"})
+@SuppressWarnings({ "AccessStaticViaInstance" })
 public class Main
 {
     private static final Logger _logger = Logger.getLogger(Main.class);
@@ -74,9 +77,9 @@
 
     protected static class InitException extends Exception
     {
-        InitException(String msg)
+        InitException(String msg, Throwable cause)
         {
-            super(msg);
+            super(msg, cause);
         }
     }
 
@@ -97,6 +100,7 @@
         try
         {
             commandLine = new PosixParser().parse(options, args);
+
             return true;
         }
         catch (ParseException e)
@@ -104,6 +108,7 @@
             System.err.println("Error: " + e.getMessage());
             HelpFormatter formatter = new HelpFormatter();
             formatter.printHelp("Qpid", options, true);
+
             return false;
         }
     }
@@ -112,17 +117,26 @@
     {
         Option help = new Option("h", "help", false, "print this message");
         Option version = new Option("v", "version", false, "print the version information and exit");
-        Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").
-                withLongOpt("config").create("c");
-        Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file").
-                withLongOpt("port").create("p");
-        Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file").
-                withLongOpt("bind").create("b");
-        Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " +
-                "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
-                withLongOpt("logconfig").create("l");
-        Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " +
-                "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+        Option configFile =
+            OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+                         .create("c");
+        Option port =
+            OptionBuilder.withArgName("port").hasArg()
+                         .withDescription("listen on the specified port. Overrides any value in the config file")
+                         .withLongOpt("port").create("p");
+        Option bind =
+            OptionBuilder.withArgName("bind").hasArg()
+                         .withDescription("bind to the specified address. Overrides any value in the config file")
+                         .withLongOpt("bind").create("b");
+        Option logconfig =
+            OptionBuilder.withArgName("logconfig").hasArg()
+                         .withDescription("use the specified log4j xml configuration file. By "
+                + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+                + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+        Option logwatchconfig =
+            OptionBuilder.withArgName("logwatch").hasArg()
+                         .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+                + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
 
         options.addOption(help);
         options.addOption(version);
@@ -150,7 +164,7 @@
             boolean first = true;
             for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
             {
-                if(first)
+                if (first)
                 {
                     first = false;
                 }
@@ -158,9 +172,11 @@
                 {
                     protocol.append(", ");
                 }
+
                 protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
 
             }
+
             System.out.println(ver + " (" + protocol + ")");
         }
         else
@@ -186,7 +202,6 @@
         }
     }
 
-
     protected void startup() throws InitException, ConfigurationException, Exception
     {
         final String QpidHome = System.getProperty("QPID_HOME");
@@ -201,7 +216,7 @@
                 error = error + "\nNote: Qpid_HOME is not set.";
             }
 
-            throw new InitException(error);
+            throw new InitException(error, null);
         }
         else
         {
@@ -226,8 +241,8 @@
 
         _logger.info("Starting Qpid.AMQP broker");
 
-        ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
-                getConfiguredObject(ConnectorConfiguration.class);
+        ConnectorConfiguration connectorConfig =
+            ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
 
         ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
 
@@ -249,7 +264,7 @@
             }
             catch (NumberFormatException e)
             {
-                throw new InitException("Invalid port: " + portStr);
+                throw new InitException("Invalid port: " + portStr, e);
             }
         }
 
@@ -264,19 +279,21 @@
                 int totalVHosts = ((Collection) virtualHosts).size();
                 for (int vhost = 0; vhost < totalVHosts; vhost++)
                 {
-                    setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost));
+                    setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost));
                 }
             }
             else
             {
-               setupVirtualHosts(configFile.getParent() , (String)virtualHosts);
+                setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
             }
         }
+
         bind(port, connectorConfig);
 
     }
 
-    protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
+    protected void setupVirtualHosts(String configFileParent, String configFilePath)
+        throws ConfigurationException, AMQException, URLSyntaxException
     {
         String configVar = "${conf}";
 
@@ -285,7 +302,7 @@
             configFilePath = configFileParent + configFilePath.substring(configVar.length());
         }
 
-        if (configFilePath.indexOf(".xml") != -1 )
+        if (configFilePath.indexOf(".xml") != -1)
         {
             VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
             vHostConfig.performBindings();
@@ -298,11 +315,12 @@
 
             String[] fileNames = virtualHostDir.list();
 
-            for (int each=0; each < fileNames.length; each++)
+            for (int each = 0; each < fileNames.length; each++)
             {
                 if (fileNames[each].endsWith(".xml"))
                 {
-                    VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]);
+                    VirtualHostConfiguration vHostConfig =
+                        new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
                     vHostConfig.performBindings();
                 }
             }
@@ -319,7 +337,7 @@
 
         try
         {
-            //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
+            // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
             IoAcceptor acceptor = connectorConfig.createAcceptor();
             SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
             SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
@@ -334,7 +352,7 @@
             {
                 sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
             }
-            
+
             if (!connectorConfig.enableSSL || !connectorConfig.sslOnly)
             {
                 AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
@@ -347,6 +365,7 @@
                 {
                     bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
                 }
+
                 acceptor.bind(bindAddress, handler, sconfig);
                 _logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
             }
@@ -356,8 +375,7 @@
                 AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
                 try
                 {
-                    acceptor.bind(new InetSocketAddress(connectorConfig.sslPort),
-                                  handler, sconfig);
+                    acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
                     _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
                 }
                 catch (IOException e)
@@ -415,16 +433,17 @@
         }
         catch (NumberFormatException e)
         {
-            System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
-                    "a non-negative integer. Using default of zero (no watching configured");
+            System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+                + "a non-negative integer. Using default of zero (no watching configured");
         }
+
         if (logConfigFile.exists() && logConfigFile.canRead())
         {
             System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
             if (logWatchTime > 0)
             {
-                System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
-                        logWatchTime + " seconds");
+                System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+                    + logWatchTime + " seconds");
                 // log4j expects the watch interval in milliseconds
                 DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Thu Apr 19 09:24:30 2007
@@ -196,6 +196,7 @@
             }
             else
             {
+                _logger.error("MESSAGE LOSS: Message should be sent on a Dead Letter Queue");                
                 _logger.warn(msg);
             }
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -98,7 +98,7 @@
             // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
 //            if (!evt.getMethod().resend)
             {
-                message.message.reject(message.message.getDeliveredSubscription());
+                message.message.reject(message.message.getDeliveredSubscription(message.queue));
             }
 
             if (evt.getMethod().requeue)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.AccessRights;
 import org.apache.log4j.Logger;
 
 public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -75,23 +76,26 @@
 
         if (virtualHost == null)
         {
-            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName);
+            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
         }
         else
         {
             session.setVirtualHost(virtualHost);
 
-            AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID());
+            AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID(), AccessRights.Rights.ANY);
 
             switch (result.getStatus())
             {
                 default:
                 case REFUSED:
-                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
-                                                      "Access denied to vHost '" + virtualHostName + "' by "
-                                                      + result.getAuthorizer());
+                    String error = "Any access denied to vHost '" + virtualHostName + "' by "
+                                   + result.getAuthorizer();
+                    
+                    _logger.warn(error);
+
+                    throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error);
                 case GRANTED:
-                    _logger.info("Granted access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
+                    _logger.info("Granted any access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
                                  + " by '" + result.getAuthorizer() + "'");
             }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -37,6 +37,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -106,7 +107,7 @@
                     ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
                     HeartbeatConfig.getInstance().getDelay());	// heartbeat
                 session.writeFrame(tune);
-                session.setAuthorizedID(ss.getAuthorizationID());                
+                session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));                
                 disposeSaslServer(session);
                 break;
             case CONTINUE:

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Thu Apr 19 09:24:30 2007
@@ -37,6 +37,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -95,7 +96,7 @@
                     throw new AMQException("Authentication failed");
                 case SUCCESS:
                     _logger.info("Connected as: " + ss.getAuthorizationID());
-                    session.setAuthorizedID(ss.getAuthorizationID());
+                    session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));                
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                     // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Apr 19 09:24:30 2007
@@ -64,7 +64,6 @@
     private final AtomicInteger _counter = new AtomicInteger();
 
 
-
     protected QueueDeclareHandler()
     {
         Configurator.configure(this);
@@ -92,12 +91,12 @@
         synchronized (queueRegistry)
         {
 
-            if (((queue = queueRegistry.getQueue(body.queue)) == null) )
+            if (((queue = queueRegistry.getQueue(body.queue)) == null))
             {
-                if(body.passive)
+                if (body.passive)
                 {
-                    String msg = "Queue: " + body.queue + " not found.";
-                    throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
+                    String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+                    throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
                 }
                 else
                 {
@@ -112,13 +111,16 @@
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
                         queue.bind(body.queue, null, defaultExchange);
-                        _log.info("Queue " + body.queue + " bound to default exchange");
+                        _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
                     }
                 }
             }
-            else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
+            else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
             {
-                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection");        
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+                                                                           + " as exclusive queue with same name "
+                                                                           + "declared on another client ID('"
+                                                                           + queue.getOwner() + "')");
             }
 
             AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -138,10 +140,10 @@
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
             AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
-                (byte)8, (byte)0,	// AMQP version (major, minor)
-                queue.getConsumerCount(), // consumerCount
-                queue.getMessageCount(), // messageCount
-                body.queue); // queue
+                                                                  (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                  queue.getConsumerCount(), // consumerCount
+                                                                  queue.getMessageCount(), // messageCount
+                                                                  body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             session.writeFrame(response);
         }
@@ -162,24 +164,22 @@
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
         AMQShortString owner = body.exclusive ? session.getContextKey() : null;
-        final AMQQueue queue =  new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+        final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
         final AMQShortString queueName = queue.getName();
 
-        if(body.exclusive && !body.durable)
+        if (body.exclusive && !body.durable)
         {
             final AMQProtocolSession.Task deleteQueueTask =
-                new AMQProtocolSession.Task()
-                {
-
-                    public void doTask(AMQProtocolSession session) throws AMQException
+                    new AMQProtocolSession.Task()
                     {
-                        if(registry.getQueue(queueName) == queue)
+                        public void doTask(AMQProtocolSession session) throws AMQException
                         {
-                            queue.delete();
+                            if (registry.getQueue(queueName) == queue)
+                            {
+                                queue.delete();
+                            }
                         }
-
-                    }
-                };
+                    };
 
             session.addSessionCloseTask(deleteQueueTask);
 
@@ -190,16 +190,14 @@
                     session.removeSessionCloseTask(deleteQueueTask);
                 }
             });
-
-
-        }
+        }// if exclusive and not durable
 
         Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
         if (virtualHostDefaultQueueConfiguration != null)
         {
             Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
         }
-        
+
         return queue;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Thu Apr 19 09:24:30 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,29 +20,174 @@
  */
 package org.apache.qpid.server.management;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.management.JMException;
 import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.MBeanServerForwarder;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.AuthorizeCallback;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
+
+/**
+ * This class starts up an MBeanServer. If the out-of-the-box JMX agent is being used, no security features are
+ * implemented. To use security features such as user authentication, turn off the JMX options in the "QPID_OPTS" env
+ * variable and use the JMXMP connector server. If the JMXMP connector is not available, the standard JMXConnector is
+ * used, which again does not provide user authentication.
+ */
 public class JMXManagedObjectRegistry implements ManagedObjectRegistry
 {
     private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class);
 
     private final MBeanServer _mbeanServer;
+    private Registry _rmiRegistry;
+    private JMXServiceURL _jmxURL;
 
-    public JMXManagedObjectRegistry()
+    public JMXManagedObjectRegistry() throws AMQException
     {
         _log.info("Initialising managed object registry using platform MBean server");
-        // we use the platform MBean server currently but this must be changed or at least be configuurable
-        _mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+        // Retrieve the config parameters
+        boolean platformServer = appRegistry.getConfiguration().getBoolean("management.platform-mbeanserver", true);
+
+        _mbeanServer =
+                platformServer ? ManagementFactory.getPlatformMBeanServer()
+                : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN);
+    }
+
+
+    public void start()
+    {
+        // Check if the "QPID_OPTS" is set to use Out of the Box JMXAgent
+        if (areOutOfTheBoxJMXOptionsSet())
+        {
+            _log.info("JMX: Using the out of the box JMX Agent");
+            return;
+        }
+
+        IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+        boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", true);
+        int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999);
+
+        try
+        {
+            if (security)
+            {
+                // For SASL using JMXMP
+                _jmxURL = new JMXServiceURL("jmxmp", null, port);
+
+                Map env = new HashMap();
+                Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases();
+                PrincipalDatabase db = null;
+                
+                for (Map.Entry<String, PrincipalDatabase> entry : map.entrySet())
+                {
+                    if (entry.getValue() instanceof Base64MD5PasswordFilePrincipalDatabase)
+                    {
+                        db = entry.getValue();
+                        break;
+                    }
+                    else if (entry.getValue() instanceof PlainPasswordFilePrincipalDatabase)
+                    {
+                        db = entry.getValue();
+                    }
+                }
+
+                if (db instanceof Base64MD5PasswordFilePrincipalDatabase)
+                {
+                    env.put("jmx.remote.profiles", "SASL/CRAM-MD5");
+                    CRAMMD5HashedInitialiser initialiser = new CRAMMD5HashedInitialiser();
+                    initialiser.initialise(db);
+                    env.put("jmx.remote.sasl.callback.handler", initialiser.getCallbackHandler());
+                }
+                else if (db instanceof PlainPasswordFilePrincipalDatabase)
+                {
+                    env.put("jmx.remote.profiles", "SASL/PLAIN");
+                    env.put("jmx.remote.sasl.callback.handler", new UserCallbackHandler(db));
+                }
+
+                // Enable the SSL security and server authentication
+                /*
+                SslRMIClientSocketFactory csf = new SslRMIClientSocketFactory();
+                SslRMIServerSocketFactory ssf = new SslRMIServerSocketFactory();
+                env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf);
+                env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssf);
+                 */
+
+                try
+                {
+                    JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, env, _mbeanServer);
+                    MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
+                    cs.setMBeanServerForwarder(mbsf);
+                    cs.start();
+                    _log.info("JMX: Starting JMXConnector server with SASL");
+                }
+                catch (java.net.MalformedURLException urlException)
+                {
+                    // When JMXMPConnector is not available
+                    // java.net.MalformedURLException: Unsupported protocol: jmxmp
+                    _log.info("JMX: Starting JMXConnector server");
+                    startJMXConnectorServer(port);
+                }
+            }
+            else
+            {
+                startJMXConnectorServer(port);
+            }
+        }
+        catch (Exception ex)
+        {
+            _log.error("Error in initialising Managed Object Registry." + ex.getMessage());
+            ex.printStackTrace();
+        }
+    }
+
+    /**
+     * Starts up an RMIRegistry at configured port and attaches a JMXConnectorServer to it.
+     *
+     * @param port
+     *
+     * @throws IOException
+     */
+    private void startJMXConnectorServer(int port) throws IOException
+    {
+        startRMIRegistry(port);
+        _jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi");
+        JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, null, _mbeanServer);
+        cs.start();
     }
 
     public void registerObject(ManagedObject managedObject) throws JMException
     {
-         _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
+        _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
     }
 
     public void unregisterObject(ManagedObject managedObject) throws JMException
@@ -50,4 +195,105 @@
         _mbeanServer.unregisterMBean(managedObject.getObjectName());
     }
 
+    /**
+     * Checks whether the out of the box JMXAgent was requested through JVM system properties (e.g. via "QPID_OPTS").
+     *
+     * @return true if "com.sun.management.jmxremote" or "com.sun.management.jmxremote.port" is set, false otherwise
+     */
+    private boolean areOutOfTheBoxJMXOptionsSet()
+    {
+        if (System.getProperty("com.sun.management.jmxremote") != null)
+        {
+            return true;
+        }
+
+        if (System.getProperty("com.sun.management.jmxremote.port") != null)
+        {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Creates an RMI registry on the given port and keeps it in _rmiRegistry so that close() can unexport it.
+     *
+     * @param port port on which the registry is created
+     *
+     * @throws RemoteException declared for the LocateRegistry.createRegistry call
+     */
+    private void startRMIRegistry(int port) throws RemoteException
+    {
+        System.setProperty("java.rmi.server.randomIDs", "true"); // JDK RMI property: secure random object IDs
+        _rmiRegistry = LocateRegistry.createRegistry(port);
+    }
+
+    // Stops the RMI registry, if one was started (i.e. _rmiRegistry is set), by unexporting it
+    public void close() throws RemoteException
+    {
+        if (_rmiRegistry != null)
+        {
+            // force == true: the registry is unexported even if calls are pending or in progress
+            UnicastRemoteObject.unexportObject(_rmiRegistry, true);
+        }
+    }
+
+    /** CallbackHandler used by the SASL enabled JMXConnector; checks a user's password against a PrincipalDatabase. */
+    private class UserCallbackHandler implements CallbackHandler
+    {
+        private final PrincipalDatabase _principalDatabase;
+
+        protected UserCallbackHandler(PrincipalDatabase database)
+        {
+            _principalDatabase = database;
+        }
+
+        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+        {
+            // Pick out the name and password callbacks; any callback type not handled below is rejected
+            NameCallback ncb = null;
+            PasswordCallback pcb = null;
+            for (int i = 0; i < callbacks.length; i++)
+            {
+                if (callbacks[i] instanceof NameCallback)
+                {
+                    ncb = (NameCallback) callbacks[i];
+                }
+                else if (callbacks[i] instanceof PasswordCallback)
+                {
+                    pcb = (PasswordCallback) callbacks[i];
+                }
+                else if (callbacks[i] instanceof AuthorizeCallback)
+                {
+                    ((AuthorizeCallback) callbacks[i]).setAuthorized(true); // authorization is always granted here
+                }
+                else
+                {
+                    throw new UnsupportedCallbackException(callbacks[i]);
+                }
+            }
+
+            boolean authorized = false;
+            // The password is only verified when both a name and a password callback were supplied
+            if ((ncb != null) && (pcb != null))
+            {
+                String username = ncb.getDefaultName(); // the username is read from the callback's default name
+                try
+                {
+                    authorized = _principalDatabase.verifyPassword(username, new String(pcb.getPassword()));
+                }
+                catch (AccountNotFoundException e)
+                {
+                    IOException ioe = new IOException("User not authorized.  " + e);
+                    ioe.initCause(e);
+                    throw ioe;
+                }
+            }
+
+            if (!authorized)
+            {
+                throw new IOException("User not authorized."); // also reached when no name/password pair was supplied
+            }
+        }
+    }
 }

Copied: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (from r526666, incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?view=diff&rev=530474&p1=incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java&r1=526666&p2=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java&r2=530474
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Thu Apr 19 09:24:30 2007
@@ -44,21 +44,20 @@
 import java.io.FileInputStream;
 
 /**
- * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations.
- * This implements the logic for allowing the users to invoke MBean operations and implements the
- * restrictions for readOnly, readWrite and admin users.
+ * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. This implements
+ * the logic for allowing the users to invoke MBean operations and implements the restrictions for readOnly, readWrite
+ * and admin users.
  */
 public class MBeanInvocationHandlerImpl implements InvocationHandler
 {
     private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class);
-    
-    private final static String ADMIN = "admin";
-    private final static String READWRITE="readwrite";
-    private final static String READONLY = "readonly";
+
+    public final static String ADMIN = "admin";
+    public final static String READWRITE = "readwrite";
+    public final static String READONLY = "readonly";
     private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
-    private static final String DEFAULT_PERMISSIONS_FILE = "etc" + File.separator + "jmxremote.access";
     private MBeanServer mbs;
-    private final static Properties _userRoles = new Properties();
+    private static Properties _userRoles = new Properties();
 
     public static MBeanServerForwarder newProxyInstance()
     {
@@ -119,7 +118,7 @@
         {
             throw new SecurityException("Access denied");
         }
-        
+
         Principal principal = principals.iterator().next();
         String identity = principal.getName();
 
@@ -140,21 +139,9 @@
     }
 
     // Initialises the user roles
-    protected static void initialise() throws AMQException
+    public static void setAccessRights(Properties accessRights)
     {
-        final String QpidHome = System.getProperty("QPID_HOME");
-        String fileName = QpidHome + File.separator + DEFAULT_PERMISSIONS_FILE;
-        try
-        {
-            FileInputStream in = new FileInputStream(fileName);
-            _userRoles.load(in);
-            in.close();
-        }
-        catch (IOException ex)
-        {
-            _logger.error("Error in loading JMX User permissions." + ex.getMessage());
-            //throw new AMQException("Error in loading JMX User permissions", ex);
-        }
+        _userRoles = accessRights;
     }
 
     private boolean isAdmin(String userName)
@@ -200,11 +187,13 @@
         {
             String mbeanMethod = (args.length > 1) ? (String) args[1] : null;
             if (mbeanMethod == null)
+            {
                 return false;
-            
+            }
+
             try
             {
-                MBeanInfo mbeanInfo = mbs.getMBeanInfo((ObjectName)args[0]);
+                MBeanInfo mbeanInfo = mbs.getMBeanInfo((ObjectName) args[0]);
                 if (mbeanInfo != null)
                 {
                     MBeanOperationInfo[] opInfos = mbeanInfo.getOperations();
@@ -219,7 +208,7 @@
             }
             catch (JMException ex)
             {
-                ex.printStackTrace();   
+                ex.printStackTrace();
             }
         }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java Thu Apr 19 09:24:30 2007
@@ -52,8 +52,7 @@
     @MBeanOperation(name="createNewExchange", description="Creates a new Exchange", impact= MBeanOperationInfo.ACTION)
     void createNewExchange(@MBeanOperationParameter(name="name", description="Name of the new exchange")String name,
                            @MBeanOperationParameter(name="ExchangeType", description="Type of the exchange")String type,
-                           @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable,
-                           @MBeanOperationParameter(name="passive", description="true of the Exchange should be passive")boolean passive)
+                           @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable)
         throws IOException, JMException;
 
     /**
@@ -81,8 +80,7 @@
     @MBeanOperation(name="createNewQueue", description="Create a new Queue on the Broker server", impact= MBeanOperationInfo.ACTION)
     void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
                         @MBeanOperationParameter(name="owner", description="Owner name")String owner,
-                        @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
-                        @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete)
+                        @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable)
         throws IOException, JMException;
 
     /**

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java Thu Apr 19 09:24:30 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.management;
 
 import javax.management.JMException;
+import java.rmi.RemoteException;
 
 /**
  * Handles the registration (and unregistration and so on) of managed objects.
@@ -36,7 +37,11 @@
  */
 public interface ManagedObjectRegistry
 {
+    void start();
+
     void registerObject(ManagedObject managedObject) throws JMException;
 
     void unregisterObject(ManagedObject managedObject) throws JMException;
+
+    void close() throws RemoteException;
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java Thu Apr 19 09:24:30 2007
@@ -24,6 +24,8 @@
 
 import org.apache.log4j.Logger;
 
+import java.rmi.RemoteException;
+
 /**
  * This managed object registry does not actually register MBeans. This can be used in tests when management is
  * not required or when management has been disabled.
@@ -38,11 +40,21 @@
         _log.info("Management is disabled");
     }
 
+    public void start()
+    {
+        //no-op
+    }
+
     public void registerObject(ManagedObject managedObject) throws JMException
     {
     }
 
     public void unregisterObject(ManagedObject managedObject) throws JMException
     {
+    }
+
+    public void close() throws RemoteException
+    {
+        
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Apr 19 09:24:30 2007
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.security.Principal;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -108,7 +109,7 @@
     private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
     private List<Integer> _closingChannelsList = new ArrayList<Integer>();
     private ProtocolOutputConverter _protocolOutputConverter;
-    private String _authorizedID;
+    private Principal _authorizedID;
 
 
     public ManagedObject getManagedObject()
@@ -745,12 +746,12 @@
         return _protocolOutputConverter;
     }
 
-    public void setAuthorizedID(String authorizedID)
+    public void setAuthorizedID(Principal authorizedID)
     {
         _authorizedID = authorizedID;
     }
 
-    public String getAuthorizedID()
+    public Principal getAuthorizedID()
     {
         return _authorizedID;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Apr 19 09:24:30 2007
@@ -31,6 +31,8 @@
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.security.Principal;
+
 
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
 {
@@ -165,9 +167,9 @@
 
     public ProtocolOutputConverter getProtocolOutputConverter();
 
-    void setAuthorizedID(String authorizedID);
+    void setAuthorizedID(Principal authorizedID);
 
-    /** @return a username string that was used to authorized this session */    
-    String getAuthorizedID();
+    /** @return a Principal that was used to authorize this session */
+    Principal getAuthorizedID();
 
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Apr 19 09:24:30 2007
@@ -1,5 +1,25 @@
 /*
  *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
  * Copyright (c) 2006 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,14 +37,15 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.security.Principal;
 import java.util.Date;
 import java.util.List;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
 import javax.management.NotCompliantMBeanException;
+import javax.management.Notification;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -56,15 +77,17 @@
 {
     private AMQMinaProtocolSession _session = null;
     private String _name = null;
-    
-    //openmbean data types for representing the channel attributes
-    private final static String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
-    private final static String[] _indexNames = {_channelAtttibuteNames[0]};
-    private final static OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
-    private static CompositeType _channelType = null;      // represents the data type for channel data
-    private static TabularType _channelsType = null;       // Data type for list of channels type
+
+    // openmbean data types for representing the channel attributes
+    private static final String[] _channelAtttibuteNames =
+        { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count" };
+    private static final String[] _indexNames = { _channelAtttibuteNames[0] };
+    private static final OpenType[] _channelAttributeTypes =
+        { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER };
+    private static CompositeType _channelType = null; // represents the data type for channel data
+    private static TabularType _channelsType = null; // Data type for list of channels type
     private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
-            new AMQShortString("Broker Management Console has closed the connection.");
+        new AMQShortString("Broker Management Console has closed the connection.");
 
     @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
     public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
@@ -72,22 +95,21 @@
         super(ManagedConnection.class, ManagedConnection.TYPE);
         _session = session;
         String remote = getRemoteAddress();
-        remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
+        remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
         _name = jmxEncode(new StringBuffer(remote), 0).toString();
         init();
     }
 
-
     static
     {
         try
         {
             init();
         }
-        catch(JMException ex)
+        catch (JMException ex)
         {
-            // It should never occur
-            System.out.println(ex.getMessage());
+            // This is not expected to ever occur.
+            throw new RuntimeException("Got JMException in static initializer.", ex);
         }
     }
 
@@ -96,26 +118,27 @@
      */
     private static void init() throws OpenDataException
     {
-        _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
-                                         _channelAtttibuteNames, _channelAttributeTypes);
+        _channelType =
+            new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, _channelAtttibuteNames,
+                _channelAttributeTypes);
         _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
     }
 
     public String getClientId()
     {
-        return _session.getContextKey() == null ? null : _session.getContextKey().toString();
+        return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
     }
 
     public String getAuthorizedId()
     {
-        return _session.getAuthorizedID();
+        return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
     }
 
     public String getVersion()
     {
-        return _session.getClientVersion() == null ? null : _session.getClientVersion().toString();
+        return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
     }
-    
+
     public Date getLastIoTime()
     {
         return new Date(_session.getIOSession().getLastIoTime());
@@ -171,6 +194,7 @@
             {
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
+
             _session.commitTransactions(channel);
         }
         catch (AMQException ex)
@@ -194,6 +218,7 @@
             {
                 throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
             }
+
             _session.rollbackTransactions(channel);
         }
         catch (AMQException ex)
@@ -215,9 +240,12 @@
 
         for (AMQChannel channel : list)
         {
-            Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
+            Object[] itemValues =
+                {
+                    channel.getChannelId(), channel.isTransactional(),
                     (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null,
-                    channel.getUnacknowledgedMessageMap().size()};
+                    channel.getUnacknowledgedMessageMap().size()
+                };
 
             CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
             channelsList.put(channelData);
@@ -232,17 +260,16 @@
      * @throws JMException
      */
     public void closeConnection() throws JMException
-    {        
+    {
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
-            _session.getProtocolMajorVersion(),
-            _session.getProtocolMinorVersion(),	// AMQP version (major, minor)
-            0,	// classId
-            0,	// methodId
-        	AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
-            BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION    // replyText
+        final AMQFrame response =
+            ConnectionCloseBody.createAMQFrame(0, _session.getProtocolMajorVersion(), _session.getProtocolMinorVersion(), // AMQP version (major, minor)
+                0, // classId
+                0, // methodId
+                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
             );
         _session.writeFrame(response);
 
@@ -259,18 +286,19 @@
     @Override
     public MBeanNotificationInfo[] getNotificationInfo()
     {
-        String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+        String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
         String name = MonitorNotification.class.getName();
         String description = "Channel count has reached threshold value";
         MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
 
-        return new MBeanNotificationInfo[]{info1};
+        return new MBeanNotificationInfo[] { info1 };
     }
 
     public void notifyClients(String notificationMsg)
     {
-        Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
-                ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+        Notification n =
+            new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+                System.currentTimeMillis(), notificationMsg);
         _broadcaster.sendNotification(n);
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Thu Apr 19 09:24:30 2007
@@ -23,6 +23,7 @@
 
 import java.io.IOException;
 import java.util.Date;
+import java.security.Principal;
 
 import javax.management.JMException;
 import javax.management.MBeanOperationInfo;
@@ -67,16 +68,17 @@
     /**
      * Tells the total number of bytes written till now.
      * @return number of bytes written.
-     */
+     *
     @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
     Long getWrittenBytes();
-
+    */
     /**
      * Tells the total number of bytes read till now.
      * @return number of bytes read.
-     */
+     *
     @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
     Long getReadBytes();
+    */
 
     /**
      * Threshold high value for no of channels.  This is useful in setting notifications or

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Apr 19 09:24:30 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -42,6 +43,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -78,19 +81,20 @@
     private boolean _immediate;
 
     private AtomicBoolean _taken = new AtomicBoolean(false);
-
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
     private Subscription _takenBySubcription;
-
     private Set<Subscription> _rejectedBy = null;
+    private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+    private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
-    public boolean isTaken()
+    public boolean isTaken(AMQQueue queue)
     {
         return _taken.get();
     }
 
     private final int hashcode = System.identityHashCode(this);
+
     public String debugIdentity()
     {
         return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
@@ -203,9 +207,10 @@
         _transientMessageData.setMessagePublishInfo(info);
 
         _taken = new AtomicBoolean(false);
+
         if (_log.isDebugEnabled())
         {
-            _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
+            _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")");
         }
     }
 
@@ -318,8 +323,10 @@
 
         // enqueuing the messages ensure that if required the destinations are recorded to a
         // persistent store
+
         for (AMQQueue q : _transientMessageData.getDestinationQueues())
         {
+            _takenMap.put(q, new AtomicBoolean(false));
             _messageHandle.enqueue(storeContext, _messageId, q);
         }
 
@@ -356,12 +363,13 @@
     }
 
     /**
-     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+     * operation.
      */
     public AMQMessage takeReference()
     {
         _referenceCount.incrementAndGet();
-	return this;
+        return this;
     }
 
     /** Threadsafe. Increment the reference count on the message. */
@@ -378,9 +386,10 @@
      * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
      * message store.
      *
+     * @param storeContext
+     *
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
-     * @param storeContext
      */
     public void decrementReference(StoreContext storeContext) throws MessageCleanupException
     {
@@ -451,7 +460,7 @@
     }
 
 
-    public boolean taken(Subscription sub)
+    public boolean taken(AMQQueue queue, Subscription sub)
     {
         if (_taken.getAndSet(true))
         {
@@ -464,7 +473,7 @@
         }
     }
 
-    public void release()
+    public void release(AMQQueue queue)
     {
         if (_log.isTraceEnabled())
         {
@@ -600,7 +609,7 @@
             for (AMQQueue q : destinationQueues)
             {
                 //Increment the references to this message for each queue delivery.
-                incrementReference();                
+                incrementReference();
                 //normal deliver so add this message at the end.
                 _txnContext.deliver(this, q, false);
             }
@@ -824,11 +833,14 @@
 
     public String toString()
     {
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
-               _taken + " by:" + _takenBySubcription;
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+               _taken + " by :" + _takenBySubcription;
+
+//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+//               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
-    public Subscription getDeliveredSubscription()
+    public Subscription getDeliveredSubscription(AMQQueue queue)
     {
         return _takenBySubcription;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Apr 19 09:24:30 2007
@@ -1,5 +1,25 @@
 /*
  *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
  * Copyright (c) 2006 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,11 +37,11 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Date;
-import java.text.SimpleDateFormat;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -41,12 +61,14 @@
 import javax.management.openmbean.TabularType;
 
 import org.apache.log4j.Logger;
+
 import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.MBeanConstructor;
@@ -73,15 +95,15 @@
     private AMQQueue _queue = null;
     private String _queueName = null;
     // OpenMBean data types for viewMessages method
-    private final static String[] _msgAttributeNames = {"AMQ MessageId", "Header", "Size(bytes)", "Redelivered"};
-    private static String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+    private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
+    private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
     private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
-    private static CompositeType _messageDataType = null;           // Composite type for representing AMQ Message data.
-    private static TabularType _messagelistDataType = null;         // Datatype for representing AMQ messages list.
+    private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+    private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
 
     // OpenMBean data types for viewMessageContent method
     private static CompositeType _msgContentType = null;
-    private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
+    private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
     private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
 
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -95,7 +117,6 @@
         _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
     }
 
-
     public ManagedObject getParentObject()
     {
         return _queue.getVirtualHost().getManagedObject();
@@ -107,10 +128,10 @@
         {
             init();
         }
-        catch(JMException ex)
+        catch (JMException ex)
         {
-            // It should never occur
-            System.out.println(ex.getMessage());
+            // This is not expected to ever occur.
+            throw new RuntimeException("Got JMException in static initializer.", ex);
         }
     }
 
@@ -119,19 +140,21 @@
      */
     private static void init() throws OpenDataException
     {
-        _msgContentAttributeTypes[0] = SimpleType.LONG;                    // For message id
-        _msgContentAttributeTypes[1] = SimpleType.STRING;                  // For MimeType
-        _msgContentAttributeTypes[2] = SimpleType.STRING;                  // For Encoding
-        _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE);  // For message content
-        _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
-                _msgContentAttributes, _msgContentAttributeTypes);
-
-        _msgAttributeTypes[0] = SimpleType.LONG;                      // For message id
-        _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING);  // For header attributes
-        _msgAttributeTypes[2] = SimpleType.LONG;                      // For size
-        _msgAttributeTypes[3] = SimpleType.BOOLEAN;                   // For redelivered
+        _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+        _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+        _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+        _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+        _msgContentType =
+            new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
+                _msgContentAttributeTypes);
+
+        _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+        _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+        _msgAttributeTypes[2] = SimpleType.LONG; // For size
+        _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
 
-        _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+        _messageDataType =
+            new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
         _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
     }
 
@@ -213,7 +236,8 @@
     public Long getMaximumQueueDepth()
     {
         long queueDepthInBytes = _queue.getMaximumQueueDepth();
-        return queueDepthInBytes >> 10 ;
+
+        return queueDepthInBytes >> 10;
     }
 
     public void setMaximumQueueDepth(Long value)
@@ -227,7 +251,8 @@
     public Long getQueueDepth() throws JMException
     {
         long queueBytesSize = _queue.getQueueDepth();
-        return queueBytesSize >> 10 ;
+
+        return queueBytesSize >> 10;
     }
 
     /**
@@ -237,13 +262,13 @@
     {
 
         final long currentTime = System.currentTimeMillis();
-        final long thresholdTime =  currentTime - _queue.getMinimumAlertRepeatGap();
+        final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
 
-        for(NotificationCheck check : NotificationCheck.values())
+        for (NotificationCheck check : NotificationCheck.values())
         {
-            if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime)
+            if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
             {
-                if(check.notifyIfNecessary(msg, _queue, this))
+                if (check.notifyIfNecessary(msg, _queue, this))
                 {
                     _lastNotificationTimes[check.ordinal()] = currentTime;
                 }
@@ -260,9 +285,10 @@
         // important : add log to the log file - monitoring tools may be looking for this
         _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
         notificationMsg = notification.name() + " " + notificationMsg;
-        
-        _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
-                ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+
+        _lastNotification =
+            new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+                System.currentTimeMillis(), notificationMsg);
 
         _broadcaster.sendNotification(_lastNotification);
     }
@@ -334,20 +360,25 @@
         try
         {
             // Create header attributes list
-            CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+            CommonContentHeaderProperties headerProperties =
+                (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
             String mimeType = null, encoding = null;
             if (headerProperties != null)
             {
                 AMQShortString mimeTypeShortSting = headerProperties.getContentType();
-                mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString();
-                encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString();
+                mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+                encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
             }
-            Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+            Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
             return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
         }
         catch (AMQException e)
         {
-            throw new JMException("Error creating header attributes list: " + e);
+            JMException jme = new JMException("Error creating header attributes list: " + e);
+            jme.initCause(e);
+            throw jme;
         }
     }
 
@@ -358,8 +389,8 @@
     {
         if ((beginIndex > endIndex) || (beginIndex < 1))
         {
-            throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex +
-                                          "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
+            throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
+                + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
         }
 
         List<AMQMessage> list = _queue.getMessagesOnTheQueue();
@@ -368,20 +399,22 @@
         try
         {
             // Create the tabular list of message header contents
-            for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+            for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
             {
                 AMQMessage msg = list.get(i - 1);
                 ContentHeaderBody headerBody = msg.getContentHeaderBody();
                 // Create header attributes list
                 String[] headerAttributes = getMessageHeaderProperties(headerBody);
-                Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+                Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() };
                 CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
                 _messageList.put(messageData);
             }
         }
         catch (AMQException e)
         {
-            throw new JMException("Error creating message contents: " + e);
+            JMException jme = new JMException("Error creating message contents: " + e);
+            jme.initCause(e);
+            throw jme;
         }
 
         return _messageList;
@@ -400,11 +433,11 @@
         list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
 
         int delMode = headerProperties.getDeliveryMode();
-        list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent"));
+        list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent"));
 
         list.add("JMSPriority = " + headerProperties.getPriority());
         list.add("JMSType = " + headerProperties.getType());
-        
+
         long longDate = headerProperties.getExpiration();
         String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
         list.add("JMSExpiration = " + strDate);
@@ -425,27 +458,26 @@
      */
     public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
     {
-        if (fromMessageId > toMessageId || (fromMessageId < 1))
+        if ((fromMessageId > toMessageId) || (fromMessageId < 1))
         {
-            throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");            
+            throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
         }
 
         _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
     }
 
-
     /**
      * returns Notifications sent by this MBean.
      */
     @Override
     public MBeanNotificationInfo[] getNotificationInfo()
     {
-        String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+        String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
         String name = MonitorNotification.class.getName();
         String description = "Either Message count or Queue depth or Message size has reached threshold high value";
         MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
 
-        return new MBeanNotificationInfo[]{info1};
+        return new MBeanNotificationInfo[] { info1 };
     }
 
 } // End of AMQQueueMBean class

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Apr 19 09:24:30 2007
@@ -210,6 +210,7 @@
 
     /**
      * Returns all the messages in the Queue
+     *
      * @return List of messages
      */
     public List<AMQMessage> getMessages()
@@ -222,14 +223,16 @@
             list.add(message);
         }
         _lock.unlock();
-        
+
         return list;
     }
 
     /**
      * Returns messages within the range of given messageIds
+     *
      * @param fromMessageId
      * @param toMessageId
+     *
      * @return
      */
     public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
@@ -242,7 +245,7 @@
         long maxMessageCount = toMessageId - fromMessageId + 1;
 
         _lock.lock();
-        
+
         List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
 
         for (AMQMessage message : _messages)
@@ -399,7 +402,7 @@
     public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
     {
         _lock.lock();
-        
+
         AMQMessage message = _messages.poll();
         if (message != null)
         {
@@ -432,9 +435,7 @@
         return count;
     }
 
-    /**
-        This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. 
-     */
+    /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
     private AMQMessage getNextMessage() throws AMQException
     {
         return getNextMessage(_messages, null);
@@ -444,8 +445,12 @@
     {
         AMQMessage message = messages.peek();
 
-        //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
-        while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
+        //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
+        while (message != null
+               && (
+                ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+                || sub == null)
+               && message.taken(_queue, sub))
         {
             //remove the already taken message
             AMQMessage removed = messages.poll();
@@ -506,7 +511,7 @@
                 }
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) +
+                    _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
                                ") by :" + System.identityHashCode(this) +
                                ") to :" + System.identityHashCode(sub));
                 }
@@ -526,7 +531,7 @@
 
             if (_log.isDebugEnabled())
             {
-                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() +
+                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
                            ") by :" + System.identityHashCode(this) +
                            ") to :" + System.identityHashCode(sub));
             }
@@ -562,7 +567,7 @@
         }
         catch (AMQException e)
         {
-            message.release();
+            message.release(_queue);
             _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
         }
     }
@@ -723,7 +728,7 @@
                             _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(s);
+                        msg.taken(_queue, s);
                         //Deliver the message
                         s.send(msg, _queue);
                     }
@@ -737,7 +742,7 @@
                     }
                 }
 
-                if (!msg.isTaken())
+                if (!msg.isTaken(_queue))
                 {
                     if (_log.isInfoEnabled())
                     {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=530474&r1=530473&r2=530474
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Apr 19 09:24:30 2007
@@ -558,7 +558,7 @@
                     _logger.trace("Removed for resending:" + resent.debugIdentity());
                 }
 
-                resent.release();
+                resent.release(_queue);
                 _queue.subscriberHasPendingResend(false, this, resent);
 
                 try