You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/04 18:28:54 UTC

svn commit: r674085 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/connection/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/se...

Author: ritchiem
Date: Fri Jul  4 09:28:53 2008
New Revision: 674085

URL: http://svn.apache.org/viewvc?rev=674085&view=rev
Log:
QPID-871 - Added a ConnectionRegistry per Virtualhost to track the open connections.
Altered the ApplicationRegistry so that when the shutdown hook is fired it:
Unbinds from the listening sockets
Then closes each virtualhost which in turn closes all the active TCP connections before closing the MessageStore thus preventing any logged errors occuring as a result of the active TCP connection performing an action on the closed store.

Test provided MessageStoreShutdownTest which uses the new InternalBrokerBaseCase and InternalTestProtocolSession classes to perform system testing of the Broker without TCP framing or client codebase.

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
      - copied, changed from r674058, incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Fri Jul  4 09:28:53 2008
@@ -34,7 +34,6 @@
 import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.common.FixedSizeByteBufferAllocator;
 import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -421,7 +420,8 @@
                     bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
                 }
 
-                acceptor.bind(bindAddress, handler, sconfig);
+                bind(acceptor, bindAddress, handler, sconfig);
+
                 //fixme  qpid.AMQP should be using qpidproperties to get value
                 _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
             }
@@ -432,7 +432,8 @@
                 try
                 {
 
-                    acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+                    bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+
                     //fixme  qpid.AMQP should be using qpidproperties to get value
                     _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
 
@@ -455,6 +456,23 @@
         }
     }
 
+    /**
+     * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
+     *
+     * @param acceptor
+     * @param bindAddress
+     * @param handler
+     * @param sconfig
+     *
+     * @throws IOException from the acceptor.bind command
+     */
+    private void bind(IoAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
+    {
+        acceptor.bind(bindAddress, handler, sconfig);
+
+        ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
+    }
+
     public static void main(String[] args)
     {
 

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=674085&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Jul  4 09:28:53 2008
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.connection;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+
+public class ConnectionRegistry implements IConnectionRegistry
+{
+    private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
+
+    private VirtualHost _virtualHost;
+
+    public ConnectionRegistry(VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+    }
+
+    public void initialise()
+    {
+
+    }
+
+    /** Close all of the currently open connections. */
+    public void close() throws AMQException
+    {
+        while (!_registry.isEmpty())
+        {
+            AMQProtocolSession connection = _registry.get(0);
+
+            connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
+                                                                  0, 0,
+                                                                  connection.getProtocolOutputConverter().getProtocolMajorVersion(),
+                                                                  connection.getProtocolOutputConverter().getProtocolMinorVersion(),
+                                                                  (Throwable) null), true);
+        }
+    }
+
+    public void registerConnection(AMQProtocolSession connnection)
+    {
+        _registry.add(connnection);
+    }
+
+    public void deregisterConnection(AMQProtocolSession connnection)
+    {
+        _registry.remove(connnection);
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=674085&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Jul  4 09:28:53 2008
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.connection;
+
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+
+public interface IConnectionRegistry
+{
+
+    public void initialise();
+
+    public void close() throws AMQException;
+
+    public void registerConnection(AMQProtocolSession connnection);
+
+    public void deregisterConnection(AMQProtocolSession connnection);
+
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Jul  4 09:28:53 2008
@@ -25,6 +25,7 @@
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.CloseFuture;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 
 import org.apache.qpid.AMQChannelException;
@@ -99,7 +100,7 @@
 
     private Object _lastSent;
 
-    private boolean _closed;
+    protected boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
 
@@ -115,13 +116,16 @@
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
 
+    private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
+    private org.apache.mina.common.WriteFuture _lastWriteFuture;
+
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
-        throws AMQException
+            throws AMQException
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
@@ -145,7 +149,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
-        AMQStateManager stateManager) throws AMQException
+                                  AMQStateManager stateManager) throws AMQException
     {
         _stateManager = stateManager;
         _minaProtocolSession = session;
@@ -199,7 +203,7 @@
     }
 
     private void frameReceived(AMQFrame frame) throws AMQException
-    {        
+    {
         int channelId = frame.getChannel();
         AMQBody body = frame.getBodyFrame();
 
@@ -222,15 +226,14 @@
             {
                 if (_logger.isInfoEnabled())
                 {
-                    _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+                    _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
                 }
 
+                closeProtocolSession();
                 return;
             }
         }
 
-
-
         try
         {
             body.handle(channelId, this);
@@ -258,7 +261,6 @@
 
             String locales = "en_US";
 
-
             AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
                                                                                        (short) getProtocolMinorVersion(),
                                                                                        null,
@@ -266,7 +268,6 @@
                                                                                        locales.getBytes());
             _minaProtocolSession.write(responseBody.generateFrame(0));
 
-
         }
         catch (AMQException e)
         {
@@ -332,27 +333,16 @@
                         _logger.info("Closing connection due to: " + e.getMessage());
                     }
 
-                    closeSession();
-
                     AMQConnectionException ce =
-                        evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
-                            AMQConstant.CHANNEL_ERROR.getName().toString());
+                            evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+                                                                   AMQConstant.CHANNEL_ERROR.getName().toString());
 
