You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/31 17:54:49 UTC

svn commit: r561365 [5/10] - in /incubator/qpid/trunk/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/BrokerDetai...

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Tue Jul 31 08:53:37 2007
@@ -34,6 +34,7 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Logger;
@@ -48,6 +49,7 @@
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.management.JMXManagedObjectRegistry;
 import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
 import org.apache.qpid.server.protocol.AMQPProtocolProvider;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -55,11 +57,19 @@
 import org.apache.qpid.server.transport.ConnectorConfiguration;
 import org.apache.qpid.url.URLSyntaxException;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
 /**
  * Main entry point for AMQPD.
  *
  */
-@SuppressWarnings({ "AccessStaticViaInstance" })
+@SuppressWarnings({"AccessStaticViaInstance"})
 public class Main
 {
     /** Used for debugging. */
@@ -133,6 +143,12 @@
             OptionBuilder.withArgName("port").hasArg()
                          .withDescription("listen on the specified port. Overrides any value in the config file")
                          .withLongOpt("port").create("p");
+        Option mport =
+                OptionBuilder.withArgName("mport").hasArg()
+                        .withDescription("listen on the specified management port. Overrides any value in the config file")
+                        .withLongOpt("mport").create("m");
+
+
         Option bind =
             OptionBuilder.withArgName("bind").hasArg()
                          .withDescription("bind to the specified address. Overrides any value in the config file")
@@ -153,6 +169,7 @@
         options.addOption(logconfig);
         options.addOption(logwatchconfig);
         options.addOption(port);
+        options.addOption(mport);
         options.addOption(bind);
     }
 
@@ -203,15 +220,19 @@
             catch (InitException e)
             {
                 System.out.println(e.getMessage());
+                _brokerLogger.error("Initialisation Error : " + e.getMessage());
+
             }
             catch (ConfigurationException e)
             {
                 System.out.println("Error configuring message broker: " + e);
+                _brokerLogger.error("Error configuring message broker: " + e);
                 e.printStackTrace();
             }
             catch (Exception e)
             {
                 System.out.println("Error intialising message broker: " + e);
+                _brokerLogger.error("Error intialising message broker: " + e);
                 e.printStackTrace();
             }
         }
@@ -260,7 +281,15 @@
             configureLogging(logConfigFile, logWatchConfig);
         }
 
-        ApplicationRegistry.initialise(new ConfigurationFileApplicationRegistry(configFile));
+        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+
+
+        updateManagementPort(config.getConfiguration(), commandLine.getOptionValue("m"));
+
+
+
+        ApplicationRegistry.initialise(config);
+
 
         // fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues
         // that are causing the broker build to pick up the wrong properties file and hence say
@@ -316,6 +345,29 @@
         }
 
         bind(port, connectorConfig);
+    }
+
+    /**
+     * Update the configuration data with the management port.
+     * @param configuration
+     * @param managementPort The string from the command line
+     */
+    private void updateManagementPort(Configuration configuration, String managementPort)
+    {
+        if (managementPort != null)
+        {
+            int mport;
+            int defaultMPort = configuration.getInt(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH);
+            try
+            {
+                mport = Integer.parseInt(managementPort);
+                configuration.setProperty(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH, mport);
+            }
+            catch (NumberFormatException e)
+            {
+                _logger.warn("Invalid management port: " + managementPort + " will use default:" + defaultMPort, e);
+            }
+        }
     }
 
     protected void setupVirtualHosts(String configFileParent, String configFilePath)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Jul 31 08:53:37 2007
@@ -38,9 +38,13 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.List;
+import java.util.Map;
+
 public abstract class AbstractExchange implements Exchange, Managable
 {
     private AMQShortString _name;
@@ -188,6 +192,8 @@
             _exchangeMbean.unregister();
         }
     }
