You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/09/07 16:31:40 UTC

svn commit: r812153 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ systests/src/main/java/org/apache/qpid/test/unit/close/

Author: ritchiem
Date: Mon Sep  7 14:31:40 2009
New Revision: 812153

URL: http://svn.apache.org/viewvc?rev=812153&view=rev
Log:
QPID-1809, QPID-2081 : Corrected ChannelClose logic. Removed an unnecessary sync on the failoverMutex in AMQSession that was causing the notification of the close to be blocked until a TimeOutException occured.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=812153&r1=812152&r2=812153&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Sep  7 14:31:40 2009
@@ -721,25 +721,22 @@
 
         if (!_closed.getAndSet(true))
         {
-            synchronized (getFailoverMutex())
+            synchronized (_messageDeliveryLock)
             {
-                synchronized (_messageDeliveryLock)
+                // An AMQException has an error code and message already and will be passed in when closure occurs as a
+                // result of a channel close request
+                AMQException amqe;
+                if (e instanceof AMQException)
                 {
-                    // An AMQException has an error code and message already and will be passed in when closure occurs as a
-                    // result of a channel close request
-                    AMQException amqe;
-                    if (e instanceof AMQException)
-                    {
-                        amqe = (AMQException) e;
-                    }
-                    else
-                    {
-                        amqe = new AMQException("Closing session forcibly", e);
-                    }
-
-                    _connection.deregisterSession(_channelId);
-                    closeProducersAndConsumers(amqe);
+                    amqe = (AMQException) e;
                 }
+                else
+                {
+                    amqe = new AMQException("Closing session forcibly", e);
+                }
+
+                _connection.deregisterSession(_channelId);
+                closeProducersAndConsumers(amqe);
             }
         }
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=812153&r1=812152&r2=812153&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Mon Sep  7 14:31:40 2009
@@ -32,7 +32,6 @@
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +47,7 @@
     }
 
     public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
-        throws AMQException
+            throws AMQException
     {
         _logger.debug("ChannelClose method received");
 
@@ -59,52 +58,62 @@
             _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
         }
 
-
-
         ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
         AMQFrame frame = body.generateFrame(channelId);
         session.writeFrame(frame);
-
-        if (errorCode != AMQConstant.REPLY_SUCCESS)
+        try
         {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
-            }
-
-            if (errorCode == AMQConstant.NO_CONSUMERS)
-            {
-                throw new AMQNoConsumersException("Error: " + reason, null, null);
-            }
-            else if (errorCode == AMQConstant.NO_ROUTE)
+            if (errorCode != AMQConstant.REPLY_SUCCESS)
             {
-                throw new AMQNoRouteException("Error: " + reason, null, null);
-            }
-            else if (errorCode == AMQConstant.INVALID_ARGUMENT)
-            {
-                _logger.debug("Broker responded with Invalid Argument.");
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+                }
+
+                if (errorCode == AMQConstant.NO_CONSUMERS)
+                {
+                    throw new AMQNoConsumersException("Error: " + reason, null, null);
+                }
+                else if (errorCode == AMQConstant.NO_ROUTE)
+                {
+                    throw new AMQNoRouteException("Error: " + reason, null, null);
+                }
+                else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+                {
+                    _logger.debug("Broker responded with Invalid Argument.");
+
+                    throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
+                }
+                else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+                {
+                    _logger.debug("Broker responded with Invalid Routing Key.");
+
+                    throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
+                }
+                else
+                {
 
-                throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
-            }
-            else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
-            {
-                _logger.debug("Broker responded with Invalid Routing Key.");
+                    throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
+                }
 
-                throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
             }
-            else
-            {
-                throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
-            }
-
         }
-        // fixme why is this only done when the close is expected...
-        // should the above forced closes not also cause a close?
-        // ----------
-        // Closing the session only when it is expected allows the errors to be processed
-        // Calling this here will prevent failover. So we should do this for all exceptions
-        // that should never cause failover. Such as authentication errors.
+        finally
+        {
+            // fixme why is this only done when the close is expected...
+            // should the above forced closes not also cause a close?
+            // ----------
+            // Closing the session only when it is expected allows the errors to be processed
+            // Calling this here will prevent failover. So we should do this for all exceptions
+            // that should never cause failover. Such as authentication errors.
+            // ----
+            // 2009-09-07 - ritchiem
+            // calling channelClosed will only close this session and will not
+            // prevent failover. If we don't close the session here then we will
+            // have problems during the session close as it will attempt to
+            // close the session that the broker has closed, 
 
-        session.channelClosed(channelId, errorCode, String.valueOf(reason));
+            session.channelClosed(channelId, errorCode, String.valueOf(reason));
+        }
     }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java?rev=812153&r1=812152&r2=812153&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java Mon Sep  7 14:31:40 2009
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.test.unit.close;
 
-import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
@@ -32,9 +31,52 @@
 
 import javax.jms.Session;
 
-/** QPID-1085 */
+/** QPID-1809
+ *
+ * Race condition on error handling and close logic.
+ *
+ * See most often with SimpleACLTest as this test is the expects the server to
+ * shut the connection/channels. This sort of testing is not performed by many,
+ * if any, of the other system tests.
+ *
+ * The problem is that we have two threads
+ *
+ *  MainThread              Exception(Mina)Thread
+ *     |                          |
+ *    Performs                    |
+ *     ACtion                     |
+ *     |                      Receives Server
+ *     |                        Close
+ *  Blocks for                    |
+ *    Response                    |
+ *      |                     Starts To Notify
+ *      |                        client
+ *      |                         |
+ *      |             <----- Notify Main Thread
+ *    Notification                |
+ *     wakes client               |
+ *      |                         |
+ *     Client then                |
+ * processes Error.               |
+ *      |                         |
+ *   Potentially Attempting      Close Channel/Connection
+ *      Connection Close
+ *
+ * The two threads both attempt to close the connection but the main thread does
+ * so assuming that the connection is open and valid.
+ *
+ * The Exception thread must modify the connection so that no furter syncWait
+ * commands are performed.
+ *
+ * This test sends an ExchangeDeclare that is Asynchronous and will fail and
+ * so cause a ChannelClose error but we perform a syncWait so that we can be
+ * sure to test that the BlockingWaiter is correctly awoken.
+ *  
+ */
 public class JavaServerCloseRaceConditionTest extends QpidTestCase
 {
+    private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup";
+
     public void test() throws Exception
     {
 
@@ -42,14 +84,11 @@
 
         AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-        AMQDestination destination = (AMQDestination) session.createQueue(getTestQueueName());
-
         // Set no wait true so that we block the connection
         // Also set a different exchange class string so the attempt to declare
         // the exchange causes an exchange. 
-        ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), destination.getExchangeName(), new AMQShortString("NewTypeForException"),
-                                                                                         destination.getExchangeName().toString().startsWith("amq."),
-                                                                                         false, false, false, true, null);
+        ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
+                                                                                         true, false, false, false, true, null);
 
         AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
 
@@ -60,10 +99,7 @@
         }
         catch (Exception e)
         {
-            if (!(e instanceof AMQAuthenticationException))
-            {
-                fail("Cause was not AMQAuthenticationException. Was " + e.getClass() + ":" + e.getMessage());
-            }
+            assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
         }
 
         try
@@ -76,10 +112,7 @@
         }
         catch (Exception e)
         {
-            if (!(e instanceof AMQAuthenticationException))
-            {
-                fail("Cause was not AMQAuthenticationException. Was " + e.getClass() + ":" + e.getMessage());
-            }
+            assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
         }
 
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org