Posted to commits@qpid.apache.org by ri...@apache.org on 2007/11/28 01:45:34 UTC

svn commit: r598834 - in /incubator/qpid/branches/M2.1.1/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/registry/ broker/src/main/java/org/apache/qpid...

Author: ritchiem
Date: Tue Nov 27 16:45:32 2007
New Revision: 598834

URL: http://svn.apache.org/viewvc?rev=598834&view=rev
Log:
QPID-679 : Patch provided by Aidan Skinner, plus additional fixes for odd problems seen during test runs.
AMQChannel - Catch and log AMQExceptions occurring when requeue()-ing. Previously these exceptions were not caught at all. requeue() is called during closure, so on error there is nothing we can do protocol-wise other than log the issue and continue with any other shutdown that is needed.
AMQMinaProtocolSession & AMQPFastProtocolHandler - Additions to catch and log AMQExceptions. AMQMinaProtocolSession was changed to ignore all input on a closing session other than the close-ok. Previously only Protocol frames were ignored, which resulted in Content*Body-s still being processed. Additional checks were made for the MessageStoreClosedException to log and continue. As said elsewhere, we need to separate protocol exceptions (AMQException) from internal code exception handling. Further, all AMQExceptions occurring in the frameReceived method are now caught and logged. Allowing them to propagate higher will only result in thread death.
AMQPFastProtocolHandler - Caught AMQExceptions occurring whilst closing the session. Again, allowing these to continue will result in thread death. There is not a lot that can be done other than log the problem, as the session is already closed by this point. Prevented the stack trace associated with a session exception being printed in the exceptionCaught method when the problem was an IOException. This doesn't add anything useful and only adds to the log file sizes.
ApplicationRegistry - Added removeAll option which ensures that all ARs are correctly purged so that we can attempt to clean up between Unit Tests.
MemoryMessageStore - This was causing us real problems during the failover testing. Similar checks should probably be made in any other MessageStore implementation. The issue was that when shutting down the broker the MS.close() method is called, which sets all the storage to null. However, there may still be message processing going on, as close() does not attempt to stop connection processing. Hence we now check whether the store is closed, throwing a MessageStoreClosedException if required. This prevents NPEs that have been seen during unit failover testing. In fact close() is called as part of a request to shut down the ApplicationRegistry, but this only occurs from tests and broker shutdown; no attempt is yet made to unbind or prevent further connections during this period.
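
The closed-store check described for MemoryMessageStore can be illustrated with a minimal, self-contained sketch. It is not the Qpid code: ClosableStoreSketch and StoreClosedException are hypothetical names standing in for MemoryMessageStore and MessageStoreClosedException, and the map of strings is an assumed simplification of the real message storage.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a message store; not the Qpid MemoryMessageStore. */
class ClosableStoreSketch
{
    /** Hypothetical checked exception playing the role of MessageStoreClosedException. */
    static class StoreClosedException extends Exception
    {
        StoreClosedException()
        {
            super("Store closed");
        }
    }

    private final ConcurrentMap<Long, String> _data = new ConcurrentHashMap<Long, String>();
    private final AtomicBoolean _closed = new AtomicBoolean(false);

    void put(Long id, String body) throws StoreClosedException
    {
        checkNotClosed();
        _data.put(id, body);
    }

    String get(Long id) throws StoreClosedException
    {
        checkNotClosed();
        return _data.get(id);
    }

    void close()
    {
        // Flip the flag before releasing storage so later callers see "closed".
        _closed.set(true);
        _data.clear();
    }

    private void checkNotClosed() throws StoreClosedException
    {
        if (_closed.get())
        {
            throw new StoreClosedException();
        }
    }
}
```

Callers get a typed exception they can log and ignore rather than an NPE. Note that the check and the operation that follows it are not atomic, so a thread that passes the check just before close() runs can still touch storage that is being released; the guard narrows that window but does not remove it.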

CLIENT CHANGES
AMQConnection - Added method to check if failover is in progress.
AMQSession - Upgraded acknowledge() exception to JMSException for errors due to failover. Also added a call to update consumers as a result of failover.
BasicMessageConsumer - Changes to acquireReceiving to take into consideration blocking for failover to occur, with respect to receiveNoWait, which previously blocked for failover to complete (not exactly noWait). acknowledge will now
TransportConnection - Update to ensure all inVM brokers are correctly killed.
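
The acquireReceiving change described for BasicMessageConsumer can be sketched in isolation as follows. This is an illustration under stated assumptions, not the client code: ReceiveGateSketch, startFailover(), finishFailover() and releaseReceiving() are hypothetical names, and the CountDownLatch only loosely models the failover latch that isFailingOver() tests for in the diff.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a consumer's receive gate; not the Qpid BasicMessageConsumer. */
class ReceiveGateSketch
{
    // Non-null only while a (simulated) failover is in progress.
    private volatile CountDownLatch _failoverLatch;
    private final AtomicBoolean _receiving = new AtomicBoolean(false);

    void startFailover()
    {
        _failoverLatch = new CountDownLatch(1);
    }

    void finishFailover()
    {
        CountDownLatch latch = _failoverLatch;
        _failoverLatch = null;
        if (latch != null)
        {
            // Wake any thread blocked in acquireReceiving(false).
            latch.countDown();
        }
    }

    boolean isFailingOver()
    {
        return _failoverLatch != null;
    }

    /**
     * @param immediate if true, return false at once when failover is in progress
     * @return true if the caller now holds the receive gate
     */
    boolean acquireReceiving(boolean immediate) throws InterruptedException
    {
        CountDownLatch latch = _failoverLatch;
        if (latch != null)
        {
            if (immediate)
            {
                return false;
            }
            latch.await();
        }

        if (!_receiving.compareAndSet(false, true))
        {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        return true;
    }

    void releaseReceiving()
    {
        _receiving.set(false);
    }
}
```

With this shape a receiveNoWait()-style caller passes immediate = true and treats a false result as "no message available", while a blocking receive passes false and waits for failover to finish before taking the gate.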

FailoverTest - QPID-679 - Finder of all the above problems.

Added:
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java   (with props)
    incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/
    incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java   (with props)
Modified:
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Nov 27 16:45:32 2007
@@ -229,7 +229,7 @@
                 BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
                 //fixme: fudge for QPID-677
                 properties.getHeaders().keySet();
-                
+
                 properties.setUserId(protocolSession.getAuthorizedID().getName());
             }
 
@@ -381,7 +381,14 @@
     {
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
-        requeue();
+        try
+        {
+            requeue();
+        }
+        catch (AMQException e)
+        {
+            _log.error("Caught AMQException whilst attempting to reque:" + e);        
+        }
 
         setClosing(true);
     }

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Nov 27 16:45:32 2007
@@ -33,12 +33,27 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.VersionSpecificRegistry;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.MessageStoreClosedException;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -118,7 +133,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
-        throws AMQException
+            throws AMQException
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
@@ -144,7 +159,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
-        AMQStateManager stateManager) throws AMQException
+                                  AMQStateManager stateManager) throws AMQException
     {
         _stateManager = stateManager;
         _minaProtocolSession = session;
@@ -197,7 +212,7 @@
         }
     }
 
-    private void frameReceived(AMQFrame frame) throws AMQException
+    private void frameReceived(AMQFrame frame)
     {
         int channelId = frame.getChannel();
         AMQBody body = frame.getBodyFrame();
@@ -207,26 +222,57 @@
             _logger.debug("Frame Received: " + frame);
         }
 
-        if (body instanceof AMQMethodBody)
-        {
-            methodFrameReceived(channelId, (AMQMethodBody) body);
-        }
-        else if (body instanceof ContentHeaderBody)
-        {
-            contentHeaderReceived(channelId, (ContentHeaderBody) body);
-        }
-        else if (body instanceof ContentBody)
+        // Check that this channel is not closing
+        if (channelAwaitingClosure(channelId))
         {
-            contentBodyReceived(channelId, (ContentBody) body);
+            if (body instanceof ChannelCloseOkBody)
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+                }
+            }
+            else
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+                }
+
+                return;
+            }
         }
-        else if (body instanceof HeartbeatBody)
+        try
         {
-            // NO OP
+            if (body instanceof AMQMethodBody)
+            {
+                methodFrameReceived(channelId, (AMQMethodBody) body);
+            }
+            else if (body instanceof ContentHeaderBody)
+            {
+                contentHeaderReceived(channelId, (ContentHeaderBody) body);
+            }
+            else if (body instanceof ContentBody)
+            {
+                contentBodyReceived(channelId, (ContentBody) body);
+            }
+            else if (body instanceof HeartbeatBody)
+            {
+                // NO OP
+            }
+            else
+            {
+                _logger.warn("Unrecognised frame " + frame.getClass().getName());
+            }
         }
-        else
+        catch (AMQException e)
         {
-            _logger.warn("Unrecognised frame " + frame.getClass().getName());
+            //This will occur if we receive Content*Body chunks during an 'inverse' shutdown.
+            // That is one where were the store shuts down before we can gracefully close connections.
+            // note: todo: Here we should send forced ConnectionClose frames.
+            _logger.error("AMQException occured whilst receiving Frame:" + e);
         }
+
     }
 
     private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -246,12 +292,12 @@
 
             // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
             AMQFrame response =
-                ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
-                    locales.getBytes(), // locales
-                    mechanisms.getBytes(), // mechanisms
-                    null, // serverProperties
-                    (short) getProtocolMajorVersion(), // versionMajor
-                    (short) getProtocolMinorVersion()); // versionMinor
+                    ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+                                                       locales.getBytes(), // locales
+                                                       mechanisms.getBytes(), // mechanisms
+                                                       null, // serverProperties
+                                                       (short) getProtocolMajorVersion(), // versionMajor
+                                                       (short) getProtocolMinorVersion()); // versionMinor
             _minaProtocolSession.write(response);
         }
         catch (AMQException e)
@@ -271,35 +317,12 @@
 
     private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
     {
-
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
 
-        // Check that this channel is not closing
-        if (channelAwaitingClosure(channelId))
-        {
-            if ((evt.getMethod() instanceof ChannelCloseOkBody))
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
-                }
-            }
-            else
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
-                }
-
-                return;
-            }
-        }
-
         try
         {
             try
             {
-
                 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
                 if (!_frameListeners.isEmpty())
@@ -342,8 +365,8 @@
                     closeSession();
 
                     AMQConnectionException ce =
-                        evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
-                            AMQConstant.CHANNEL_ERROR.getName().toString());
+                            evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+                                                                   AMQConstant.CHANNEL_ERROR.getName().toString());
 
                     _stateManager.changeState(AMQState.CONNECTION_CLOSING);
                     writeFrame(ce.getCloseFrame(channelId));
@@ -363,23 +386,41 @@
         }
         catch (Exception e)
         {
+            //NOTE: Currently we throw AMQExceptions sub-classes that are not Protcol problems.
+            // These items should not cause the connection to close unless there is no other option.
+            //note; todo: This should cause the connection to close
+            if (e instanceof MessageStoreClosedException)
+            {
+               _logger.error("Message Store is closed so unable to perform action:" + e);
+                // This should really close the exception as mentioned below.
+                return;
+            }
+
+            //NOTE: TODO: While this is the responsible for closing the connection as a last resort the above section
+            // May have a problem closing channel ... This may be related to a connection fault but we should still
+            // attempt to send a connection close so that the connecion may be shutdown gracefully.
+
+            //Detect when needed and shutdown connection gracefully .. such as Logged MSCException above
+
+            // If an AMQException gets to here then there it should ONLY be donw to a protocol error
+            // from the above attempts to close the Connection.
+            // Notify any exceptions listeners before we just close the conneciton
             _stateManager.error(e);
             for (AMQMethodListener listener : _frameListeners)
             {
                 listener.error(e);
             }
 
+            // This is the last resort
             _minaProtocolSession.close();
         }
     }
 
     private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
-
         AMQChannel channel = getAndAssertChannel(channelId);
 
         channel.publishContentHeader(body, this);
-
     }
 
     private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
@@ -430,7 +471,7 @@
     public AMQChannel getChannel(int channelId) throws AMQException
     {
         final AMQChannel channel =
-            ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+                ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
             return null;
@@ -463,8 +504,8 @@
         if (_channelMap.size() == _maxNoOfChannels)
         {
             String errorMessage =
-                toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
-                + "); can't create channel";
+                    toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+                    + "); can't create channel";
             _logger.error(errorMessage);
             throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
         }

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Nov 27 16:45:32 2007
@@ -171,7 +171,14 @@
         //fixme  -- this can be null
         if (amqProtocolSession != null)
         {
-            amqProtocolSession.closeSession();
+            try
+            {
+                amqProtocolSession.closeSession();
+            }
+            catch (AMQException e)
+            {
+                _logger.error("Caught AMQException whilst closingSession:" + e);
+            }
         }
     }
 
@@ -205,7 +212,7 @@
         }
         else if (throwable instanceof IOException)
         {
-            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
         }
         else
         {

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Tue Nov 27 16:45:32 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,14 @@
  */
 package org.apache.qpid.server.registry;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * An abstract application registry that provides access to configuration information and handles the
  * construction and caching of configurable objects.
@@ -59,24 +58,7 @@
         public void run()
         {
             _logger.info("Shutting down application registries...");
-            try
-            {
-                synchronized (ApplicationRegistry.class)
-                {
-                    Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
-
-                    while (keyIterator.hasNext())
-                    {
-                        IApplicationRegistry instance = keyIterator.next();
-
-                        instance.close();
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                _logger.error("Error shutting down message store: " + e, e);
-            }
+            removeAll();
         }
     }
 
@@ -116,6 +98,7 @@
         }
         catch (Exception e)
         {
+            _logger.error("Error shutting down message store: " + e, e);
 
         }
         finally
@@ -124,6 +107,14 @@
         }
     }
 
+    public static void removeAll()
+    {
+        Object[] keys = _instanceMap.keySet().toArray();
+        for (Object k : keys)
+        {
+            remove((Integer) k);
+        }
+    }
 
     protected ApplicationRegistry(Configuration configuration)
     {
@@ -154,7 +145,7 @@
                 catch (Exception e)
                 {
                     _logger.error("Error configuring application: " + e, e);
-                //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
+                    //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
                     throw new RuntimeException("Unable to create Application Registry", e);
                 }
             }
@@ -167,7 +158,7 @@
 
     public void close() throws Exception
     {
-        for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+        for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
         {
             virtualHost.close();
         }
@@ -204,7 +195,6 @@
         return instance;
     }
 
-    
 
     public static void setDefaultApplicationRegistry(String clazz)
     {

Modified: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Tue Nov 27 16:45:32 2007
@@ -20,27 +20,26 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
 
-/**
- * A simple message store that stores the messages in a threadsafe structure in memory.
- */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
 public class MemoryMessageStore implements MessageStore
 {
     private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -54,6 +53,7 @@
     protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
 
     private final AtomicLong _messageId = new AtomicLong(1);
+    private AtomicBoolean _closed = new AtomicBoolean(false);
 
     public void configure()
     {
@@ -77,6 +77,7 @@
 
     public void close() throws Exception
     {
+        _closed.getAndSet(true);
         if (_metaDataMap != null)
         {
             _metaDataMap.clear();
@@ -89,8 +90,9 @@
         }
     }
 
-    public void removeMessage(StoreContext context, Long messageId)
+    public void removeMessage(StoreContext context, Long messageId) throws AMQException
     {
+        checkNotClosed();
         if (_log.isDebugEnabled())
         {
             _log.debug("Removing message with id " + messageId);
@@ -172,9 +174,10 @@
     public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
             throws AMQException
     {
+        checkNotClosed();
         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
 
-        if(bodyList == null && lastContentBody)
+        if (bodyList == null && lastContentBody)
         {
             _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
         }
@@ -193,17 +196,28 @@
     public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
             throws AMQException
     {
+        checkNotClosed();
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
     {
+        checkNotClosed();
         return _metaDataMap.get(messageId);
     }
 
     public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
     {
+        checkNotClosed();
         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
         return bodyList.get(index);
+    }
+
+     private void checkNotClosed() throws MessageStoreClosedException
+     {
+        if (_closed.get())
+        {
+            throw new MessageStoreClosedException();
+        }
     }
 }

Added: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java?rev=598834&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java (added)
+++ incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java Tue Nov 27 16:45:32 2007
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException but
+ * we should be using AMQExceptions internally in the code base for Protocol errors hence
+ * the message store interface should throw a different super class which this should be
+ * moved to reflect
+ */
+public class MessageStoreClosedException extends AMQException
+{
+    public MessageStoreClosedException()
+    {
+        super("Message store closed");
+    }
+}

Propchange: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Nov 27 16:45:32 2007
@@ -1286,4 +1286,9 @@
     {
         return _sessions.get(channelId);
     }
+
+    public boolean isFailingOver()
+    {
+        return (_protocolHandler.getFailoverLatch() != null);
+    }
 }

Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Nov 27 16:45:32 2007
@@ -420,7 +420,7 @@
      *
      * @throws IllegalStateException If the session is closed.
      */
-    public void acknowledge() throws IllegalStateException
+    public void acknowledge() throws JMSException
     {
         if (isClosed())
         {
@@ -2510,6 +2510,7 @@
         for (Iterator it = consumers.iterator(); it.hasNext();)
         {
             BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+            consumer.failedOver();
             registerConsumer(consumer, true);
         }
     }

Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Nov 27 16:45:32 2007
@@ -33,14 +33,12 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -277,8 +275,28 @@
         _session.setInRecovery(false);
     }
 
-    private void acquireReceiving() throws JMSException
+    /**
+     * @param immediate if true then return immediately if the connection is failing over
+     *
+     * @return boolean if the acquisition was successful
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
     {
+        if (_connection.isFailingOver())
+        {
+            if (immediate)
+            {
+                return false;
+            }
+            else
+            {
+                _connection.blockUntilNotFailingOver();
+            }
+        }
+
         if (!_receiving.compareAndSet(false, true))
         {
             throw new javax.jms.IllegalStateException("Another thread is already receiving.");
@@ -290,6 +308,7 @@
         }
 
         _receivingThread = Thread.currentThread();
+        return true;
     }
 
     private void releaseReceiving()
@@ -343,7 +362,18 @@
 
         checkPreConditions();
 
-        acquireReceiving();
+        try
+        {
+            acquireReceiving(false);
+        }
+        catch (InterruptedException e)
+        {
+            _logger.warn("Interrupted: " + e);
+            if (isClosed())
+            {
+                return null;
+            }
+        }
 
         _session.startDistpatcherIfNecessary();
 
@@ -424,7 +454,25 @@
     {
         checkPreConditions();
 
-        acquireReceiving();
+        try
+        {
+            if (!acquireReceiving(true))
+            {
+                //If we couldn't acquire the receiving thread then return null.
+                // This will occur if failing over.
+                return null;
+            }
+        }
+        catch (InterruptedException e)
+        {
+            /*
+             *  This seems slightly shoddy but should never actually be executed
+             *  since we told acquireReceiving to return immediately and it shouldn't
+             *  block on anything.
+             */
+
+            return null;
+        }
 
         _session.startDistpatcherIfNecessary();
 
@@ -868,11 +916,18 @@
         }
     }
 
-    public void acknowledge() // throws JMSException
+    public void acknowledge() throws JMSException
     {
-        if (!isClosed())
+        if (isClosed())
+        {
+            throw new IllegalStateException("Consumer is closed");
+        }
+        else if (_session.hasFailedOver())
+        {
+            throw new JMSException("has failed over");
+        }
+        else
         {
-
             Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
             while (tags.hasNext())
             {
@@ -880,10 +935,6 @@
                 tags.remove();
             }
         }
-        else
-        {
-            throw new IllegalStateException("Consumer is closed");
-        }
     }
 
     /** Called on recovery to reset the list of delivery tags */
@@ -1021,5 +1072,12 @@
     public void clearReceiveQueue()
     {
         _synchronousQueue.clear();
+    }
+
+    /** to be called when a failover has occurred */
+    public void failedOver()
+    {
+        clearReceiveQueue();
+        clearUnackedMessages();
     }
 }
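
Note on the acquireReceiving(boolean immediate) change above: the pattern is "fail fast or block while failing over, then take the single-receiver flag". The following is a minimal standalone sketch of that pattern only, not the Qpid implementation. ReceiveGuard, beginFailover(), endFailover() and release() are hypothetical names invented for illustration; the latch field stands in for the getFailoverLatch() != null check used by isFailingOver().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the consumer's receive guard; not Qpid code. */
public class ReceiveGuard
{
    // Non-null while a failover is in progress (assumed to mirror the failover latch check).
    private volatile CountDownLatch failoverLatch;

    // True while some thread holds the right to receive.
    private final AtomicBoolean receiving = new AtomicBoolean(false);

    public void beginFailover()
    {
        failoverLatch = new CountDownLatch(1);
    }

    public void endFailover()
    {
        CountDownLatch latch = failoverLatch;
        failoverLatch = null;
        if (latch != null)
        {
            // Wake up any thread blocked in acquire(false).
            latch.countDown();
        }
    }

    /**
     * @param immediate if true, return false at once while failing over instead of blocking
     * @return true if the caller now holds the receive flag
     */
    public boolean acquire(boolean immediate) throws InterruptedException
    {
        CountDownLatch latch = failoverLatch;
        if (latch != null)
        {
            if (immediate)
            {
                return false;
            }
            latch.await();
        }

        if (!receiving.compareAndSet(false, true))
        {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        return true;
    }

    public void release()
    {
        receiving.set(false);
    }
}
```

In this sketch a caller that catches InterruptedException from acquire(false) does not hold the flag, so it should return or retry rather than carry on as if the acquisition had succeeded.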

Modified: incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Nov 27 16:45:32 2007
@@ -35,7 +35,6 @@
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -99,8 +98,8 @@
                         if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
                         {
                             _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
-                                                                 ? "Qpid NIO is new default"
-                                                                 : "Sysproperty 'qpidnio' is set"));
+                                                                              ? "Qpid NIO is new default"
+                                                                              : "Sysproperty 'qpidnio' is set"));
                             result = new MultiThreadSocketConnector();
                         }
                         else
@@ -277,8 +276,7 @@
             }
 
             AMQVMBrokerCreationException amqbce =
-                    new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
-            amqbce.initCause(e);
+                    new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
             throw amqbce;
         }
 
@@ -291,14 +289,11 @@
         _acceptor.unbindAll();
         synchronized (_inVmPipeAddress)
         {
-            Iterator keys = _inVmPipeAddress.keySet().iterator();
-
-            while (keys.hasNext())
-            {
-                int id = (Integer) keys.next();
-                _inVmPipeAddress.remove(id);
-            }
-        }
+            _inVmPipeAddress.clear();
+        }        
+        _acceptor = null;
+        _currentInstance = -1;
+        _currentVMPort = -1;
     }
 
     public static void killVMBroker(int port)

Added: incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=598834&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (added)
+++ incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Tue Nov 27 16:45:32 2007
@@ -0,0 +1,222 @@
+package org.apache.qpid.test.client.failover;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+public class FailoverTest extends TestCase implements ConnectionListener
+{
+    private static final Logger _logger = Logger.getLogger(FailoverTest.class);
+
+    private static final int NUM_BROKERS = 2;
+    private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
+    private static final String QUEUE = "queue";
+    private static final int NUM_MESSAGES = 10;
+    private Connection con;
+    private AMQConnectionFactory conFactory;
+    private Session prodSess;
+    private AMQQueue q;
+    private MessageProducer prod;
+    private Session conSess;
+    private MessageConsumer consumer;
+
+    private static int usedBrokers = 0;
+    private CountDownLatch failoverComplete;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        // Create two VM brokers
+
+        for (int i = 0; i < NUM_BROKERS; i++)
+        {
+            usedBrokers++;
+
+            TransportConnection.createVMBroker(usedBrokers);
+        }
+        //undo last addition
+
+        conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
+        _logger.info("Connecting on:" + conFactory.getConnectionURL());
+        con = conFactory.createConnection();
+        ((AMQConnection) con).setConnectionListener(this);
+        con.start();
+        failoverComplete = new CountDownLatch(1);
+    }
+
+    private void init(boolean transacted, int mode) throws JMSException
+    {
+        prodSess = con.createSession(transacted, mode);
+        q = new AMQQueue("amq.direct", QUEUE);
+        prod = prodSess.createProducer(q);
+        conSess = con.createSession(transacted, mode);
+        consumer = conSess.createConsumer(q);
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            con.close();
+        }
+        catch (Exception e)
+        {
+
+        }
+
+        try
+        {
+            TransportConnection.killAllVMBrokers();
+            ApplicationRegistry.removeAll();
+        }
+        catch (Exception e)
+        {
+            fail("Unable to clean up");
+        }
+        super.tearDown();
+    }
+
+    private void consumeMessages(int toConsume) throws JMSException
+    {
+        Message msg;
+        for (int i = 0; i < toConsume; i++)
+        {
+            msg = consumer.receive(1000);
+            assertNotNull("Message " + i + " was null!", msg);
+            assertEquals("message " + i, ((TextMessage) msg).getText());
+        }
+    }
+
+    private void sendMessages(int totalMessages) throws JMSException
+    {
+        for (int i = 0; i < totalMessages; i++)
+        {
+            prod.send(prodSess.createTextMessage("message " + i));
+        }
+
+//        try
+//        {
+//            Thread.sleep(100 * totalMessages);
+//        }
+//        catch (InterruptedException e)
+//        {
+//            //evil ignoring of IE
+//        }
+    }
+
+    public void testP2PFailover() throws Exception
+    {
+        testP2PFailover(NUM_MESSAGES, true);
+    }
+
+    public void testP2PFailoverWithMessagesLeft() throws Exception
+    {
+        testP2PFailover(NUM_MESSAGES, false);
+    }
+
+    private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+    {
+        Message msg = null;
+        init(false, Session.AUTO_ACKNOWLEDGE);
+        sendMessages(totalMessages);
+
+        // Consume some messages
+        int toConsume = totalMessages;
+        if (!consumeAll)
+        {
+            toConsume = totalMessages / 2;
+        }
+
+        consumeMessages(toConsume);
+
+        _logger.info("Failing over");
+
+        causeFailure();
+
+        msg = consumer.receive(500);
+        //todo: reinstate
+        assertNull("Should not have received message from new broker!", msg);
+        // Check that messages still sent / received
+        sendMessages(totalMessages);
+        consumeMessages(totalMessages);
+    }
+
+    private void causeFailure()
+    {
+        _logger.info("Failover");
+
+        TransportConnection.killVMBroker(usedBrokers - 1);
+        ApplicationRegistry.remove(usedBrokers - 1);
+
+        _logger.info("Awaiting Failover completion");
+        try
+        {
+            failoverComplete.await();
+        }
+        catch (InterruptedException e)
+        {
+            //evil ignore IE.
+        }
+    }
+
+    public void testClientAckFailover() throws Exception
+    {
+        init(false, Session.CLIENT_ACKNOWLEDGE);
+        sendMessages(1);
+        Message msg = consumer.receive();
+        assertNotNull("Expected msgs not received", msg);
+
+
+        causeFailure();
+
+        Exception failure = null;
+        try
+        {
+            msg.acknowledge();
+        }
+        catch (Exception e)
+        {
+            failure = e;
+        }
+        assertNotNull("Exception should be thrown", failure);
+    }
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        failoverComplete.countDown();
+    }
+}
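
Note on causeFailure() in the test above: it waits on a CountDownLatch that the failoverComplete() listener callback counts down, with no timeout and with the InterruptedException swallowed. A possible variant, sketched here under assumptions, bounds the wait and restores the interrupt flag. FailoverWaiter, onFailoverComplete() and awaitFailover() are hypothetical names and are not part of this commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing a bounded wait for a failover callback; not Qpid code. */
public class FailoverWaiter
{
    private final CountDownLatch failoverComplete = new CountDownLatch(1);

    /** Would be invoked from the connection listener's failoverComplete() callback. */
    public void onFailoverComplete()
    {
        failoverComplete.countDown();
    }

    /**
     * @return true if the callback fired within the timeout, false on timeout or interrupt
     */
    public boolean awaitFailover(long timeoutMillis)
    {
        try
        {
            return failoverComplete.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for the caller instead of ignoring it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A test could then assert on the return value, so that a failover that never completes fails the test instead of hanging it.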

Propchange: incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date