+
+    abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
 
     public String toString()
     {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Jul 31 08:53:37 2007
@@ -20,18 +20,20 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.protocol.ExchangeInitialiser;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.exception.InternalErrorException;
 
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class DefaultExchangeRegistry implements ExchangeRegistry
 {
     private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
@@ -64,7 +66,7 @@
     public void registerExchange(Exchange exchange) throws AMQException
     {
         _exchangeMap.put(exchange.getName(), exchange);
-        if(exchange.isDurable())
+        if (exchange.isDurable())
         {
             try
             {
@@ -86,13 +88,18 @@
         return _defaultExchange;
     }
 
+    public Collection<AMQShortString> getExchangeNames()
+    {
+        return _exchangeMap.keySet();
+    }
+
     public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
     {
         // TODO: check inUse argument
         Exchange e = _exchangeMap.remove(name);
         if (e != null)
         {
-            if(e.isDurable())
+            if (e.isDurable())
             {
                 try
                 {
@@ -112,7 +119,7 @@
 
     public Exchange getExchange(AMQShortString name)
     {
-        if((name == null) || name.length() == 0)
+        if ((name == null) || name.length() == 0)
         {
             return getDefaultExchange();
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Tue Jul 31 08:53:37 2007
@@ -26,22 +26,16 @@
 
 import javax.management.JMException;
 import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
@@ -221,5 +215,10 @@
     public boolean hasBindings() throws AMQException
     {
         return !_index.getBindingsMap().isEmpty();
+    }
+
+    public Map<AMQShortString, List<AMQQueue>> getBindings()
+    {
+        return _index.getBindingsMap();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue Jul 31 08:53:37 2007
@@ -21,11 +21,9 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
@@ -35,17 +33,11 @@
 
 import javax.management.JMException;
 import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,7 +51,7 @@
     private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
 
     private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
-        new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+            new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
     // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
     private static final String TOPIC_SEPARATOR = ".";
     private static final String AMQP_STAR = "*";
@@ -92,7 +84,7 @@
                     queueList.add(q.getName().toString());
                 }
 
-                Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
+                Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
                 _bindingList.put(bindingData);
             }
@@ -311,6 +303,11 @@
         }
     }
 
+    public Map<AMQShortString, List<AMQQueue>> getBindings()
+    {
+        return _routingKey2queues;
+    }
+
     private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
     {
         List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -358,8 +355,8 @@
                         if (queueList.size() > (depth + queueskip))
                         { // a hash and it is the last entry
                             matching =
-                                queueList.get(depth + queueskip).equals(AMQP_HASH)
-                                && (queueList.size() == (depth + queueskip + 1));
+                                    queueList.get(depth + queueskip).equals(AMQP_HASH)
+                                    && (queueList.size() == (depth + queueskip + 1));
                         }
                     }
                     else if (routingkeyList.size() > (depth + routingskip))

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Tue Jul 31 08:53:37 2007
@@ -27,9 +27,13 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.List;
+import java.util.Map;
+
 public interface Exchange
 {
     AMQShortString getName();
+
     AMQShortString getType();
 
     void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
@@ -51,6 +55,17 @@
 
     void route(AMQMessage message) throws AMQException;
 
+
+    /**
+     * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
+     * @param routingKey
+     * @param arguments
+     * @param queue
+     * @return
+     * @throws AMQException
+     */
+    boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue);
+
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key
      * @param routingKey
@@ -58,22 +73,25 @@
      * @return
      * @throws AMQException
      */
-    boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+    boolean isBound(AMQShortString routingKey, AMQQueue queue);
 
     /**
-     * Determines whether a message is routing to any queue using a specific routing key
+     * Determines whether a message is routing to any queue using a specific _routing key
      * @param routingKey
      * @return
      * @throws AMQException
      */
-    boolean isBound(AMQShortString routingKey) throws AMQException;
+    boolean isBound(AMQShortString routingKey);
 
-    boolean isBound(AMQQueue queue) throws AMQException;
+    boolean isBound(AMQQueue queue);
 
     /**
      * Returns true if this exchange has at least one binding associated with it.
      * @return
      * @throws AMQException
      */
-    boolean hasBindings() throws AMQException;
+    boolean hasBindings();
+
+    Map<AMQShortString, List<AMQQueue>> getBindings();
+
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Jul 31 08:53:37 2007
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 
+import java.util.Collection;
+
 
 public interface ExchangeRegistry extends MessageRouter
 {
@@ -42,6 +44,8 @@
     void setDefaultExchange(Exchange exchange);
 
     Exchange getDefaultExchange();
+
+    Collection<AMQShortString> getExchangeNames();
 
     void initialise() throws AMQException;
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Jul 31 08:53:37 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
@@ -34,17 +33,13 @@
 
 import javax.management.JMException;
 import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 public class FanoutExchange extends AbstractExchange
@@ -79,7 +74,7 @@
             {
                 String queueName = queue.getName().toString();
 
-                Object[] bindingItemValues = { queueName, new String[] { queueName } };
+                Object[] bindingItemValues = {queueName, new String[]{queueName}};
                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
                 _bindingList.put(bindingData);
             }
@@ -120,6 +115,11 @@
         }
     }
 
+    public Map<AMQShortString, List<AMQQueue>> getBindings()
+    {
+        return null;
+    }
+
     public AMQShortString getType()
     {
         return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
@@ -181,24 +181,29 @@
         }
     }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+    {
+        return isBound(routingKey, queue);
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         return _queues.contains(queue);
     }
 
-    public boolean isBound(AMQShortString routingKey) throws AMQException
+    public boolean isBound(AMQShortString routingKey)
     {
 
         return (_queues != null) && !_queues.isEmpty();
     }
 
-    public boolean isBound(AMQQueue queue) throws AMQException
+    public boolean isBound(AMQQueue queue)
     {
 
         return _queues.contains(queue);
     }
 
-    public boolean hasBindings() throws AMQException
+    public boolean hasBindings()
     {
         return !_queues.isEmpty();
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jul 31 08:53:37 2007
@@ -20,23 +20,6 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.management.JMException;
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -50,6 +33,23 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
+import javax.management.JMException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 /**
  * An exchange that binds queues based on a set of required headers and header values
  * and routes messages to these queues by matching the headers of the message against
@@ -91,13 +91,13 @@
     private final class HeadersExchangeMBean extends ExchangeMBean
     {
         @MBeanConstructor("Creates an MBean for AMQ Headers exchange")
-        public HeadersExchangeMBean()  throws JMException
+        public HeadersExchangeMBean() throws JMException
         {
             super();
             _exchangeType = "headers";
             init();
         }
-        
+
         /**
          * initialises the OpenType objects.
          */
@@ -113,7 +113,7 @@
             _bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings",
                                                  _bindingItemNames, _bindingItemNames, _bindingItemTypes);
             _bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(),
-                                                 _bindingDataType, _bindingItemIndexNames);
+                                                   _bindingDataType, _bindingItemIndexNames);
         }
 
         public TabularData bindings() throws OpenDataException
@@ -169,7 +169,7 @@
                 throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
             }
 
-            String[] bindings  = binding.split(",");
+            String[] bindings = binding.split(",");
             FieldTable bindingMap = new FieldTable();
             for (int i = 0; i < bindings.length; i++)
             {
@@ -241,17 +241,23 @@
         }
     }
 
