You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/19 12:55:52 UTC

svn commit: r509172 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/clien...

Author: ritchiem
Date: Mon Feb 19 03:55:47 2007
New Revision: 509172

URL: http://svn.apache.org/viewvc?view=rev&rev=509172
Log:
QPID-372, QPID-376 Broker now ignores all frames for closing channels. 

When a close-ok is received the channel can be reopened and used.
All uses of getChannel now check that the return value is not null and throw a NOT_FOUND AMQException if it is. If the channel is not found during a method handler then the Channel will be closed.

ChannelCloseHandler - Now throws a connection exception if trying to close a non-existent channel.
AMQMinaProtocolSession - Added pre-check for closing channels to ignore all but Close-OK methods
 - Updated the AMQChannelException handling to close the connection if the exception was a result of not having a valid channel.
 - Changed state to CLOSING when writing out a connection close frame.
AMQConnection - Wrapped all _logger calls in log-level checks (isInfoEnabled/isDebugEnabled). Updated comment formatting.
AMQSession - Now calls startDistpatcherIfNecessary() when receiving a message, as without it a producer will not get a returned message. This is because there is no consumer set up to consume.
ConnectionCloseMethodHandler - Wrapped code in try finally so that the protocol session would always be closed correctly.
AMQStateManager - Added the current state to the debug log message emitted when looking up a state transition handler.
Modified AMQTimeoutException to include a new constant value to identify the failure reason.
AMQConstant - Added 408 REQUEST_TIMEOUT. Fixed an error with NOT_ALLOWED: the value was 530, should be 507.

Added:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Mon Feb 19 03:55:47 2007
@@ -26,9 +26,11 @@
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
 
 public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
 {
@@ -51,11 +53,21 @@
         ChannelCloseBody body = evt.getMethod();
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
                      " and method " + body.methodId);
-        session.closeChannel(evt.getChannelId());
+        int channelId = evt.getChannelId();
+
+        AMQChannel channel = session.getChannel(channelId);
+
+        if (channel == null)
+        {
+            throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+        }
+
+        session.closeChannel(channelId);
+
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
         session.writeFrame(response);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Feb 19 03:55:47 2007
@@ -56,9 +56,11 @@
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -254,12 +256,36 @@
         {
             _logger.debug("Method frame received: " + frame);
         }
+
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
                                                                                     (AMQMethodBody) frame.getBodyFrame());
+
+        //Check that this channel is not closing
+        if (channelAwaitingClosure(frame.getChannel()))
+        {
+            if ((evt.getMethod() instanceof ChannelCloseOkBody))
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok");
+                }
+            }
+            else
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring");
+                }
+                return;
+            }
+        }
+
+
         try
         {
             try
             {
+
                 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
                 if (!_frameListeners.isEmpty())
@@ -277,14 +303,42 @@
             }
             catch (AMQChannelException e)
             {
-                _logger.error("Closing channel due to: " + e.getMessage());
-                writeFrame(e.getCloseFrame(frame.getChannel()));
-                closeChannel(frame.getChannel());
+                if (getChannel(frame.getChannel()) != null)
+                {
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Closing channel due to: " + e.getMessage());
+                    }
+                    writeFrame(e.getCloseFrame(frame.getChannel()));
+                    closeChannel(frame.getChannel());
+                }
+                else
+                {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
+                    }
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Closing connection due to: " + e.getMessage());
+                    }
+                    closeSession();
+
+                    AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+                                                                                       AMQConstant.CHANNEL_ERROR.getName().toString());
+
+                    _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                    writeFrame(ce.getCloseFrame(frame.getChannel()));
+                }
             }
             catch (AMQConnectionException e)
             {
-                _logger.error("Closing connection due to: " + e.getMessage());
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Closing connection due to: " + e.getMessage());
+                }
                 closeSession();
+                _stateManager.changeState(AMQState.CONNECTION_CLOSING);
                 writeFrame(e.getCloseFrame(frame.getChannel()));
             }
         }
@@ -325,8 +379,17 @@
         {
             _logger.debug("Content header frame received: " + frame);
         }
-        //fixme what happens if getChannel returns null
-        getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+
+        AMQChannel channel = getChannel(frame.getChannel());
+
+        if (channel == null)
+        {
+            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+        }
+        else
+        {
+            channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+        }
     }
 
     private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -335,8 +398,16 @@
         {
             _logger.debug("Content body frame received: " + frame);
         }
