You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/09/07 16:31:40 UTC
svn commit: r812153 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/handler/
systests/src/main/java/org/apache/qpid/test/unit/close/
Author: ritchiem
Date: Mon Sep 7 14:31:40 2009
New Revision: 812153
URL: http://svn.apache.org/viewvc?rev=812153&view=rev
Log:
QPID-1809, QPID-2081 : Corrected ChannelClose logic. Removed an unnecessary sync on the failoverMutex in AMQSession that was causing the notification of the close to be blocked until a TimeOutException occured.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=812153&r1=812152&r2=812153&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Sep 7 14:31:40 2009
@@ -721,25 +721,22 @@
if (!_closed.getAndSet(true))
{
- synchronized (getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
-
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ amqe = (AMQException) e;
}
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=812153&r1=812152&r2=812153&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Mon Sep 7 14:31:40 2009
@@ -32,7 +32,6 @@
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,7 @@
}
public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
- throws AMQException
+ throws AMQException
{
_logger.debug("ChannelClose method received");
@@ -59,52 +58,62 @@
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
-
-
ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
AMQFrame frame = body.generateFrame(channelId);
session.writeFrame(frame);
-
- if (errorCode != AMQConstant.REPLY_SUCCESS)
+ try
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
- }
-
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- throw new AMQNoConsumersException("Error: " + reason, null, null);
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- throw new AMQNoRouteException("Error: " + reason, null, null);
- }
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
- {
- _logger.debug("Broker responded with Invalid Argument.");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+ }
+
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ throw new AMQNoConsumersException("Error: " + reason, null, null);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ throw new AMQNoRouteException("Error: " + reason, null, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ {
+ _logger.debug("Broker responded with Invalid Argument.");
+
+ throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
+ }
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
+ }
+ else
+ {
- throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null);
- }
- else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
- {
- _logger.debug("Broker responded with Invalid Routing Key.");
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
+ }
- throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null);
}
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason, null);
- }
-
}
- // fixme why is this only done when the close is expected...
- // should the above forced closes not also cause a close?
- // ----------
- // Closing the session only when it is expected allows the errors to be processed
- // Calling this here will prevent failover. So we should do this for all exceptions
- // that should never cause failover. Such as authentication errors.
+ finally
+ {
+ // fixme why is this only done when the close is expected...
+ // should the above forced closes not also cause a close?
+ // ----------
+ // Closing the session only when it is expected allows the errors to be processed
+ // Calling this here will prevent failover. So we should do this for all exceptions
+ // that should never cause failover. Such as authentication errors.
+ // ----
+ // 2009-09-07 - ritchiem
+ // calling channelClosed will only close this session and will not
+ // prevent failover. If we don't close the session here then we will
+ // have problems during the session close as it will attempt to
+ // close the session that the broker has closed,
- session.channelClosed(channelId, errorCode, String.valueOf(reason));
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
+ }
}
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java?rev=812153&r1=812152&r2=812153&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java Mon Sep 7 14:31:40 2009
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.test.unit.close;
-import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
@@ -32,9 +31,52 @@
import javax.jms.Session;
-/** QPID-1085 */
+/** QPID-1809
+ *
+ * Race condition on error handling and close logic.
+ *
+ * See most often with SimpleACLTest as this test is the expects the server to
+ * shut the connection/channels. This sort of testing is not performed by many,
+ * if any, of the other system tests.
+ *
+ * The problem is that we have two threads
+ *
+ * MainThread Exception(Mina)Thread
+ * | |
+ * Performs |
+ * ACtion |
+ * | Receives Server
+ * | Close
+ * Blocks for |
+ * Response |
+ * | Starts To Notify
+ * | client
+ * | |
+ * | <----- Notify Main Thread
+ * Notification |
+ * wakes client |
+ * | |
+ * Client then |
+ * processes Error. |
+ * | |
+ * Potentially Attempting Close Channel/Connection
+ * Connection Close
+ *
+ * The two threads both attempt to close the connection but the main thread does
+ * so assuming that the connection is open and valid.
+ *
+ * The Exception thread must modify the connection so that no furter syncWait
+ * commands are performed.
+ *
+ * This test sends an ExchangeDeclare that is Asynchronous and will fail and
+ * so cause a ChannelClose error but we perform a syncWait so that we can be
+ * sure to test that the BlockingWaiter is correctly awoken.
+ *
+ */
public class JavaServerCloseRaceConditionTest extends QpidTestCase
{
+ private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup";
+
public void test() throws Exception
{
@@ -42,14 +84,11 @@
AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQDestination destination = (AMQDestination) session.createQueue(getTestQueueName());
-
// Set no wait true so that we block the connection
// Also set a different exchange class string so the attempt to declare
// the exchange causes an exchange.
- ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), destination.getExchangeName(), new AMQShortString("NewTypeForException"),
- destination.getExchangeName().toString().startsWith("amq."),
- false, false, false, true, null);
+ ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
+ true, false, false, false, true, null);
AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
@@ -60,10 +99,7 @@
}
catch (Exception e)
{
- if (!(e instanceof AMQAuthenticationException))
- {
- fail("Cause was not AMQAuthenticationException. Was " + e.getClass() + ":" + e.getMessage());
- }
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
}
try
@@ -76,10 +112,7 @@
}
catch (Exception e)
{
- if (!(e instanceof AMQAuthenticationException))
- {
- fail("Cause was not AMQAuthenticationException. Was " + e.getClass() + ":" + e.getMessage());
- }
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org