-    public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+    {
+        //fixme isBound here should take the arguements in to consideration.
+        return isBound(routingKey, queue);
+    }
+
+    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         return isBound(queue);
     }
 
-    public boolean isBound(AMQShortString routingKey) throws AMQException
+    public boolean isBound(AMQShortString routingKey)
     {
         return hasBindings();
     }
 
-    public boolean isBound(AMQQueue queue) throws AMQException
+    public boolean isBound(AMQQueue queue)
     {
         for (Registration r : _bindings)
         {
@@ -263,7 +269,7 @@
         return false;
     }
 
-    public boolean hasBindings() throws AMQException
+    public boolean hasBindings()
     {
         return !_bindings.isEmpty();
     }
@@ -286,6 +292,11 @@
             _logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
             throw new AMQException(null, "Exception occured in creating the HeadersExchangeMBean", ex);
         }
+    }
+
+    public Map<AMQShortString, List<AMQQueue>> getBindings()
+    {
+        return null;
     }
 
     private static class Registration

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Jul 31 08:53:37 2007
@@ -100,6 +100,12 @@
             }
             else
             {
+
+                if (body.consumerTag != null)
+                {
+                    body.consumerTag = body.consumerTag.intern();
+                }
+
                 try
                 {
                     AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
@@ -138,15 +144,15 @@
                     // If the above doesn't work then perhaps this is wrong too.
 //                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
 //                                                      "Non-unique consumer tag, '" + body.consumerTag + "'");
-                                        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
                     // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                     // Be aware of possible changes to parameter order as versions change.
                     session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
-                        (byte)8, (byte)0,	// AMQP version (major, minor)
-                        BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
-                        BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
-                        AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
-                        msg));	// replyText
+                                                                          (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                          BasicConsumeBody.getClazz((byte) 8, (byte) 0),    // classId
+                                                                          BasicConsumeBody.getMethod((byte) 8, (byte) 0),    // methodId
+                                                                          AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
+                                                                          msg));    // replyText
                 }
                 catch (ExistingExclusiveSubscriptionException e)
                 {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Jul 31 08:53:37 2007
@@ -67,6 +67,10 @@
             body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
 
         }