-        //fixme what happens if getChannel returns null
-        getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
+        AMQChannel channel = getChannel(frame.getChannel());
+
+        if (channel == null)
+        {
+            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+        }
+        else
+        {
+            channel.publishContentBody((ContentBody) frame.getBodyFrame(), this);
+        }
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Feb 19 03:55:47 2007
@@ -70,20 +70,18 @@
     private AtomicInteger _idFactory = new AtomicInteger(0);
 
     /**
-     * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
-     * This must be held by any child objects of this connection such as the session, producers and consumers.
+     * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
+     * held by any child objects of this connection such as the session, producers and consumers.
      */
     private final Object _failoverMutex = new Object();
 
     /**
-     * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
-     * per session and we must prevent the client from opening too many. Zero means unlimited.
+     * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
+     * and we must prevent the client from opening too many. Zero means unlimited.
      */
     private long _maximumChannelCount;
 
-    /**
-     * The maximum size of frame supported by the server
-     */
+    /** The maximum size of frame supported by the server */
     private long _maximumFrameSize;
 
     /**
@@ -93,26 +91,18 @@
      */
     private AMQProtocolHandler _protocolHandler;
 
-    /**
-     * Maps from session id (Integer) to AMQSession instance
-     */
+    /** Maps from session id (Integer) to AMQSession instance */
     private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap    
 
     private String _clientName;
 
-    /**
-     * The user name to use for authentication
-     */
+    /** The user name to use for authentication */
     private String _username;
 
-    /**
-     * The password to use for authentication
-     */
+    /** The password to use for authentication */
     private String _password;
 
-    /**
-     * The virtual path to connect to on the AMQ server
-     */
+    /** The virtual path to connect to on the AMQ server */
     private String _virtualHost;
 
     private ExceptionListener _exceptionListener;
@@ -122,14 +112,12 @@
     private ConnectionURL _connectionURL;
 
     /**
-     * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
-     * message publication.
+     * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
+     * publication.
      */
     private boolean _started;
 
-    /**
-     * Policy dictating how to failover
-     */
+    /** Policy dictating how to failover */
     private FailoverPolicy _failoverPolicy;
 
     /*
@@ -148,9 +136,7 @@
      */
     private QpidConnectionMetaData _connectionMetaData;
 
-    /**
-     * Configuration info for SSL
-     */
+    /** Configuration info for SSL */
     private SSLConfiguration _sslConfiguration;
 
     private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
@@ -164,6 +150,7 @@
      * @param password    password
      * @param clientName  clientid
      * @param virtualHost virtualhost
+     *
      * @throws AMQException
      * @throws URLSyntaxException
      */
@@ -182,6 +169,7 @@
      * @param password    password
      * @param clientName  clientid
      * @param virtualHost virtualhost
+     *
      * @throws AMQException
      * @throws URLSyntaxException
      */
@@ -238,7 +226,10 @@
 
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
-        _logger.info("Connection:" + connectionURL);
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Connection:" + connectionURL);
+        }
         _sslConfiguration = sslConfig;
         if (connectionURL == null)
         {
@@ -297,11 +288,17 @@
             {
                 lastException = e;
 
-                _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+                }
             }
         }
 
-        _logger.debug("Are we connected:" + _connected);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Are we connected:" + _connected);
+        }
 
         if (!_connected)
         {
@@ -402,7 +399,10 @@
         }
         catch (Exception e)
         {
-            _logger.info("Unable to connect to broker at " + bd);
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Unable to connect to broker at " + bd);
+            }
             attemptReconnection();
         }
         return false;
@@ -421,11 +421,17 @@
             {
                 if (!(e instanceof AMQException))
                 {
-                    _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+                    }
                 }
                 else
                 {
-                    _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+                    }
                 }
             }
         }