-                    _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                    writeFrame(ce.getCloseFrame(channelId));
+                    closeConnection(channelId, ce, false);
                 }
             }
             catch (AMQConnectionException e)
             {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Closing connection due to: " + e.getMessage());
-                }
-
-                markChannelAwaitingCloseOk(channelId);
-                closeSession();
-                _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                writeFrame(e.getCloseFrame(channelId));
+                closeConnection(channelId, e, false);
             }
         }
         catch (Exception e)
@@ -365,7 +355,7 @@
 
             _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
 
-            _minaProtocolSession.close();
+            closeProtocolSession();
         }
     }
 
@@ -399,7 +389,8 @@
     public void writeFrame(AMQDataBlock frame)
     {
         _lastSent = frame;
-        _minaProtocolSession.write(frame);
+
+        _lastWriteFuture = _minaProtocolSession.write(frame);
     }
 
     public AMQShortString getContextKey()
@@ -431,7 +422,7 @@
     public AMQChannel getChannel(int channelId) throws AMQException
     {
         final AMQChannel channel =
-            ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+                ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
             return null;
@@ -464,8 +455,8 @@
         if (_channelMap.size() == _maxNoOfChannels)
         {
             String errorMessage =
-                toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
-                + "); can't create channel";
+                    toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+                    + "); can't create channel";
             _logger.error(errorMessage);
             throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
         }
@@ -619,6 +610,12 @@
         if (!_closed)
         {
             _closed = true;
+
+            if (_virtualHost != null)
+            {
+                _virtualHost.getConnectionRegistry().deregisterConnection(this);
+            }
+
             closeAllChannels();
             if (_managedObject != null)
             {
@@ -632,9 +629,54 @@
         }
     }
 
+    public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+    {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Closing connection due to: " + e.getMessage());
+        }
+
+        markChannelAwaitingCloseOk(channelId);
+        closeSession();
+        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        writeFrame(e.getCloseFrame(channelId));
+
+        if (closeProtocolSession)
+        {
+            closeProtocolSession();
+        }
+    }
+
+    public void closeProtocolSession()
+    {
+        closeProtocolSession(true);
+    }
+
+    public void closeProtocolSession(boolean waitLast)
+    {
+        _logger.debug("Waiting for last write to join.");
+        if (waitLast && (_lastWriteFuture != null))
+        {
+            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+        }
+
+        _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
+        final CloseFuture future = _minaProtocolSession.close();
+        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+
+        try
+        {
+            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+        }
+        catch (AMQException e)
+        {
+            _logger.info(e.getMessage());
+        }
+    }
+
     public String toString()
     {
-        return _minaProtocolSession.getRemoteAddress() + "("+(getAuthorizedID() == null ? "?" : getAuthorizedID().getName()+")");
+        return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
     }
 
     public String dump()