+        else
+        {
+            body.exchange = body.exchange.intern();
+        }
         VirtualHost vHost = session.getVirtualHost();
         Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
         // if the exchange does not exist we raise a channel exception
@@ -86,10 +90,16 @@
                 throw body.getChannelNotFoundException(evt.getChannelId());
             }
 
+            if(body.routingKey != null)
+            {
+                body.routingKey = body.routingKey.intern();
+            }
+            
             MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
             channel.setPublishFrame(info, session);
         }
     }
 }
+
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Jul 31 08:53:37 2007
@@ -35,6 +35,7 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -72,7 +73,7 @@
 
         SaslServer ss = null;
         try
-        {
+        {                       
             ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
 
             if (ss == null)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Tue Jul 31 08:53:37 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.handler;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Jul 31 08:53:37 2007
@@ -83,7 +83,9 @@
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+                    exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
+                                                              body.type == null ? null : body.type.intern(), 
+                                                              body.durable,
                                                               body.passive, body.ticket);
                     exchangeRegistry.registerExchange(exchange);
                     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jul 31 08:53:37 2007
@@ -28,6 +28,7 @@
 import org.apache.qpid.framing.QueueBindOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,7 +37,6 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
 
 public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
 {
@@ -77,7 +77,7 @@
             {
                 throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
             }
-            
+
             if (body.routingKey == null)
             {
                 body.routingKey = queue.getName();
@@ -97,9 +97,18 @@
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
         }
+
+        if (body.routingKey != null)
+        {
+            body.routingKey = body.routingKey.intern();
+        }
+
         try
-        {            
-            queue.bind(body.routingKey, body.arguments, exch);
+        {
+            if (!exch.isBound(body.routingKey, body.arguments, queue))
+            {
+                queue.bind(body.routingKey, body.arguments, exch);
+            }
         }
         catch (AMQInvalidRoutingKeyException rke)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Jul 31 08:53:37 2007
@@ -93,8 +93,15 @@
         synchronized (queueRegistry)
         {
 
+
+
             if (((queue = queueRegistry.getQueue(body.queue)) == null))
             {
+                if(body.queue != null)
+                {
+                    body.queue = body.queue.intern();
+                }
+
                 if (body.passive)
                 {
                     String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Tue Jul 31 08:53:37 2007
@@ -20,14 +20,14 @@
  */
 package org.apache.qpid.server.management;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
 
 import javax.management.JMException;
 import javax.management.MBeanServer;
@@ -43,17 +43,14 @@
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AccountNotFoundException;
 import javax.security.sasl.AuthorizeCallback;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
-import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * This class starts up an MBeanserver. If out of the box agent is being used then there are no security features
@@ -68,6 +65,9 @@
     private final MBeanServer _mbeanServer;
     private Registry _rmiRegistry;
     private JMXServiceURL _jmxURL;
+    
+    public static final String MANAGEMENT_PORT_CONFIG_PATH = "management.jmxport";
+    public static final int MANAGEMENT_PORT_DEFAULT = 8999;
 
     public JMXManagedObjectRegistry() throws AMQException
     {
@@ -95,7 +95,7 @@
         IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
 
         boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", false);
-        int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999);
+        int port = appRegistry.getConfiguration().getInt(MANAGEMENT_PORT_CONFIG_PATH, MANAGEMENT_PORT_DEFAULT);
 
         if (security)
         {
@@ -144,13 +144,13 @@
             MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
             cs.setMBeanServerForwarder(mbsf);
             cs.start();
-            _log.warn("JMX: Started JMXConnector server with SASL");
+            _log.warn("JMX: Started JMXConnector server  on port '" + port + "' with SASL");
 
         }
         else
         {
             startJMXConnectorServer(port);
-            _log.warn("JMX: Started JMXConnector server with security disabled");
+            _log.warn("JMX: Started JMXConnector server on port '" + port + "' with security disabled");
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Tue Jul 31 08:53:37 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.management;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Jul 31 08:53:37 2007
@@ -44,6 +44,9 @@
 import org.apache.qpid.server.transport.ConnectorConfiguration;
 import org.apache.qpid.ssl.SSLContextFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 /**
  * The protocol handler handles "protocol events" for all connections. The state
  * associated with an individual connection is accessed through the protocol session.
@@ -80,7 +83,7 @@
         final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
 
         createSession(protocolSession, _applicationRegistry, codecFactory);
-        _logger.info("Protocol session created");
+        _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
 
         final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
 
@@ -127,12 +130,12 @@
 
     public void sessionOpened(IoSession protocolSession) throws Exception
     {
-        _logger.info("Session opened");
+        _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
     }
 
     public void sessionClosed(IoSession protocolSession) throws Exception
     {
-        _logger.info("Protocol Session closed");
+        _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
         // fixme  -- this can be null
         if (amqProtocolSession != null)
@@ -143,7 +146,7 @@
 
     public void sessionIdle(IoSession session, IdleStatus status) throws Exception
     {
-        _logger.debug("Protocol Session [" + this + "] idle: " + status);
+        _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
         if (IdleStatus.WRITER_IDLE.equals(status))
         {
             // write heartbeat frame:
@@ -167,7 +170,7 @@
 
             protocolSession.close();
 
-            _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
+            _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
         }
         else if (throwable instanceof IOException)
         {
@@ -178,13 +181,14 @@
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
 
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(),
-                    session.getProtocolMinorVersion(), // AMQP version (major, minor)
-                    0, // classId
-                    0, // methodId
-                    200, // replyCode
-                    new AMQShortString(throwable.getMessage()) // replyText
-                ));
+            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+                                                                     session.getProtocolMajorVersion(),
+                                                                     session.getProtocolMinorVersion(),    // AMQP version (major, minor)
+                                                                     0,    // classId
+                                                                     0,    // methodId
+                                                                     200,    // replyCode
+                                                                     new AMQShortString(throwable.getMessage())    // replyText
+            ));
             protocolSession.close();
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jul 31 08:53:37 2007
@@ -53,6 +53,7 @@
  */
 public class AMQMessage implements StorableMessage
 {
+    /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
     // The ordered list of queues into which this message is enqueued.
@@ -76,19 +77,16 @@
 
     private AMQMessageHandle _messageHandle;
 
-    // TODO: ideally this should be able to go into the transient message date - check this! (RG)
+    /** Holds the transactional context in which this message is being processed. */
     private TransactionalContext _txnContext;
 
     /**
-     * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
-     * messages published with the 'immediate' flag.
+     * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+     * for messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
-    /**
-     * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
-     * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
-     * removed from the store.
-     */
+
+    /** Flag to indicate that this message requires 'immediate' delivery. */
     private boolean _immediate;
 
     // private Subscription _takenBySubcription;
@@ -494,7 +492,7 @@
      */
     public AMQMessage takeReference()
     {
-        _referenceCount.incrementAndGet();
+        incrementReference(); // _referenceCount.incrementAndGet();
 
         return this;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jul 31 08:53:37 2007
@@ -33,7 +33,6 @@
 import javax.management.JMException;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
@@ -46,11 +45,11 @@
 import org.apache.qpid.server.messageStore.StorableMessage;
 import org.apache.qpid.server.messageStore.StorableQueue;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import javax.management.JMException;
-
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -163,22 +162,22 @@
     }
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
-        throws AMQException
+            throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
-            new SubscriptionSet(), new SubscriptionImpl.Factory());
+             new SubscriptionSet(), new SubscriptionImpl.Factory());
     }
 
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
-        VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
+                       VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
-            new SubscriptionImpl.Factory());
+             new SubscriptionImpl.Factory());
     }
 
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
-        VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
-        SubscriptionFactory subscriptionFactory) throws AMQException
+                       VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
+                       SubscriptionFactory subscriptionFactory) throws AMQException
     {
         if (name == null)
         {
@@ -260,7 +259,7 @@
     }
 
     /**
-     * Returns messages within the given range of message Ids
+     * Returns messages within the given range of message Ids.
      *
      * @param fromMessageId
      * @param toMessageId
@@ -292,38 +291,220 @@
     }
 
     /**
-     * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
-     * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
-     * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
-     * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
-     * locks from both queues and start async delivery
+     * Moves messages from this queue to another queue, and also commits the move on the message store. Delivery activity
+     * on the queues being moved between is suspended during the move.
      *
-     * @param fromMessageId
-     * @param toMessageId
-     * @param queueName
-     * @param storeContext
+     * @param fromMessageId The first message id to move.
+     * @param toMessageId   The last message id to move.
+     * @param queueName     The queue to move the messages to.
+     * @param storeContext  The context of the message store under which to perform the move. This is associated with
+     *                      the stores transactional context.
      */
     public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-        StoreContext storeContext)