@@ -437,8 +443,8 @@
     /**
      * Get the details of the currently active broker
      *
-     * @return null if no broker is active (i.e. no successful connection has been made, or
-     *         the BrokerDetail instance otherwise
+     * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
+     *         otherwise
      */
     public BrokerDetails getActiveBrokerDetails()
     {
@@ -593,12 +599,14 @@
     }
 
     /**
-     * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
-     * where specified in the JMS spec
+     * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
+     * the JMS spec
      *
      * @param transacted
      * @param acknowledgeMode
+     *
      * @return QueueSession
+     *
      * @throws JMSException
      */
     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -607,12 +615,14 @@
     }
 
     /**
-     * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
-     * where specified in the JMS spec
+     * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
+     * the JMS spec
      *
      * @param transacted
      * @param acknowledgeMode
+     *
      * @return TopicSession
+     *
      * @throws JMSException
      */
     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -718,11 +728,9 @@
     }
 
     /**
-     * Marks all sessions and their children as closed without sending any protocol messages. Useful when
-     * you need to mark objects "visible" in userland as closed after failover or other significant event that
-     * impacts the connection.
-     * <p/>
-     * The caller must hold the failover mutex before calling this method.
+     * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
+     * mark objects "visible" in userland as closed after failover or other significant event that impacts the
+     * connection. <p/> The caller must hold the failover mutex before calling this method.
      */
     private void markAllSessionsClosed()
     {
@@ -740,9 +748,8 @@
     /**
      * Close all the sessions, either due to normal connection closure or due to an error occurring.
      *
-     * @param cause if not null, the error that is causing this shutdown
-     *              <p/>
-     *              The caller must hold the failover mutex before calling this method.
+     * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
+     *              before calling this method.
      */
     private void closeAllSessions(Throwable cause, long timeout) throws JMSException
     {
@@ -891,6 +898,7 @@
      * Fire the preFailover event to the registered connection listener (if any)
      *
      * @param redirect true if this is the result of a redirect request rather than a connection error
+     *
      * @return true if no listener or listener does not veto change
      */
     public boolean firePreFailover(boolean redirect)
@@ -904,10 +912,11 @@
     }
 
     /**
-     * Fire the preResubscribe event to the registered connection listener (if any). If the listener
-     * vetoes resubscription then all the sessions are closed.
+     * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
+     * resubscription then all the sessions are closed.
      *
      * @return true if no listener or listener does not veto resubscription.
+     *
      * @throws JMSException
      */
     public boolean firePreResubscribe() throws JMSException
@@ -927,9 +936,7 @@
         }
     }
 
-    /**
-     * Fires a failover complete event to the registered connection listener (if any).
-     */
+    /** Fires a failover complete event to the registered connection listener (if any). */
     public void fireFailoverComplete()
     {
         if (_connectionListener != null)
@@ -939,8 +946,8 @@
     }
 
     /**
-     * In order to protect the consistency of the connection and its child sessions, consumers and producers,
-     * the "failover mutex" must be held when doing any operations that could be corrupted during failover.
+     * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
+     * "failover mutex" must be held when doing any operations that could be corrupted during failover.
      *
      * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
      */
@@ -950,8 +957,8 @@
     }
 
     /**
-     * If failover is taking place this will block until it has completed. If failover
-     * is not taking place it will return immediately.
+     * If failover is taking place this will block until it has completed. If failover is not taking place it will
+     * return immediately.
      *
      * @throws InterruptedException
      */
@@ -961,18 +968,19 @@
     }
 
     /**
-     * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
-     * This method sends the exception to a JMS exception listener, if configured, and
-     * propagates the exception to sessions, which in turn will propagate to consumers.
-     * This allows synchronous consumers to have exceptions thrown to them.
+     * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
+     * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+     * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
      *
      * @param cause the exception
      */
     public void exceptionReceived(Throwable cause)
     {
 
-        _logger.debug("Connection Close done by:" + Thread.currentThread().getName());
-        _logger.debug("exceptionReceived is ", cause);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
+        }
 
         final JMSException je;
         if (cause instanceof JMSException)
@@ -1012,7 +1020,10 @@
         {
             try
             {
-                _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+                }
                 _closed.set(true);
                 closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
             }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 19 03:55:47 2007
@@ -1629,8 +1629,12 @@
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Message received in session with channel id " + _channelId);
+            _logger.debug("Message[" + (message.getDeliverBody() == null ?
+                                        "B:" + message.getBounceBody() : "D:" + message.getDeliverBody())
+                          + "] received in session with channel id " + _channelId);
         }
+
+        startDistpatcherIfNecessary();
 
         _queue.add(message);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Mon Feb 19 03:55:47 2007
@@ -60,36 +60,41 @@
         AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
         AMQShortString reason = method.replyText;
 