@@ -752,6 +794,9 @@
     public void setVirtualHost(VirtualHost virtualHost) throws AMQException
     {
         _virtualHost = virtualHost;
+
+        _virtualHost.getConnectionRegistry().registerConnection(this);
+
         _managedObject = createMBean();
         _managedObject.register();
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Jul  4 09:28:53 2008
@@ -25,6 +25,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -150,6 +151,10 @@
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     void closeSession() throws AMQException;
 
+    /** This must be called to close the session in order to free up any resources managed by the session. */
+    void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+
+
     /** @return a key that uniquely identifies this session */
     Object getKey();
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Jul  4 09:28:53 2008
@@ -24,9 +24,17 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.mina.common.IoAcceptor;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.net.InetSocketAddress;
 
 /**
  * An abstract application registry that provides access to configuration information and handles the
@@ -48,6 +56,21 @@
     public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry";
     public static String _APPLICATION_REGISTRY = DEFAULT_APPLICATION_REGISTRY;
 
+    protected final Map<InetSocketAddress, IoAcceptor> _acceptors = new HashMap<InetSocketAddress, IoAcceptor>();
+
+    protected ManagedObjectRegistry _managedObjectRegistry;
+
+    protected AuthenticationManager _authenticationManager;
+
+    protected VirtualHostRegistry _virtualHostRegistry;
+
+    protected ACLPlugin _accessManager;
+
+    protected PrincipalDatabaseManager _databaseManager;
+
+    protected PluginManager _pluginManager;
+
+
     static
     {
         Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -57,7 +80,6 @@
     {
         public void run()
         {
-            _logger.info("Shutting down application registries...");
             removeAll();
         }
     }
@@ -90,6 +112,12 @@
         }
     }
 
+    /**
+     * Method to cleanly shutdown specified registry running in this JVM
+     *
+     * @param instanceID the instance to shutdown
+     */
+
     public static void remove(int instanceID)
     {
         try
@@ -111,8 +139,9 @@
         }
     }
 
+    /** Method to cleanly shutdown all registries currently running in this JVM */
     public static void removeAll()