+                                                        StoreContext storeContext)
+    {
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+        if (toStore != fromStore)
+        {
+            throw new RuntimeException("Can only move messages between queues on the same message store.");
+        }
+
+        try
+        {
+            // Obtain locks to prevent activity on the queues being moved between.
+            startMovingMessages();
+            toQueue.startMovingMessages();
+
+            // Get the list of messages to move.
+            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Move the messages in on the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+                    toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+                }
+
+                // Commit and flush the move transcations.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
+
+                // Move the messages on the in-memory queues.
+                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+                _deliveryMgr.removeMovedMessages(foundMessagesList);
+            }
+            // Abort the move transactions on move failures.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
+        }
+        // Release locks to allow activity on the queues being moved between to continue.
+        finally
+        {
+            toQueue.stopMovingMessages();
+            stopMovingMessages();
+        }
+    }
+
+    /**
+     * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
+     * on the queues being moved between is suspended during the move.
+     *
+     * @param fromMessageId The first message id to move.
+     * @param toMessageId   The last message id to move.
+     * @param queueName     The queue to move the messages to.
+     * @param storeContext  The context of the message store under which to perform the move. This is associated with
+     *                      the stores transactional context.
+     */
+    public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+                                                        StoreContext storeContext)
+    {
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+        if (toStore != fromStore)
+        {
+            throw new RuntimeException("Can only move messages between queues on the same message store.");
+        }
+
+        try
+        {
+            // Obtain locks to prevent activity on the queues being moved between.
+            startMovingMessages();
+            toQueue.startMovingMessages();
+
+            // Get the list of messages to move.
+            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Move the messages in on the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+                    message.takeReference();
+                }
+
+                // Commit and flush the move transcations.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
+
+                // Move the messages on the in-memory queues.
+                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+            }
+            // Abort the move transactions on move failures.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
+        }
+        // Release locks to allow activity on the queues being moved between to continue.
+        finally
+        {
+            toQueue.stopMovingMessages();
+            stopMovingMessages();
+        }
+    }
+
+    /**
+     * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
+     * on the queues being moved between is suspended during the remove.
+     *
+     * @param fromMessageId The first message id to move.
+     * @param toMessageId   The last message id to move.
+     * @param storeContext  The context of the message store under which to perform the move. This is associated with
+     *                      the stores transactional context.
+     */
+    public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
     {
-        // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
-        AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+
         try
         {
+            // Obtain locks to prevent activity on the queues being moved between.
             startMovingMessages();
+
+            // Get the list of messages to move.
             List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
-            // move messages to another queue
-            anotherQueue.startMovingMessages();
-            anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // remove the messages in on the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+                }
+
+                // Commit and flush the move transcations.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
 
-            // moving is successful, now remove from original queue
-            _deliveryMgr.removeMovedMessages(foundMessagesList);
+                // remove the messages on the in-memory queues.
+                _deliveryMgr.removeMovedMessages(foundMessagesList);
+            }
+            // Abort the move transactions on move failures.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
         }
