You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC

svn commit: r619823 [9/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Fri Feb  8 02:09:37 2008
@@ -30,6 +30,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
@@ -64,14 +65,13 @@
 
     private static class DeliveryDetails
     {
-        public AMQMessage message;
-        public AMQQueue queue;
+        public QueueEntry entry;
+
         private boolean deliverFirst;
 
-        public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+        public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
         {
-            this.message = message;
-            this.queue = queue;
+            this.entry = entry;
             this.deliverFirst = deliverFirst;
         }
     }
@@ -103,7 +103,7 @@
         _postCommitDeliveryList.clear();
     }
 
-    public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+    public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
     {
         // A publication will result in the enlisting of several
         // TxnOps. The first is an op that will store the message.
@@ -112,9 +112,9 @@
         // enqueued. Finally a cleanup op will be added to decrement
         // the reference associated with the routing.
         // message.incrementReference();
-        _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
+        _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
         _messageDelivered = true;
-        _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+        _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
         /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
         if (_log.isDebugEnabled())
         {
@@ -242,11 +242,11 @@
         {
             for (DeliveryDetails dd : _postCommitDeliveryList)
             {
-                dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+                dd.entry.process(_storeContext, dd.deliverFirst);
 
                 try
                 {
-                    dd.message.checkDeliveredToConsumer();
+                    dd.entry.checkDeliveredToConsumer();
                 }
                 catch (NoConsumersException nce)
                 {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Fri Feb  8 02:09:37 2008
@@ -34,6 +34,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
@@ -92,20 +93,14 @@
         // Does not apply to this context
     }
 
-    public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+    public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
     {
         try
         {
-            //DTX removed  - deliverFirst is to do with the position on the Queue not enqueuing!!
-            // This should be done in routingComplete
-//            if( ! deliverFirst )
-//            {
-//                message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
-//            }
-            queue.process(_storeContext, message, deliverFirst);
+            entry.process(_storeContext, deliverFirst);
             //following check implements the functionality
             //required by the 'immediate' flag:
-            message.checkDeliveredToConsumer();
+            entry.checkDeliveredToConsumer();
         }
         catch (NoConsumersException e)
         {
@@ -134,7 +129,7 @@
                         {
                             if (_log.isDebugEnabled())
                             {
-                                _log.debug("Discarding message: " + message.message.getMessageId());
+                                _log.debug("Discarding message: " + message.getMessage().getMessageId());
                             }
 
                             //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -168,7 +163,7 @@
                     {
                         if (_log.isDebugEnabled())
                         {
-                            _log.debug("Discarding message: " + msg.message.getMessageId());
+                            _log.debug("Discarding message: " + msg.getMessage().getMessageId());
                         }
 
                         //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -198,7 +193,7 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug("Discarding message: " + msg.message.getMessageId());
+                    _log.debug("Discarding message: " + msg.getMessage().getMessageId());
                 }
 
                 //Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -212,7 +207,7 @@
             if (_log.isDebugEnabled())
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
-                           msg.message.getMessageId());
+                           msg.getMessage().getMessageId());
             }
         }
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Fri Feb  8 02:09:37 2008
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 
 /**
@@ -111,14 +112,13 @@
      *
      * <p/>This is an 'enqueue' operation.
      *
-     * @param message      The message to deliver.
-     * @param queue        The queue to deliver the message to.
+     * @param entry        The message to deliver, and the queue to deliver to.
      * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
      *                     for normal FIFO message ordering.
      *
      * @throws AMQException If the message cannot be delivered for any reason.
      */
-    void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+    void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
 
     /**
      * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Fri Feb  8 02:09:37 2008
@@ -29,6 +29,7 @@
 import org.apache.commons.configuration.MapConfiguration;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
@@ -51,6 +52,8 @@
 
     private PrincipalDatabaseManager _databaseManager;
 
+    private PluginManager _pluginManager;
+
 
     public NullApplicationRegistry()
     {
@@ -81,7 +84,7 @@
         VirtualHost dummyHost = new VirtualHost("test", getConfiguration());
         _virtualHostRegistry.registerVirtualHost(dummyHost);
         _virtualHostRegistry.setDefaultVirtualHostName("test");
-
+        _pluginManager = new PluginManager("");
         _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
 
     }
@@ -121,6 +124,11 @@
     public AccessManager getAccessManager()
     {
         return _accessManager;
+    }
+
+    public PluginManager getPluginManager()
+    {
+        return _pluginManager;
     }
 }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Feb  8 02:09:37 2008
@@ -1,289 +1,326 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
-    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
-    private final String _name;
-
-    private QueueRegistry _queueRegistry;
-
-    private ExchangeRegistry _exchangeRegistry;
-
-    private ExchangeFactory _exchangeFactory;
-
-    private MessageStore _messageStore;
-
-    private TransactionManager _transactionManager;
-
-    protected VirtualHostMBean _virtualHostMBean;
-
-    private AMQBrokerManagerMBean _brokerMBean;
-
-    private AuthenticationManager _authenticationManager;
-
-    private AccessManager _accessManager;
-
-
-    public void setAccessableName(String name)
-    {
-        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
-                     + name + ") ignored remains :" + getAccessableName());
-    }
-
-    public String getAccessableName()
-    {
-        return _name;
-    }
-
-
-    /**
-     * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
-     * implementaion of an Exchange MBean should extend this class.
-     */
-    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
-    {
-        public VirtualHostMBean() throws NotCompliantMBeanException
-        {
-            super(ManagedVirtualHost.class, "VirtualHost");
-        }
-
-        public String getObjectInstanceName()
-        {
-            return _name.toString();
-        }
-
-        public String getName()
-        {
-            return _name.toString();
-        }
-
-        public VirtualHost getVirtualHost()
-        {
-            return VirtualHost.this;
-        }
-
-
-    } // End of MBean class
-
-    /**
-     * Used for testing only
-     * @param name
-     * @param store
-     *
-     * @throws Exception
-     */
-    public VirtualHost(String name, MessageStore store) throws Exception
-    {
-        this(name, null, store);
-    }
-
-    /**
-     * Normal Constructor
-     *
-     * @param name
-     * @param hostConfig
-     *
-     * @throws Exception
-     */
-    public VirtualHost(String name, Configuration hostConfig) throws Exception
-    {
-        this(name, hostConfig, null);
-    }
-
-    private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
-    {
-        _name = name;
-
-        _virtualHostMBean = new VirtualHostMBean();
-        // This isn't needed to be registered
-        //_virtualHostMBean.register();
-
-        _queueRegistry = new DefaultQueueRegistry(this);
-        _exchangeFactory = new DefaultExchangeFactory(this);
-        _exchangeFactory.initialise(hostConfig);
-        _exchangeRegistry = new DefaultExchangeRegistry(this);
-
-        if (store != null)
-        {
-            _messageStore = store;
-        }
-        else
-        {
-            if (hostConfig == null)
-            {
-                throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
-            }
-            initialiseTransactionManager(hostConfig);
-            initialiseMessageStore(hostConfig);
-        }
-
-        _exchangeRegistry.initialise();
-
-        _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
-        _accessManager = new AccessManagerImpl(name, hostConfig);
-
-        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
-        _brokerMBean.register();
-    }
-
-    private void initialiseMessageStore(Configuration config) throws Exception
-    {
-        String messageStoreClass = config.getString("store.class");
-
-        Class clazz = Class.forName(messageStoreClass);
-        Object o = clazz.newInstance();
-
-        if (!(o instanceof MessageStore))
-        {
-            throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
-                                         " does not.");
-        }
-        _messageStore = (MessageStore) o;
-          //DTX MessageStore
-//        _messageStore.configure(this, _transactionManager, "store", config);
-        _messageStore.configure(this, "store", config);
-    }
-
-    private void initialiseTransactionManager(Configuration config) throws Exception
-    {
-        String transactionManagerClass = config.getString("txn.class");
-        Class clazz = Class.forName(transactionManagerClass);
-        Object o = clazz.newInstance();
-
-        if (!(o instanceof TransactionManager))
-        {
-            throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
-                                         " does not.");
-        }
-        _transactionManager = (TransactionManager) o;
-    }
-
-
-    public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
-    {
-        T instance;
-        try
-        {
-            instance = instanceType.newInstance();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
-            throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
-        }
-        Configurator.configure(instance);
-
-        return instance;
-    }
-
-
-    public String getName()
-    {
-        return _name;
-    }
-
-    public QueueRegistry getQueueRegistry()
-    {
-        return _queueRegistry;
-    }
-
-    public ExchangeRegistry getExchangeRegistry()
-    {
-        return _exchangeRegistry;
-    }
-
-    public ExchangeFactory getExchangeFactory()
-    {
-        return _exchangeFactory;
-    }
-
-    public ApplicationRegistry getApplicationRegistry()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public MessageStore getMessageStore()
-    {
-        return _messageStore;
-    }
-
-    public TransactionManager getTransactionManager()
-    {
-        return _transactionManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
-    public AccessManager getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public void close() throws Exception
-    {
-        if (_messageStore != null)
-        {
-            _messageStore.close();
-        }
-    }
-
-    public ManagedObject getBrokerMBean()
-    {
-        return _brokerMBean;
-    }
-
-    public ManagedObject getManagedObject()
-    {
-        return _virtualHostMBean;
-    }
-}
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.AMQException;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class VirtualHost implements Accessable
+{
+    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+    private final String _name;
+
+    private QueueRegistry _queueRegistry;
+
+    private ExchangeRegistry _exchangeRegistry;
+
+    private ExchangeFactory _exchangeFactory;
+
+    private MessageStore _messageStore;
+
+    private TransactionManager _transactionManager;
+
+    protected VirtualHostMBean _virtualHostMBean;
+
+    private AMQBrokerManagerMBean _brokerMBean;
+
+    private AuthenticationManager _authenticationManager;
+
+    private AccessManager _accessManager;
+
+    private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
+     
+    private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+    
+    public void setAccessableName(String name)
+    {
+        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+                     + name + ") ignored remains :" + getAccessableName());
+    }
+
+    public String getAccessableName()
+    {
+        return _name;
+    }
+
+
+    /**
+     * Abstract MBean class. This has some of the methods implemented from management interface for exchanges. Any
+     * implementation of an Exchange MBean should extend this class.
+     */
+    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+    {
+        public VirtualHostMBean() throws NotCompliantMBeanException
+        {
+            super(ManagedVirtualHost.class, "VirtualHost");
+        }
+
+        public String getObjectInstanceName()
+        {
+            return _name.toString();
+        }
+
+        public String getName()
+        {
+            return _name.toString();
+        }
+
+        public VirtualHost getVirtualHost()
+        {
+            return VirtualHost.this;
+        }
+
+
+    } // End of MBean class
+
+    /**
+     * Used for testing only
+     * @param name
+     * @param store
+     * @throws Exception
+     */
+    public VirtualHost(String name, MessageStore store) throws Exception
+    {
+        this(name, null, store);
+    }
+
+    /**
+     * Normal Constructor
+     * @param name
+     * @param hostConfig
+     * @throws Exception
+     */
+    public VirtualHost(String name, Configuration hostConfig) throws Exception
+    {
+        this(name, hostConfig, null);
+    }
+
+    public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
+    {
+        _name = name;
+
+        _virtualHostMBean = new VirtualHostMBean();
+        // This does not need to be registered
+        //_virtualHostMBean.register();
+
+        _queueRegistry = new DefaultQueueRegistry(this);
+        _exchangeFactory = new DefaultExchangeFactory(this);
+        _exchangeFactory.initialise(hostConfig);
+        _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+        if (store != null)
+        {
+            _messageStore = store;
+        }
+        else
+        {
+            if (hostConfig == null)
+            {
+                throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+            }
+	    initialiseTransactionManager(hostConfig);
+	    initialiseMessageStore(hostConfig);
+        }
+
+        _exchangeRegistry.initialise();
+
+        _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+        _accessManager = new AccessManagerImpl(name, hostConfig);
+
+        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+        _brokerMBean.register();
+        initialiseHouseKeeping(hostConfig);
+    }
+
+    
+    /**
+     * Schedules the periodic task that removes expired messages from queues with no subscribers.
+     *
+     * @param hostConfig source of "housekeeping.expiredMessageCheckPeriod" (0 disables the task); may be null, as in
+     *                   the testing constructor, in which case DEFAULT_HOUSEKEEPING_PERIOD is used.
+     */
+    private void initialiseHouseKeeping(final Configuration hostConfig)
+    {
+        long period = (hostConfig == null) ? DEFAULT_HOUSEKEEPING_PERIOD
+                : hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+        if (period != 0L)
+        {
+            class RemoveExpiredMessagesTask extends TimerTask
+            {
+                public void run()
+                {
+                    for (AMQQueue q : _queueRegistry.getQueues())
+                    {
+                        try
+                        {
+                            q.removeExpiredIfNoSubscribers();
+                        }
+                        catch (AMQException e)
+                        {
+                            // Log and carry on: an exception escaping run() would kill the Timer thread for all queues.
+                            _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
+                        }
+                    }
+                }
+            }
+            _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), period / 2, period);
+        }
+    }
+    
+    private void initialiseMessageStore(Configuration config) throws Exception
+    {
+        String messageStoreClass = config.getString("store.class");
+
+        Class clazz = Class.forName(messageStoreClass);
+        Object o = clazz.newInstance();
+
+        if (!(o instanceof MessageStore))
+        {
+            throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
+                                         " does not.");
+        }
+        _messageStore = (MessageStore) o;
+        _messageStore.configure(this, "store", config);
+    }
+
+    private void initialiseTransactionManager(Configuration config) throws Exception
+    {
+        String transactionManagerClass = config.getString("txn.class");
+        Class clazz = Class.forName(transactionManagerClass);
+        Object o = clazz.newInstance();
+
+        if (!(o instanceof TransactionManager))
+        {
+            throw new ClassCastException("Transaction Manager class must implement " + TransactionManager.class + ". Class " + clazz +
+                                         " does not.");
+        }
+        _transactionManager = (TransactionManager) o;
+    }
+    
+    public <T> T getConfiguredObject(Class<T> instanceType, Configuration config)
+    {
+        T instance;
+        try
+        {
+            instance = instanceType.newInstance();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+            throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
+        }
+        Configurator.configure(instance);
+
+        return instance;
+    }
+
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+
+    public ExchangeFactory getExchangeFactory()
+    {
+        return _exchangeFactory;
+    }
+
+    public ApplicationRegistry getApplicationRegistry()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _messageStore;
+    }
+
+    public TransactionManager getTransactionManager()
+    {
+        return _transactionManager;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public AccessManager getAccessManager()
+    {
+        return _accessManager;
+    }
+
+    public void close() throws Exception
+    {
+        if (_messageStore != null)
+        {
+            _messageStore.close();
+        }
+    }
+
+    public ManagedObject getBrokerMBean()
+    {
+        return _brokerMBean;
+    }
+
+    public ManagedObject getManagedObject()
+    {
+        return _virtualHostMBean;
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Fri Feb  8 02:09:37 2008
@@ -24,6 +24,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
 
@@ -85,7 +86,7 @@
     }
 
 
-    protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+    protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
                                            boolean showMessageHeaders)
     {
 
@@ -96,8 +97,9 @@
         display.add(hex);
         display.add(ascii);
 
-        for (AMQMessage msg : messages)
+        for (QueueEntry entry : messages)
         {
+            AMQMessage msg = entry.getMessage();
             if (!includeMsg(msg, msgids))
             {
                 continue;
@@ -252,8 +254,8 @@
     private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
                                     String title, boolean routing, boolean headers, boolean messageHeaders)
     {
-        List<AMQMessage> single = new LinkedList<AMQMessage>();
-        single.add(msg);
+        List<QueueEntry> single = new LinkedList<QueueEntry>();
+        single.add(new QueueEntry(null,msg));
 
         List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Fri Feb  8 02:09:37 2008
@@ -23,6 +23,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 
@@ -166,12 +167,12 @@
 
         if (fromQueue != null)
         {
-            List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue();
+            List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
             if (messages != null)
             {
-                for (AMQMessage msg : messages)
+                for (QueueEntry msg : messages)
                 {
-                    ids.add(msg.getMessageId());
+                    ids.add(msg.getMessage().getMessageId());
                 }
             }
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Fri Feb  8 02:09:37 2008
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
 
@@ -113,7 +114,7 @@
 
         if (_queue != null)
         {
-            List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+            List<QueueEntry> messages = _queue.getMessagesOnTheQueue();
             if (messages == null || messages.size() == 0)
             {
                 _console.println("No messages on queue");
@@ -152,7 +153,7 @@
      * @param showMessageHeaders show the msg headers be shown
      * @return the formated data lists for printing
      */
-    protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+    protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
                                            boolean showMessageHeaders)
     {
 
@@ -333,8 +334,9 @@
         }
 
         //Add create the table of data
-        for (AMQMessage msg : messages)
+        for (QueueEntry entry : messages)
         {
+            AMQMessage msg = entry.getMessage();
             if (!includeMsg(msg, msgids))
             {
                 continue;

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java Fri Feb  8 02:09:37 2008
@@ -43,7 +43,7 @@
         }
         catch (Exception e)
         {
-            System.out.println("Unable to start broker due to: " + e.getMessage());
+            System.err.println("Unable to start broker due to: " + e.getMessage());
 
             e.printStackTrace();
             exit(1);
@@ -55,7 +55,7 @@
         try
         {
             Process task = Runtime.getRuntime().exec(args[0]);
-            System.out.println("Started Proccess: " + args[0]);
+            System.err.println("Started Proccess: " + args[0]);
 
             InputStream inputStream = task.getInputStream();
 
@@ -70,19 +70,21 @@
             out.join();
             err.join();
 
-            System.out.println("Waiting for process to exit: " + args[0]);
+            System.err.println("Waiting for process to exit: " + args[0]);
             task.waitFor();
-            System.out.println("Done Proccess: " + args[0]);
+            System.err.println("Done Proccess: " + args[0]);
 
         }
         catch (IOException e)
         {
-            System.out.println("Proccess had problems: " + e.getMessage());
+            System.err.println("Proccess had problems: " + e.getMessage());
+            e.printStackTrace(System.err);
             exit(1);
         }
         catch (InterruptedException e)
         {
-            System.out.println("Proccess had problems: " + e.getMessage());
+            System.err.println("Proccess had problems: " + e.getMessage());
+            e.printStackTrace(System.err);
 
             exit(1);
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Fri Feb  8 02:09:37 2008
@@ -100,7 +100,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -139,7 +139,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -158,7 +158,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -198,7 +198,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -217,7 +217,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -236,7 +236,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -255,7 +255,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -294,7 +294,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -313,7 +313,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -353,7 +353,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -387,7 +387,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -427,7 +427,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());
@@ -467,7 +467,7 @@
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+        Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
 
         queue.deleteMessageFromTop(_context);
         Assert.assertEquals(0, queue.getMessageCount());

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Fri Feb  8 02:09:37 2008
@@ -59,7 +59,7 @@
         mbean.createNewBinding(_queue.getName().toString(), "binding2");
 
         TabularData data = mbean.bindings();
-        ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+        ArrayList<Object> list = new ArrayList<Object>(data.values());
         assertTrue(list.size() == 2);
 
         // test general exchange properties
@@ -85,7 +85,7 @@
         mbean.createNewBinding(_queue.getName().toString(), "binding2");
 
         TabularData data = mbean.bindings();
-        ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+        ArrayList<Object> list = new ArrayList<Object>(data.values());
         assertTrue(list.size() == 2);
 
         // test general exchange properties
@@ -111,7 +111,7 @@
         mbean.createNewBinding(_queue.getName().toString(), "key3=binding3");
 
         TabularData data = mbean.bindings();
-        ArrayList<CompositeData> list = new ArrayList<CompositeData>(data.values());
+        ArrayList<Object> list = new ArrayList<Object>(data.values());
         assertTrue(list.size() == 2);
 
         // test general exchange properties

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Fri Feb  8 02:09:37 2008
@@ -291,7 +291,7 @@
 
         for (int i = 0; i < messageCount; i++)
         {
-            _queue.process(_storeContext, messages[i], false);
+            _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
         }
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Feb  8 02:09:37 2008
@@ -23,11 +23,14 @@
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -38,6 +41,8 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
 import java.util.LinkedList;
@@ -59,11 +64,12 @@
                                                                                      new LinkedList<RequiredDeliveryException>(),
                                                                                      new HashSet<Long>());
     private VirtualHost _virtualHost;
+    private AMQProtocolSession _protocolSession;
 
     public void testMessageCount() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount);
+        sendMessages(messageCount, false);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -76,6 +82,43 @@
         _queueMBean.clearQueue();
         assertTrue(_queueMBean.getMessageCount() == 0);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        //Ensure that the data has been removed from the Store
+        verifyBrokerState();
+    }
+    
+    public void testMessageCountPersistent() throws Exception
+    {
+        int messageCount = 10;
+        sendMessages(messageCount, true);
+        assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+        _queueMBean.deleteMessageFromTop();
+        assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        _queueMBean.clearQueue();
+        assertTrue(_queueMBean.getMessageCount() == 0);
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        //Ensure that the data has been removed from the Store
+        verifyBrokerState();
+    }
+
+    // TODO: move to a general testing class - duplicated from Systest/MessageReturnTest
+    private void verifyBrokerState()
+    {
+
+        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+        // Unlike MessageReturnTest there is no need for a delay here, as this thread does the clean up.
+        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
+        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
     }
 
     public void testConsumerCount() throws AMQException
@@ -168,13 +211,13 @@
 
         }
 
-        AMQMessage msg = message(false);
+        AMQMessage msg = message(false, false);
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
 
         msg.enqueue(_queue);
         msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
-        _queue.process(_storeContext, msg, false);
+        _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
         _queueMBean.viewMessageContent(id);
         try
         {
@@ -187,7 +230,7 @@
         }
     }
 
-    private AMQMessage message(final boolean immediate) throws AMQException
+    private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
         {
@@ -215,6 +258,7 @@
                               
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
+	((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
     }
 
@@ -224,22 +268,41 @@
         super.setUp();
         IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+	_messageStore = _virtualHost.getMessageStore();
         _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
         _queueMBean = new AMQQueueMBean(_queue);
+	_protocolSession = new TestMinaProtocolSession();
     }
 
-    private void sendMessages(int messageCount) throws AMQException
+    private void sendMessages(int messageCount, boolean persistent) throws AMQException
     {
         AMQMessage[] messages = new AMQMessage[messageCount];
         for (int i = 0; i < messages.length; i++)
         {
-            messages[i] = message(false);
+            messages[i] = message(false, persistent);
             messages[i].enqueue(_queue);
             messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
         }
         for (int i = 0; i < messageCount; i++)
         {
-            _queue.process(_storeContext, messages[i], false);
+            /* TGM: From here to end of method was:
+	     * queue.process(_storeContext, messages[i], false);
+	     */
+	    
+            AMQMessage currentMessage = message(false, persistent);
+            currentMessage.enqueue(_queue);
+
+            // route header
+            currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+            // Add the body so we have something to test later
+            currentMessage.addContentBodyFrame(_storeContext,
+                                               _protocolSession.getMethodRegistry()
+                                                       .getProtocolVersionMethodConverter()
+                                                       .convertToContentChunk(
+                                                       new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+                                                                       MESSAGE_SIZE)));
+
         }
     }
 }

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+
+/**
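+ * Adds some extra accessor methods to the memory message store for testing purposes: they expose the metadata and content body maps of this store, or of a wrapped MemoryMessageStore when one is supplied.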
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+
+    MemoryMessageStore _mms = null;
+
+    public TestableMemoryMessageStore(MemoryMessageStore mms)
+    {
+        _mms = mms;
+    }
+
+    public TestableMemoryMessageStore()
+    {
+        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
+        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+    }
+
+    public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+    {
+        if (_mms != null)
+        {
+            return _mms._metaDataMap;
+        }
+        else
+        {
+            return _metaDataMap;
+        }
+    }
+
+    public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+    {
+        if (_mms != null)
+        {
+            return _mms._contentBodyMap;
+        }
+        else
+        {
+            return _contentBodyMap;
+        }
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml Fri Feb  8 02:09:37 2008
@@ -69,34 +69,12 @@
             <artifactId>commons-lang</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.mina</groupId>
-            <artifactId>mina-filter-ssl</artifactId>
-        </dependency>
 
         <!-- Test Dependencies -->
-        <dependency>  
-            <groupId>org.slf4j</groupId> 
-            <artifactId>slf4j-log4j12</artifactId>  
-            <version>1.4.0</version>  
-            <scope>test</scope> 
-        </dependency>
 
         <dependency> <!-- for inVm Broker -->
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-broker</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.easymock</groupId>
-            <artifactId>easymockclassextension</artifactId>
             <scope>test</scope>
         </dependency>
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Feb  8 02:09:37 2008
@@ -20,17 +20,29 @@
  */
 package org.apache.qpid.client;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -49,25 +61,15 @@
 import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
-
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.QpidURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
@@ -79,7 +81,9 @@
      * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
      * held by any child objects of this connection such as the session, producers and consumers.
      */
-    protected final Object _failoverMutex = new Object();
+    private final Object _failoverMutex = new Object();
+
+    private final Object _sessionCreationLock = new Object();
 
     /**
      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
@@ -88,7 +92,7 @@
     protected long _maximumChannelCount;
 
     /** The maximum size of frame supported by the server */
-    protected long _maximumFrameSize;
+    private long _maximumFrameSize;
 
     /**
      * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
@@ -98,24 +102,25 @@
     protected AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    protected final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+    private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
 
-    protected String _clientName;
+    private String _clientName;
 
     /** The user name to use for authentication */
-    protected String _username;
+    private String _username;
 
     /** The password to use for authentication */
-    protected String _password;
+    private String _password;
 
     /** The virtual path to connect to on the AMQ server */
-    protected String _virtualHost;
+    private String _virtualHost;
+   
 
-    protected ExceptionListener _exceptionListener;
+    private ExceptionListener _exceptionListener;
 
-    protected ConnectionListener _connectionListener;
+    private ConnectionListener _connectionListener;
 
-    protected ConnectionURL _connectionURL;
+    private ConnectionURL _connectionURL;
 
     /**
      * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
@@ -139,22 +144,23 @@
     /*
      * The connection meta data
      */
-    protected QpidConnectionMetaData _connectionMetaData;
+    private QpidConnectionMetaData _connectionMetaData;
 
     /** Configuration info for SSL */
-    protected SSLConfiguration _sslConfiguration;
+    private SSLConfiguration _sslConfiguration;
 
-    protected AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
-    protected AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
-    protected AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
-    protected AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+    private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+    private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
 
     /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
-    protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
-    protected static final long DEFAULT_TIMEOUT = 1000 * 30;
+    private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+    private static final long DEFAULT_TIMEOUT = 1000 * 30;
+    private ProtocolVersion _protocolVersion;
 
     protected AMQConnectionDelegate _delegate;
-
+    
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -276,6 +282,9 @@
         _clientName = connectionURL.getClientName();
         _username = connectionURL.getUsername();
         _password = connectionURL.getPassword();
+
+        _protocolVersion = connectionURL.getProtocolVersion();
+
         setVirtualHost(connectionURL.getVirtualHost());
 
         if (connectionURL.getDefaultQueueExchangeName() != null)
@@ -298,9 +307,9 @@
             _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
         }
 
-
         _failoverPolicy = new FailoverPolicy(connectionURL);
-        _protocolHandler = new AMQProtocolHandler(this);
+
+         _protocolHandler = new AMQProtocolHandler(this);
 
         // We are not currently connected
         _connected = false;
@@ -320,6 +329,11 @@
             {
                 lastException = e;
 
+                // We need to replace the protocol handler's StateManager here, as an error during the connect
+                // will not cause the StateManager to be replaced, leaving the state out of sync on reconnect.
+                // This can be seen when an exception occurs during connection, e.g. log4j NoSuchMethod (using < 1.2.12).
+                _protocolHandler.setStateManager(new AMQStateManager());
+
                 if (_logger.isInfoEnabled())
                 {
                     _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
@@ -374,7 +388,7 @@
                 if (e.getCause() != null)
                 {
                     e.initCause(lastException);
-                }
+	        }
             }
 
             throw e;
@@ -383,6 +397,18 @@
         _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
+    protected boolean checkException(Throwable thrown)
+    {
+        Throwable cause = thrown.getCause();
+
+        if (cause == null)
+        {
+            cause = thrown;
+        }
+
+        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+    }
+
     protected AMQConnection(String username, String password, String clientName, String virtualHost)
     {
         _clientName = clientName;
@@ -401,20 +427,6 @@
         _virtualHost = virtualHost;
     }
 
-    protected boolean checkException(Throwable thrown)
-    {
-        Throwable cause = thrown.getCause();
-
-        if (cause == null)
-        {
-            cause = thrown;
-        }
-
-        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
-    }
-
-
-
     public boolean attemptReconnection(String host, int port)
     {
         BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
@@ -520,6 +532,50 @@
         return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
     }
 
+    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
+
+        ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+        // TODO: Be aware of possible changes to parameter order as versions change.
+
+        _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId),  ChannelOpenOkBody.class);
+
+        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+
+        // todo send low water mark when protocol allows.
+        // todo Be aware of possible changes to parameter order as versions change.
+        _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+
+        if (transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Issuing TxSelect for " + channelId);
+            }
+
+            TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
+
+            // TODO: Be aware of possible changes to parameter order as versions change.
+            _protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
+        }
+    }
+
+    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
+        try
+        {
+            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+        }
+        catch (AMQException e)
+        {
+            deregisterSession(channelId);
+            throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
+        }
+    }
+
     public void setFailoverPolicy(FailoverPolicy policy)
     {
         _failoverPolicy = policy;
@@ -615,7 +671,6 @@
         checkNotClosed();
         if (!_started)
         {
-            _started = true;
             final Iterator it = _sessions.entrySet().iterator();
             while (it.hasNext())
             {
@@ -630,6 +685,7 @@
                 }
             }
 
+            _started = true;
         }
     }
 
@@ -661,49 +717,87 @@
 
     public void close(long timeout) throws JMSException
     {
-        synchronized (getFailoverMutex())
+        close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+    }
+
+    public void close(List<AMQSession> sessions, long timeout) throws JMSException
+    {
+        synchronized(_sessionCreationLock)
         {
-            if (!_closed.getAndSet(true))
+            if(!sessions.isEmpty())
             {
-                try
+                AMQSession session = sessions.remove(0);
+                synchronized(session.getMessageDeliveryLock())
                 {
-                    long startCloseTime = System.currentTimeMillis();
-
-                    _taskPool.shutdown();
-                    closeAllSessions(null, timeout, startCloseTime);
-
-                    if (!_taskPool.isTerminated())
+                    close(sessions, timeout);
+                }
+            }
+            else
+            {
+                synchronized (getFailoverMutex())
+                {
+                    if (!_closed.getAndSet(true))
                     {
                         try
                         {
+                            long startCloseTime = System.currentTimeMillis();
+
+                            _taskPool.shutdown();
+                            closeAllSessions(null, timeout, startCloseTime);
+
+                            if (!_taskPool.isTerminated())
+                            {
+                                try
+                                {
+                                    // adjust timeout
+                                    long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+                                    _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+                                }
+                                catch (InterruptedException e)
+                                {
+                                    _logger.info("Interrupted while shutting down connection thread pool.");
+                                }
+                            }
+
                             // adjust timeout
-                            long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+                            timeout = adjustTimeout(timeout, startCloseTime);
+                            _delegate.closeConneciton(timeout);
 
-                            _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+                            // If the task pool hasn't shut down by now then call shutdownNow().
+                            // This will interrupt any running tasks.
+                            if (!_taskPool.isTerminated())
+                            {
+                                List<Runnable> tasks = _taskPool.shutdownNow();
+                                for (Runnable r : tasks)
+                                {
+                                    _logger.warn("Connection close forced taskpool to prevent execution:" + r);
+                                }
+                            }
                         }
-                        catch (InterruptedException e)
+                        catch (AMQException e)
                         {
-                            _logger.info("Interrupted while shutting down connection thread pool.");
+                            JMSException jmse = new JMSException("Error closing connection: " + e);
+                            jmse.setLinkedException(e);
+                            throw jmse;
                         }
                     }
-
-                    // adjust timeout
-                    timeout = adjustTimeout(timeout, startCloseTime);
-                    _delegate.closeConneciton(timeout);
-                    //_protocolHandler.closeConnection(timeout);
-
-                }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing connection: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
                 }
             }
         }
     }
 
+    private long adjustTimeout(long timeout, long startTime)
+    {
+        long now = System.currentTimeMillis();
+        timeout -= now - startTime;
+        if (timeout < 0)
+        {
+            timeout = 0;
+        }
 
+        return timeout;
+    }
 
     /**
      * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
@@ -768,19 +862,6 @@
         }
     }
 
-
-    private long adjustTimeout(long timeout, long startTime)
-    {
-        long now = System.currentTimeMillis();
-        timeout -= now - startTime;
-        if (timeout < 0)
-        {
-            timeout = 0;
-        }
-
-        return timeout;
-    }
-
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException
     {
@@ -974,18 +1055,7 @@
 
     /**
      * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
-     * to a JMS exception liste
-    {
-        ArrayList sessions = new ArrayList(_sessions.values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
-        for (Iterator it = sessions.iterator(); it.hasNext();)
-        {
-            AMQSession s = (AMQSession) it.next();
-            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
-            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
-            s.resubscribe();
-        }
-    }ner, if configured, and propagates the exception to sessions, which in turn will
+     * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
      * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
      *
      * @param cause the exception
@@ -1042,6 +1112,10 @@
         {
             _exceptionListener.onException(je);
         }
+        else
+        {
+            _logger.error("Throwable Received but no listener set: " + cause.getMessage());
+        }
 
         if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException))
         {
@@ -1162,5 +1236,21 @@
     public AMQSession getSession(int channelId)
     {
         return _sessions.get(channelId);
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
+
+    public void setProtocolVersion(ProtocolVersion protocolVersion)
+    {
+        _protocolVersion = protocolVersion;
+        _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+    }
+
+    public boolean isFailingOver()
+    {
+        return (_protocolHandler.getFailoverLatch() != null);
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Fri Feb  8 02:09:37 2008
@@ -178,31 +178,25 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException, FailoverException
     {
-
+        ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
         // TODO: Be aware of possible changes to parameter order as versions change.
-
-        _conn._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
-                _conn._protocolHandler.getProtocolMinorVersion(), null), // outOfBand
-                                                                                                                     ChannelOpenOkBody.class);
+        _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId),  ChannelOpenOkBody.class);
 
         // todo send low water mark when protocol allows.
         // todo Be aware of possible changes to parameter order as versions change.
-        _conn._protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
-                _conn._protocolHandler.getProtocolMinorVersion(), false, // global
-                                                               prefetchHigh, // prefetchCount
-                                                               0), // prefetchSize
-                                                                   BasicQosOkBody.class);
-
+        BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+        _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+        
         if (transacted)
         {
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-
+            TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
+            
             // TODO: Be aware of possible changes to parameter order as versions change.
-            _conn._protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
-                    _conn._protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
+            _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
         }
     }
 
@@ -212,7 +206,7 @@
      */
     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
     {
-        ArrayList sessions = new ArrayList(_conn._sessions.values());
+        ArrayList sessions = new ArrayList(_conn.getSessions().values());
         _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Fri Feb  8 02:09:37 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Hashtable;
+import java.util.UUID;
 
 import javax.jms.*;
 import javax.naming.Context;
@@ -258,7 +259,7 @@
         }
         catch (UnknownHostException e)
         {
-            return null;
+            return "UnknownHost" + UUID.randomUUID();
         }
     }
 
@@ -351,7 +352,9 @@
      * @param name
      * @param ctx
      * @param env
+     *
      * @return AMQConnection,AMQTopic,AMQQueue, or AMQConnectionFactory.
+     *
      * @throws Exception
      */
     public Object getObjectInstance(Object obj, Name name, Context ctx, Hashtable env) throws Exception
@@ -407,8 +410,9 @@
 
     public Reference getReference() throws NamingException
     {
-        return new Reference(AMQConnectionFactory.class.getName(),
-                             new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
+        return new Reference(
+                AMQConnectionFactory.class.getName(),
+                new StringRefAddr(AMQConnectionFactory.class.getName(), _connectionDetails.getURL()),
                              AMQConnectionFactory.class.getName(), null);          // factory location
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Fri Feb  8 02:09:37 2008
@@ -29,6 +29,7 @@
 import org.apache.qpid.client.url.URLParser_0_8;
 import org.apache.qpid.client.url.URLParser_0_10;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.url.URLHelper;
@@ -53,6 +54,7 @@
     private AMQShortString _defaultTopicExchangeName;
     private AMQShortString _temporaryTopicExchangeName;
     private AMQShortString _temporaryQueueExchangeName;
+    private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion();
     private byte _urlVersion;
 
     public AMQConnectionURL(String fullURL) throws URLSyntaxException
@@ -104,6 +106,15 @@
     public void setURLVersion(byte version)
     {
         _urlVersion = version;
+        if(_options.containsKey(OPTIONS_PROTOCOL_VERSION))
+        {
+            ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION));
+            if(pv != null)
+            {
+                _protocolVersion = pv;
+            }
+        }
+
     }
 
     public String getURL()
@@ -264,6 +275,11 @@
     public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
     {
         _temporaryTopicExchangeName = temporaryTopicExchangeName;
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
     }
 
     public String toString()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java Fri Feb  8 02:09:37 2008
@@ -49,6 +49,6 @@
         //Not sure what the best approach is here, probably to treat this like a topic
         //and allow server to generate names. As it is AMQ specific it doesn't need to
         //fit the JMS API expectations so this is not as yet critical.
-        return false;
+        return getAMQQueueName() == null;
     }
 }