-    {
+    {        
         Object[] keys = _instanceMap.keySet().toArray();
         for (Object k : keys)
         {
@@ -162,6 +191,10 @@
 
     public void close() throws Exception
     {
+        //Stop incomming connections
+        unbind();
+
+        //Shutdown virtualhosts
         for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
         {
             virtualHost.close();
@@ -174,11 +207,31 @@
         }
     }
 
+    private void unbind()
+    {
+        synchronized (_acceptors)
+        {
+            for (InetSocketAddress bindAddress : _acceptors.keySet())
+            {
+                IoAcceptor acceptor = _acceptors.get(bindAddress);
+                acceptor.unbind(bindAddress);
+            }
+        }
+    }
+
     public Configuration getConfiguration()
     {
         return _configuration;
     }
 
+    public void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor)
+    {
+        synchronized (_acceptors)
+        {
+            _acceptors.put(bindAddress, acceptor);
+        }
+    }
+
     public <T> T getConfiguredObject(Class<T> instanceType)
     {
         T instance = (T) _configuredObjects.get(instanceType);
@@ -204,4 +257,35 @@
     {
         _APPLICATION_REGISTRY = clazz;
     }
+
+    public VirtualHostRegistry getVirtualHostRegistry()
+    {
+        return _virtualHostRegistry;
+    }
+
+    public ACLPlugin getAccessManager()
+    {
+        return _accessManager;
+    }
+
+    public ManagedObjectRegistry getManagedObjectRegistry()
+    {
+        return _managedObjectRegistry;
+    }
+
+    public PrincipalDatabaseManager getDatabaseManager()
+    {
+        return _databaseManager;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public PluginManager getPluginManager()
+    {
+        return _pluginManager;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri Jul  4 09:28:53 2008
@@ -48,18 +48,6 @@
 public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
 {
 
-    private ManagedObjectRegistry _managedObjectRegistry;
-
-    private AuthenticationManager _authenticationManager;
-
-    private ACLPlugin _accessManager;
-
-    private PrincipalDatabaseManager _databaseManager;
-
-    private VirtualHostRegistry _virtualHostRegistry;
-
-    private PluginManager _pluginManager;
-
 
     public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
     {
@@ -145,39 +133,9 @@
         }
     }
 
-
-    public VirtualHostRegistry getVirtualHostRegistry()
-    {
-        return _virtualHostRegistry;
-    }
-
-    public ACLPlugin getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return _managedObjectRegistry;
-    }
-
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public Collection<String> getVirtualHostNames()
     {
         return getConfiguration().getList("virtualhosts.virtualhost.name");
     }
 
-    public PluginManager getPluginManager()
-    {
-        return _pluginManager;
-    }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Fri Jul  4 09:28:53 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.registry;
 
 import java.util.Collection;
+import java.net.InetSocketAddress;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
@@ -29,6 +30,7 @@
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.IoAcceptor;
 
 public interface IApplicationRegistry
 {
@@ -39,6 +41,10 @@
      */
     void initialise() throws Exception;
 
+    /**
+     * Shutdown this Registry
+     * @throws Exception - //fixme needs to be made more specific
+     */
     void close() throws Exception;
 
     /**
@@ -71,5 +77,12 @@
     ACLPlugin getAccessManager();
 
     PluginManager getPluginManager();
-    
+
+    /**
+     * Register any acceptors for this registry
+     * @param bindAddress The address that the acceptor has been bound with
+     * @param acceptor The acceptor in use
+     */
+    void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor);
+
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Fri Jul  4 09:28:53 2008
@@ -42,19 +42,6 @@
 
 public class NullApplicationRegistry extends ApplicationRegistry
 {
-    private ManagedObjectRegistry _managedObjectRegistry;
-
-    private AuthenticationManager _authenticationManager;
-
-    private VirtualHostRegistry _virtualHostRegistry;
-
-    private ACLPlugin _accessManager;
-
-    private PrincipalDatabaseManager _databaseManager;
-
-    private PluginManager _pluginManager;
-
-
     public NullApplicationRegistry()
     {
         super(new MapConfiguration(new HashMap()));
@@ -84,47 +71,11 @@
 
     }
 
-    public Configuration getConfiguration()
-    {
-        return _configuration;
-    }
-
-
-    public ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return _managedObjectRegistry;
-    }
-
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public Collection<String> getVirtualHostNames()
     {
         String[] hosts = {"test"};
         return Arrays.asList(hosts);
     }
-
-    public VirtualHostRegistry getVirtualHostRegistry()
-    {
-        return _virtualHostRegistry;
-    }
-
-    public ACLPlugin getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public PluginManager getPluginManager()
-    {
-        return _pluginManager;
-    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Jul  4 09:28:53 2008
@@ -26,6 +26,8 @@
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.connection.ConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.Accessable;
@@ -55,6 +57,8 @@
 
     private final String _name;
 
+    private ConnectionRegistry _connectionRegistry;
+
     private QueueRegistry _queueRegistry;
 
     private ExchangeRegistry _exchangeRegistry;
@@ -74,7 +78,8 @@
     private final Timer _houseKeepingTimer;
      
     private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
-    
+
+
     public void setAccessableName(String name)
     {
         _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -86,6 +91,10 @@
         return _name;
     }
 
+    public IConnectionRegistry getConnectionRegistry()
+    {
+        return _connectionRegistry;
+    }
 
     /**
      * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
@@ -143,8 +152,8 @@
         _name = name;
 
         _virtualHostMBean = new VirtualHostMBean();
-        // This isn't needed to be registered
-        //_virtualHostMBean.register();
+
+        _connectionRegistry = new ConnectionRegistry(this);
 
         _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true);
         _queueRegistry = new DefaultQueueRegistry(this);
@@ -283,14 +292,20 @@
     public ACLPlugin getAccessManager()
     {
         return _accessManager;
-    }
+    }                                                                   
 
     public void close() throws Exception
     {
+        //Stop Housekeeping
         if (_houseKeepingTimer != null)
         {
             _houseKeepingTimer.cancel();
         }
+
+        //Stop Connections
+        _connectionRegistry.close();
+
+        //Close MessageStore
         if (_messageStore != null)
         {
             _messageStore.close();

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=674085&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Fri Jul  4 09:28:53 2008
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+{
+    // ChannelID(LIST)  -> LinkedList<Pair>
+    final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+    private AtomicInteger _deliveryCount = new AtomicInteger(0);
+
+    public InternalTestProtocolSession() throws AMQException
+    {
+        super(new TestIoSession(),
+              ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+              new AMQCodecFactory(true));
+
+        _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+
+    }
+
+    public ProtocolOutputConverter getProtocolOutputConverter()
+    {
+        return this;
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return (byte) 8;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return (byte) 0;
+    }
+
+    // ***
+
+    public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count)
+    {
+        synchronized (_channelDelivers)
+        {
+            List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+
+            List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
+
+            //Remove the msgs from the receivedList.
+            msgs.clear();
+
+            return response;
+        }
+    }
+
+    // *** ProtocolOutputConverter Implementation
+    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    {
+    }
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+    }
+
+    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+    {
+        _deliveryCount.incrementAndGet();
+
+        synchronized (_channelDelivers)
+        {
+            Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+
+            if (consumers == null)
+            {
+                consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+                _channelDelivers.put(channelId, consumers);
+            }
+
+            LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+
+            if (consumerDelivers == null)
+            {
+                consumerDelivers = new LinkedList<DeliveryPair>();
+                consumers.put(consumerTag, consumerDelivers);
+            }
+
+            consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+        }
+    }
+
+    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+    }
+
+    public void awaitDelivery(int msgs)
+    {
+        while (msgs > _deliveryCount.get())
+        {
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public class DeliveryPair
+    {
+        private long _deliveryTag;
+        private AMQMessage _message;
+
+        public DeliveryPair(long deliveryTag, AMQMessage message)
+        {
+            _deliveryTag = deliveryTag;
+            _message = message;
+        }
+
+        public AMQMessage getMessage()
+        {
+            return _message;
+        }
+
+        public long getDeliveryTag()
+        {
+            return _deliveryTag;
+        }
+    }
+
+    public boolean isClosed()
+    {
+        return _closed;
+    }
+
+    public void closeProtocolSession(boolean waitLast)
+    {
+        // Override as we don't have a real IOSession to close.
+        //  The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
+        //  Then the AMQMinaProtocolSession can join on the returning future without a NPE.
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java?rev=674085&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java Fri Jul  4 09:28:53 2008
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+import java.util.List;
+
+public class MessageStoreShutdownTest extends InternalBrokerBaseCase
+{
+
+    public void test()
+    {
+        subscribe(_session, _channel, _queue);
+
+        try
+        {
+            publishMessages(_session, _channel, 1);
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            fail(e.getMessage());
+        }
+
+        try
+        {
+            _registry.close();
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            fail(e.getMessage());
+        }
+
+        assertTrue("Session should now be closed", _session.isClosed());
+
+
+        //Test attempting to modify the broker state after session has been closed.
+
+        //The Message should have been removed from the unacked list.
+
+        //Ack Messages
+        List<InternalTestProtocolSession.DeliveryPair> list = _session.getDelivers(_channel.getChannelId(), new AMQShortString("sgen_1"), 1);
+
+        InternalTestProtocolSession.DeliveryPair pair = list.get(0);
+
+        try
+        {
+            // The message should now be requeued and so unable to ack it.
+            _channel.acknowledgeMessage(pair.getDeliveryTag(), false);
+        }
+        catch (AMQException e)
+        {
+            assertEquals("Incorrect exception thrown", "Single ack on delivery tag 1 not known for channel:1", e.getMessage());
+        }
+
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=674085&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Fri Jul  4 09:28:53 2008
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+public class InternalBrokerBaseCase extends TestCase
+{
+    protected IApplicationRegistry _registry;
+    protected MessageStore _messageStore;
+    protected AMQChannel _channel;
+    protected InternalTestProtocolSession _session;
+    protected VirtualHost _virtualHost;
+    protected StoreContext _storeContext = new StoreContext();
+    protected AMQQueue _queue;
+    protected AMQShortString QUEUE_NAME;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _registry = new TestApplicationRegistry();
+        ApplicationRegistry.initialise(_registry);
+        _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+        _messageStore = _virtualHost.getMessageStore();
+
+        QUEUE_NAME = new AMQShortString("test");
+        _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"),
+                                                    false, _virtualHost, null);
+
+        _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+        Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
+        _queue.bind(defaultExchange, QUEUE_NAME, null);
+
+        _session = new InternalTestProtocolSession();
+
+        _session.setVirtualHost(_virtualHost);
+
+        _channel = new AMQChannel(_session, 1, _messageStore);
+
+        _session.addChannel(_channel);
+    }
+
+    public void tearDown() throws Exception
+    {
+        ApplicationRegistry.removeAll();
+        super.tearDown();
+    }
+
+    protected void checkStoreContents(int messageCount)
+    {
+        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+
+        //The above publish message is sufficiently small not to fit in the header so no Body is required.
+        //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
+    }
+
+    protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue)
+    {
+        try
+        {
+            return channel.subscribeToQueue(null, queue, true, null, false, true);
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+        catch (ConsumerTagNotUniqueException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+        //Keep the compiler happy
+        return null;
+    }
+
+    public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
+    {
+        MessagePublishInfo info = new MessagePublishInfo()
+        {
+            public AMQShortString getExchange()
+            {
+                return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+
+            }
+
+            public boolean isImmediate()
+            {
+                return false;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return QUEUE_NAME;
+            }
+        };
+
+        for (int count = 0; count < messages; count++)
+        {
+            channel.setPublishFrame(info,  _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+
+            //Set the body size
+            ContentHeaderBody _headerBody = new ContentHeaderBody();
+            _headerBody.bodySize = 0;
+
+            //Set Minimum properties
+            BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
+            properties.setExpiration(0L);
+            properties.setTimestamp(System.currentTimeMillis());
+
+            //Make Message Persistent
+            properties.setDeliveryMode((byte) 2);
+
+            _headerBody.properties = properties;
+
+            channel.publishContentHeader(_headerBody);
+        }
+
+    }
+
+    public void acknowledge(AMQChannel channel, long deliveryTag)
+    {
+        try
+        {
+            channel.acknowledgeMessage(deliveryTag, false);
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+}

Copied: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (from r674058, incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?p2=incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java&p1=incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java&r1=674058&r2=674085&rev=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Fri Jul  4 09:28:53 2008
@@ -20,29 +20,30 @@
  */
 package org.apache.qpid.server.util;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.MapConfiguration;
 
-import java.util.HashMap;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Properties;
+import java.util.Arrays;
 
 public class TestApplicationRegistry extends ApplicationRegistry
 {
@@ -52,17 +53,11 @@
 
     private ExchangeFactory _exchangeFactory;
 
-    private ManagedObjectRegistry _managedObjectRegistry;
-
-    private ACLPlugin _accessManager;
-
-    private PrincipalDatabaseManager _databaseManager;
-
-    private AuthenticationManager _authenticationManager;
-
     private MessageStore _messageStore;
+
     private VirtualHost _vHost;
 
+
     public TestApplicationRegistry()
     {
         super(new MapConfiguration(new HashMap()));
@@ -80,23 +75,23 @@
 
         _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
 
-        IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
-        _managedObjectRegistry = appRegistry.getManagedObjectRegistry();
-        _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+        _managedObjectRegistry = new NoopManagedObjectRegistry();
+
+        _messageStore = new TestableMemoryMessageStore();
+
+        _virtualHostRegistry = new VirtualHostRegistry();
+
+        _vHost = new VirtualHost("test", _messageStore);
+
+        _virtualHostRegistry.registerVirtualHost(_vHost);
+
         _queueRegistry = _vHost.getQueueRegistry();
         _exchangeFactory = _vHost.getExchangeFactory();
         _exchangeRegistry = _vHost.getExchangeRegistry();
 
-        _messageStore = new TestableMemoryMessageStore();
-
         _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
     }
 
-    public Configuration getConfiguration()
-    {
-        return _configuration;
-    }
-
     public QueueRegistry getQueueRegistry()
     {
         return _queueRegistry;
@@ -112,34 +107,10 @@
         return _exchangeFactory;
     }
 
-    public ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return _managedObjectRegistry;
-    }
-
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public Collection<String> getVirtualHostNames()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public VirtualHostRegistry getVirtualHostRegistry()
-    {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public ACLPlugin getAccessManager()
-    {
-        return _accessManager;
+        String[] hosts = {"test"};
+        return Arrays.asList(hosts);
     }
 
     public void setAccessManager(ACLPlugin newManager)
@@ -152,10 +123,6 @@
         return _messageStore;
     }
 
-    public PluginManager getPluginManager()
-    {
-        return null;
-    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Jul  4 09:28:53 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -117,6 +118,10 @@
     {
     }
 
+    public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException
+    {        
+    }
+
     public Object getKey()
     {
         return null;

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=674085&r1=674084&r2=674085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java Fri Jul  4 09:28:53 2008
@@ -52,15 +52,8 @@
 
     private ExchangeFactory _exchangeFactory;
 
-    private ManagedObjectRegistry _managedObjectRegistry;
-
-    private ACLPlugin _accessManager;
-
-    private PrincipalDatabaseManager _databaseManager;
-
-    private AuthenticationManager _authenticationManager;
-
     private MessageStore _messageStore;
+
     private VirtualHost _vHost;
 
     public TestApplicationRegistry()
@@ -92,11 +85,6 @@
         _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
     }
 
-    public Configuration getConfiguration()
-    {
-        return _configuration;
-    }
-
     public QueueRegistry getQueueRegistry()
     {
         return _queueRegistry;
@@ -112,21 +100,6 @@
         return _exchangeFactory;
     }
 
-    public ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return _managedObjectRegistry;
-    }
-
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public Collection<String> getVirtualHostNames()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -137,11 +110,6 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public ACLPlugin getAccessManager()
-    {
-        return _accessManager;
-    }
-
     public void setAccessManager(ACLPlugin newManager)
     {
         _accessManager = newManager;