-        // TODO: check whether channel id of zero is appropriate
-        // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor()));
-
-        if (errorCode != AMQConstant.REPLY_SUCCESS)
+        try
         {
-            if(errorCode == AMQConstant.NOT_ALLOWED)
+            // TODO: check whether channel id of zero is appropriate
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor()));
+
+            if (errorCode != AMQConstant.REPLY_SUCCESS)
             {
-                _logger.info("Authentication Error:"+Thread.currentThread().getName());
+                if (errorCode == AMQConstant.NOT_ALLOWED)
+                {
+                    _logger.info("Authentication Error:" + Thread.currentThread().getName());
 
-                protocolSession.closeProtocolSession();
+                    protocolSession.closeProtocolSession();
 
-                 //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
-                 stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
+                    //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
+                    stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
 
-                throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
-            }
-            else
-            {
-                _logger.info("Connection close received with error code " + errorCode);
+                    throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
+                }
+                else
+                {
+                    _logger.info("Connection close received with error code " + errorCode);
 
 
-                throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+                    throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+                }
             }
         }
+        finally
+        {
+            // this actually closes the connection in the case where it is not an error.
 
-        // this actually closes the connection in the case where it is not an error.
-
-        protocolSession.closeProtocolSession();
+            protocolSession.closeProtocolSession();
 
-        stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            stateManager.changeState(AMQState.CONNECTION_CLOSED);
+        }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Feb 19 03:55:47 2007
@@ -59,25 +59,22 @@
 import org.apache.qpid.protocol.AMQMethodListener;
 
 /**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
  */
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
     private AMQProtocolSession _protocolSession;
 
-    /**
-     * The current state
-     */
+    /** The current state */
     private AMQState _currentState;
 
     /**
-     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
-     * The class must be a subclass of AMQFrame.
+     * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+     * AMQFrame.
      */
-    private final Map _state2HandlersMap = new HashMap();
+    protected final Map _state2HandlersMap = new HashMap();
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
     private final Object _stateLock = new Object();
@@ -87,7 +84,7 @@
     {
         this(null);
     }