+        // Release locks to allow activity on the queues being moved between to continue.
         finally
         {
-            // remove the lock and start the async delivery
-            anotherQueue.stopMovingMessages();
             stopMovingMessages();
         }
     }
@@ -458,7 +639,7 @@
     }
 
     public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
-        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+                                        FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
     {
         if (incrementSubscriberCount() > 1)
         {
@@ -481,13 +662,12 @@
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug(MessageFormat.format(
-                    "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel,
-                    consumerTag, this));
+            _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and "
+                                               + "consumer tag {2} with {3}", ps, channel, consumerTag, this));
         }
 
         Subscription subscription =
-            _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+                _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
 
         if (subscription.filtersMessages())
         {
@@ -525,8 +705,8 @@
         if (_logger.isDebugEnabled())
         {
             _logger.debug(MessageFormat.format(
-                    "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel,
-                    consumerTag, this));
+                    "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
+                    ps, channel, consumerTag, this));
         }
 
         Subscription removedSubscription;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Jul 31 08:53:37 2007
@@ -18,30 +18,23 @@
  * under the License.
  *
  */
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
 package org.apache.qpid.server.queue;
 
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -60,30 +53,25 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
 
 /**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
+ * AMQQueueMBean is the management bean for an {@link AMQQueue}.
+ *
+ * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
  */
 @MBeanDescription("Management Interface for AMQQueue")
 public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
 {
+    /** Used for debugging purposes. */
     private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
     private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
 
     /**

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Jul 31 08:53:37 2007
@@ -20,19 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
@@ -42,8 +29,21 @@
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.util.MessageQueue;
 import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 /** Manages delivery of messages on behalf of a queue */
@@ -87,6 +87,10 @@
     private final Object _queueHeadLock = new Object();
     private String _processingThreadName = "";
 
+
+    /** Used by any reaping thread to purge messages */
+    private StoreContext _reapingStoreContext = new StoreContext();
+
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
 
@@ -453,12 +457,31 @@
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
         while (purgeMessage(message, sub))
         {
+            // if we are purging then ensure we mark this message taken for the current subscriber
+            // the current subscriber may be null in the case of a get or a purge but this is ok.
+//            boolean alreadyTaken = message.taken(_queue, sub);
+
             //remove the already taken message or expired
             AMQMessage removed = messages.poll();
 
             assert removed == message;
 
-            _totalMessageSize.addAndGet(-message.getSize());
+            // if the message expired then the _totalMessageSize needs adjusting
+            if (message.expired(_queue))
+            {
+                _totalMessageSize.addAndGet(-message.getSize());
+
+                // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
+                message.dequeue(_reapingStoreContext, _queue);
+
+                if (_log.isInfoEnabled())
+                {
+                    _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
+                }
+            }
+
+            //else the clean up is not required as the message has already been taken for this queue therefore
+            // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
 
             if (_log.isTraceEnabled())
             {
@@ -473,7 +496,10 @@
     }
 
     /**
-     * 
+     *  This method will return true if the message is to be purged from the queue.
+     *
+     *
+     *  SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
      * @param message
      * @param sub
      * @return
@@ -493,15 +519,15 @@
         // if the message is null then don't purge as we have no messagse.
         if (message != null)
         {
+            // Check that the message hasn't expired.
+            if (message.expired(_queue))
+            {
+                return true;
+            }
+
             // if we have a subscriber perform message checks
             if (sub != null)
             {
-                // Check that the message hasn't expired.
-                if (message.expired(sub.getChannel().getStoreContext(), _queue))
-                {
-                    return true;
-                }
-
                 // if we have a queue browser(we don't purge) so check mark the message as taken
                 purge = ((!sub.isBrowser() || message.isTaken(_queue)));
             }
@@ -606,7 +632,10 @@
             {
                 if (_log.isInfoEnabled())
                 {
-                    _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+                    //fixme - we should do the clean up as the message remains on the _message queue
+                    // this is resulting in the next consumer receiving the message and then attempting to purge it
+                    //
+                    _log.info(debugIdentity() + "We should do clean up of the main _message queue here");
                 }
             }
 
@@ -617,7 +646,14 @@
         }
         catch (AMQException e)
         {
-            message.release(_queue);
+            if (message != null)
+            {
+                message.release(_queue);
+            }
+            else
+            {
+                _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e);
+            }
             _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
         }
     }
@@ -696,25 +732,6 @@
 
     }
 
-//    private void sendNextMessage(Subscription sub)
-//    {
-//        if (sub.filtersMessages())
-//        {
-//            sendNextMessage(sub, sub.getPreDeliveryQueue());
-//            if (sub.isAutoClose())
-//            {
-//                if (sub.getPreDeliveryQueue().isEmpty())
-//                {
-//                    sub.close();
-//                }
-//            }
-//        }
-//        else
-//        {
-//            sendNextMessage(sub, _messages);
-//        }
-//    }
-
     public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
 
@@ -723,8 +740,6 @@
         {
             _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
         }
-        // This shouldn't be done here.
-//        msg.release();
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
@@ -800,7 +815,7 @@
                         if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
-                                      "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+                                       "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
                         }
                     }
                 }
@@ -810,7 +825,7 @@
                     if (debugEnabled)
                     {
                         _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
-                                  " Subscriber:" + System.identityHashCode(s));
+                                   " Subscriber:" + System.identityHashCode(s));
                     }
 
                     deliver(context, name, msg, deliverFirst);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Tue Jul 31 08:53:37 2007
@@ -20,13 +20,14 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 public class DefaultQueueRegistry implements QueueRegistry
 {
     private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
@@ -56,5 +57,15 @@
     public AMQQueue getQueue(AMQShortString name)
     {
         return _queueMap.get(name);
+    }
+
+    public Collection<AMQShortString> getQueueNames()
+    {
+        return _queueMap.keySet();
+    }
+
+    public Collection<AMQQueue> getQueues()
+    {
+        return _queueMap.values();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Tue Jul 31 08:53:37 2007
@@ -46,7 +46,7 @@
 
         ExchangeBinding(AMQShortString routingKey, Exchange exchange)
         {
-            this(routingKey, exchange,EMPTY_ARGUMENTS);
+            this(routingKey, exchange, EMPTY_ARGUMENTS);
         }
 
         ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
@@ -80,7 +80,10 @@
 
         public boolean equals(Object o)
         {
-            if (!(o instanceof ExchangeBinding)) return false;
+            if (!(o instanceof ExchangeBinding))
+            {
+                return false;
+            }
             ExchangeBinding eb = (ExchangeBinding) o;
             return _exchange.equals(eb._exchange)
                    && _routingKey.equals(eb._routingKey)
@@ -104,16 +107,16 @@
      */
     void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
     {
-        _bindings.add(new ExchangeBinding(routingKey, exchange, arguments ));
+        _bindings.add(new ExchangeBinding(routingKey, exchange, arguments));
     }
 
 
     public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
     {
-        _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments ));
+        _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
     }
 
-    
+
     /**
      * Deregisters this queue from any exchange it has been bound to
      */

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Tue Jul 31 08:53:37 2007
@@ -1,18 +1,22 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
  */
 package org.apache.qpid.server.queue;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?view=diff&rev=561365&r1=561364&r2=561365
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Tue Jul 31 08:53:37 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.queue;