-    
+
 
     public AMQStateManager(AMQProtocolSession protocolSession)
     {
@@ -98,7 +95,7 @@
     {
         _protocolSession = protocolSession;
         _currentState = state;
-        if(register)
+        if (register)
         {
             registerListeners();
         }
@@ -194,7 +191,7 @@
         final Class clazz = frame.getClass();
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Looking for state transition handler for frame " + clazz);
+            _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
         }
         final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
 
@@ -228,12 +225,12 @@
 
     public void attainState(final AMQState s) throws AMQException
     {
-        synchronized(_stateLock)
+        synchronized (_stateLock)
         {
             final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
             long waitTime = MAXIMUM_STATE_WAIT_TIME;
 
-            while(_currentState != s && waitTime > 0)
+            while (_currentState != s && waitTime > 0)
             {
                 try
                 {
@@ -243,12 +240,12 @@
                 {
                     _logger.warn("Thread interrupted");
                 }
-                if(_currentState != s)
+                if (_currentState != s)
                 {
                     waitTime = waitUntilTime - System.currentTimeMillis();
                 }
             }
-            if(_currentState != s)
+            if (_currentState != s)
             {
                 _logger.warn("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);
                 throw new AMQException("State not achieved within permitted time.  Current state " + _currentState + ", desired state: " + s);

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?view=auto&rev=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Mon Feb 19 03:55:47 2007
@@ -0,0 +1,400 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.framework.TestCase;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import javax.jms.JMSException;
+import javax.jms.ExceptionListener;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.Queue;
+
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.log4j.Logger;
+
+public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseTest.class);
+
+    Connection _connection;
+    private String _brokerlist = "vm://:1";
+    private Session _session;
+    private static final long SYNC_TIMEOUT = 500;
+    private int TEST = 0;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+
+    /*
+          Close a channel, then use a channel with the same id and ensure an error is raised.
+       */
+    public void testReusingChannelAfterFullClosure()
+    {
+        _connection = newConnection();
+
+        //Create Producer
+        try
+        {
+            _connection.start();
+
+            createChannelAndTest(1);
+
+            // Cause it to close
+            try
+            {
+                _logger.info("Testing invalid exchange");
+                declareExchange(1, "", "name_that_will_lookup_to_null", false);
+                fail("Exchange name is empty so this should fail ");
+            }
+            catch (AMQException e)
+            {
+                assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+            }
+
+            // Check that the closed channel can no longer be used
+            try
+            {
+                _logger.info("Testing valid exchange should fail");
+                declareExchange(1, "topic", "amq.topic", false);
+                fail("This should not succeed as the channel should be closed ");
+            }
+            catch (AMQException e)
+            {
+                assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+
+                _connection = newConnection();
+            }
+
+            checkSendingMessage();
+
+            _session.close();
+            _connection.close();
+
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    /*
+    Close a channel, send further methods (which should be ignored), then send Close-OK and expect no errors.
+     */
+    public void testSendingMethodsAfterClose()
+    {
+        try
+        {
+            _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
+                                            + _brokerlist + "'");
+
+            ((AMQConnection) _connection).setConnectionListener(this);
+
+
+            _connection.setExceptionListener(this);
+
+            //Keep hold of the current StateManager; it is swapped below for one that doesn't respond with Close-OKs
+            AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager();
+
+            _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            _connection.start();
+
+            //Test connection
+            checkSendingMessage();
+
+            //Set StateManager to manager that ignores Close-oks
+            AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+            AMQStateManager newStateManager = new TestCloseStateManager(protocolSession);
+            newStateManager.changeState(oldStateManager.getCurrentState());
+
+            ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager);
+
+            final int TEST_CHANNEL = 1;
+            _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation");
+
+            createChannelAndTest(TEST_CHANNEL);
+
+            // Cause it to close
+            try
+            {
+                _logger.info("Closing Channel - invalid exchange");
+                declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+                fail("Exchange name is empty so this should fail ");
+            }
+            catch (AMQException e)
+            {
+                assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+            }
+
+            try
+            {
+                // Send other methods that should be ignored
+                // send them no wait as server will ignore them
+                _logger.info("Tested known exchange - should ignore");
+                declareExchange(TEST_CHANNEL, "topic", "amq.topic", true);
+
+                _logger.info("Tested known invalid exchange - should ignore");
+                declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+                _logger.info("Tested known invalid exchange - should ignore");
+                declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+                // Send sync .. server will ignore it and the request will time out
+                _logger.info("Tested known invalid exchange - should ignore");
+                declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+            }
+            catch (AMQTimeoutException te)
+            {
+                assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode());
+            }
+            catch (AMQException e)
+            {
+                fail("This should not fail as all requests should be ignored");
+            }
+
+            _logger.info("Sending Close");
+            // Send Close-ok
+            sendClose(TEST_CHANNEL);
+
+            _logger.info("Re-opening channel");
+
+            createChannelAndTest(TEST_CHANNEL);
+
+            //Test connection is still ok
+
+            checkSendingMessage();
+
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+
+        }
+        catch (URLSyntaxException e)
+        {
+            fail(e.getMessage());
+        }
+        finally
+        {
+            try
+            {
+                _session.close();
+                _connection.close();
+            }
+            catch (JMSException e)
+            {
+                e.printStackTrace();
+                fail(e.getMessage());
+            }
+        }
+    }
+
+    private void createChannelAndTest(int channel)
+    {
+        //Create A channel
+        try
+        {
+            createChannel(channel);
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+
+        // Test it is ok
+        try
+        {
+            declareExchange(channel, "topic", "amq.topic", false);
+            _logger.info("Tested known exchange");
+        }
+        catch (AMQException e)
+        {
+            fail("This should not fail as this is the default exchange details");
+        }
+    }
+
+    private void sendClose(int channel)
+    {
+        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel,
+                                                           ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+                                                           ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+
+        ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
+    }
+
+
+    private void checkSendingMessage() throws JMSException
+    {
+        TEST++;
+        _logger.info("Test creating producer which will use channel id 1");
+
+        Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST);
+
+        MessageConsumer consumer = _session.createConsumer(queue);
+
+        MessageProducer producer = _session.createProducer(queue);
+
+        final String MESSAGE = "CCT_Test_Message";
+        producer.send(_session.createTextMessage(MESSAGE));
+
+        Message msg = consumer.receive(2000);
+
+        assertNotNull("Received messages should not be null.", msg);
+        assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText());
+    }
+
+    private Connection newConnection()
+    {
+        AMQConnection connection = null;
+        try
+        {
+            connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='"
+                                           + _brokerlist + "'");
+
+            connection.setConnectionListener(this);
+
+            _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            connection.start();
+
+        }
+        catch (JMSException e)
+        {
+            fail("Creating new connection when:"+e.getMessage());
+        }
+        catch (AMQException e)
+        {
+            fail("Creating new connection when:"+e.getMessage());
+        }
+        catch (URLSyntaxException e)
+        {
+            fail("Creating new connection when:"+e.getMessage());
+        }
+
+
+        return connection;
+    }
+
+    private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException
+    {
+        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId,
+                                                                      ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+                                                                      ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
+                                                                      null,    // arguments
+                                                                      false,    // autoDelete
+                                                                      false,    // durable
+                                                                      new AMQShortString(_name),    // exchange
+                                                                      false,    // internal
+                                                                      nowait,    // nowait
+                                                                      true,    // passive
+                                                                      0,    // ticket
+                                                                      new AMQShortString(_type));    // type
+
+        if (nowait)
+        {
+            ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare);
+        }
+        else
+        {
+            ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+        }
+    }
+
+    private void createChannel(int channelId) throws AMQException
+    {
+        ((AMQConnection) _connection).getProtocolHandler().syncWrite(
+                ChannelOpenBody.createAMQFrame(channelId,
+                                               ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
+                                               ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
+                                               null),    // outOfBand
+                                                         ChannelOpenOkBody.class);
+
+    }
+
+
+    public void onException(JMSException jmsException)
+    {
+        //_logger.info("CCT" + jmsException);
+        fail(jmsException.getMessage());
+    }
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        return false;
+    }
+
+    public boolean preResubscribe()
+    {
+        return false;
+    }
+
+    public void failoverComplete()
+    {
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java?view=auto&rev=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java Mon Feb 19 03:55:47 2007
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+
+public class TestChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(TestChannelCloseMethodHandlerNoCloseOk.class);
+
+    private static TestChannelCloseMethodHandlerNoCloseOk _handler = new TestChannelCloseMethodHandlerNoCloseOk();
+
+    public static TestChannelCloseMethodHandlerNoCloseOk getInstance()
+    {
+        return _handler;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.debug("ChannelClose method received");
+        ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+        AMQShortString reason = method.replyText;
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+        }
+
+        // For this test Method Handler .. don't send Close-OK
+//        // TODO: Be aware of possible changes to parameter order as versions change.
+//        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
+//        protocolSession.writeFrame(frame);
+        if (errorCode != AMQConstant.REPLY_SUCCESS)
+        {
+            _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+            if (errorCode == AMQConstant.NO_CONSUMERS)
+            {
+                throw new AMQNoConsumersException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.NO_ROUTE)
+            {
+                throw new AMQNoRouteException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.INVALID_SELECTOR)
+            {
+                _logger.debug("Broker responded with Invalid Selector.");
+
+                throw new AMQInvalidSelectorException(String.valueOf(reason));
+            }
+            else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+            {
+                _logger.debug("Broker responded with Invalid Routing Key.");
+
+                throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+            }
+            else
+            {
+                throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+            }
+
+        }
+        protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+    }
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestChannelCloseMethodHandlerNoCloseOk.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java?view=auto&rev=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java Mon Feb 19 03:55:47 2007
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
+import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
+import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
+import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
+import org.apache.qpid.client.handler.BasicReturnMethodHandler;
+import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
+import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
+import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
+import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class TestCloseStateManager extends AMQStateManager
+{
+    public TestCloseStateManager(AMQProtocolSession protocolSession)
+    {
+        super(protocolSession);
+    }
+
+    protected void registerListeners()
+    {
+        Map frame2handlerMap = new HashMap();
+
+        // we need to register a map for the null (i.e. all state) handlers otherwise you get
+        // a stack overflow in the handler searching code when you present it with a frame for which
+        // no handlers are registered
+        //
+        _state2HandlersMap.put(null, frame2handlerMap);
+
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
+
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
+
+        frame2handlerMap = new HashMap();
+        frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
+
+        //
+        // ConnectionOpen handlers
+        //
+        frame2handlerMap = new HashMap();
+        // Use Test Handler for Close methods to not send Close-OKs
+        frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+
+        frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
+        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
+        frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
+        frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+        frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
+        frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+        frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+        frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
+        _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/TestCloseStateManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java Mon Feb 19 03:55:47 2007
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid;
 
+import org.apache.qpid.protocol.AMQConstant;
+
 public class AMQTimeoutException extends AMQException
 {
     public AMQTimeoutException(String message)
     {
-        super(message);
+        super(AMQConstant.REQUEST_TIMEOUT, message);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=509172&r1=509171&r2=509172
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Mon Feb 19 03:55:47 2007
@@ -90,6 +90,8 @@
 
     public static final AMQConstant IN_USE = new AMQConstant(406, "In use", true);
 
+    public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
+
     public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
 
     public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
@@ -100,7 +102,7 @@
 
     public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
 
-    public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
+    public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
 
     public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);