You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/09/30 16:56:58 UTC

svn commit: r1177689 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQConnectionDelegate_8_0.java protocol/AMQProtocolHandler.java state/AMQStateManager.java state/StateWaiter.java util/BlockingWaiter.java

Author: kwall
Date: Fri Sep 30 14:56:57 2011
New Revision: 1177689

URL: http://svn.apache.org/viewvc?rev=1177689&view=rev
Log:
QPID-3512: Avoid race during 0-8..0-9-1 connection close.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1177689&r1=1177688&r2=1177689&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Sep 30 14:56:57 2011
@@ -36,11 +36,13 @@ import javax.jms.XASession;
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.framing.BasicQosBody;
 import org.apache.qpid.framing.BasicQosOkBody;
@@ -69,8 +71,30 @@ public class AMQConnectionDelegate_8_0 i
 
     public void closeConnection(long timeout) throws JMSException, AMQException
     {
-        _conn.getProtocolHandler().closeConnection(timeout);
+        final AMQStateManager stateManager = _conn.getProtocolHandler().getStateManager();
+        final AMQState currentState = stateManager.getCurrentState();
 
+        if (currentState.equals(AMQState.CONNECTION_CLOSED))
+        {
+            _logger.debug("Connection already closed.");
+        }
+        else if (currentState.equals(AMQState.CONNECTION_CLOSING))
+        {
+            _logger.debug("Connection already closing, awaiting closed state.");
+            final StateWaiter closeWaiter = new StateWaiter(stateManager, currentState, EnumSet.of(AMQState.CONNECTION_CLOSED));
+            try
+            {
+                closeWaiter.await(timeout);
+            }
+            catch (AMQTimeoutException te)
+            {
+                throw new AMQTimeoutException("Close did not complete in timely fashion", te);
+            }
+        }
+        else
+        {
+            _conn.getProtocolHandler().closeConnection(timeout);
+        }
     }
 
     public AMQConnectionDelegate_8_0(AMQConnection conn)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1177689&r1=1177688&r2=1177689&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Sep 30 14:56:57 2011
@@ -665,22 +665,21 @@ public class AMQProtocolHandler implemen
      * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
      * anyway.
      *
-     * @param timeout The timeout to wait for an acknowledgement to the close request.
+     * @param timeout The timeout to wait for an acknowledgment to the close request.
      *
      * @throws AMQException If the close fails for any reason.
      */
     public void closeConnection(long timeout) throws AMQException
     {
-        ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                                                                                                  new AMQShortString("JMS client is closing the connection."), 0, 0);
-
-        final AMQFrame frame = body.generateFrame(0);
-
-        //If the connection is already closed then don't do a syncWrite
         if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
         {
+            // Connection is already closed then don't do a syncWrite
             try
             {
+                final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                        new AMQShortString("JMS client is closing the connection."), 0, 0);
+                final AMQFrame frame = body.generateFrame(0);
+
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
                 _network.close();
                 closed();
@@ -691,10 +690,9 @@ public class AMQProtocolHandler implemen
             }
             catch (FailoverException e)
             {
-                _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
+                _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
             }
         }
-
     }
 
     /** @return the number of bytes read from this protocol session */

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1177689&r1=1177688&r2=1177689&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Sep 30 14:56:57 2011
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Set;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.io.IOException;
 
 /**
  * The state manager is responsible for managing the state of the protocol session. <p/>
@@ -48,7 +47,7 @@ import java.io.IOException;
  *
  * The two step process is required as there is an inherit race condition between starting a process that will cause
  * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchrous errors that occur can be delivered to the correct waiters.
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
  */
 public class AMQStateManager implements AMQMethodListener
 {
@@ -84,7 +83,10 @@ public class AMQStateManager implements 
 
     public AMQState getCurrentState()
     {
-        return _currentState;
+        synchronized (_stateLock)
+        {
+            return _currentState;
+        }
     }
 
     public void changeState(AMQState newState)
@@ -114,7 +116,7 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+     * Setting of the ProtocolSession will be required when Failover has been successfully completed.
      *
      * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
      * connection to the network.
@@ -131,9 +133,9 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Propogate error to waiters
+     * Propagate error to waiters
      *
-     * @param error The error to propogate.
+     * @param error The error to propagate.
      */
     public void error(Exception error)
     {
@@ -177,7 +179,7 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Create and add a new waiter to the notifcation list.
+     * Create and add a new waiter to the notification list.
      *
      * @param states The waiter will attempt to wait for one of these desired set states to be achived.
      *

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=1177689&r1=1177688&r2=1177689&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Fri Sep 30 14:56:57 2011
@@ -34,7 +34,7 @@ import java.util.Set;
  *
  * On construction the current state and a set of States to await for is provided.
  *
- * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
  * a desired state then await() returns immediately.
  *
  * Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends Blockin
 {
     private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
 
-    Set<AMQState> _awaitStates;
-    private AMQState _startState;
-    private AMQStateManager _stateManager;
+    private final Set<AMQState> _awaitStates;
+    private final AMQState _startState;
+    private final AMQStateManager _stateManager;
 
     /**
      *
@@ -78,9 +78,9 @@ public class StateWaiter extends Blockin
     }
 
     /**
-     * Await for the requried State to be achieved within the default timeout.
+     * Await for the required State to be achieved within the default timeout.
      * @return The achieved state that was requested.
-     * @throws AMQException The exception that prevented the required state from being achived.
+     * @throws AMQException The exception that prevented the required state from being achieved.
      */
     public AMQState await() throws AMQException
     {
@@ -88,13 +88,13 @@ public class StateWaiter extends Blockin
     }
 
     /**
-     * Await for the requried State to be achieved.
+     * Await for the required State to be achieved.
      *
      * <b>It is the responsibility of this class to remove the waiter from the StateManager
      *
-     * @param timeout The time in milliseconds to wait for any of the states to be achived.
+     * @param timeout The time in milliseconds to wait for any of the states to be achieved.
      * @return The achieved state that was requested.
-     * @throws AMQException The exception that prevented the required state from being achived.
+     * @throws AMQException The exception that prevented the required state from being achieved.
      */
     public AMQState await(long timeout) throws AMQException
     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1177689&r1=1177688&r2=1177689&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Fri Sep 30 14:56:57 2011
@@ -28,9 +28,8 @@ import java.util.concurrent.locks.Reentr
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMetho
  */
 public abstract class BlockingWaiter<T>
 {
+    private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
+
     /** This flag is used to indicate that the blocked for method has been received. */
     private volatile boolean _ready = false;
 
@@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T>
                     }
                     catch (InterruptedException e)
                     {
-                        System.err.println(e.getMessage());
+                        _logger.error(e.getMessage(), e);
                         // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
                         // if (!_ready && timeout != -1)
                         // {
@@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T>
     }
 
     /**
-     * This is a callback, called when an error has occured that should interupt any waiter.
+     * This is a callback, called when an error has occurred that should interrupt any waiter.
      * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
      *
      * Once closed any notification of an exception will be ignored.
      *
-     * @param e The exception being propogated.
+     * @param e The exception being propagated.
      */
     public void error(Exception e)
     {
@@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T>
             }
             else
             {
-                System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+                _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
             }
 
             if (_waiting.get())
@@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T>
                     }
                     catch (InterruptedException e1)
                     {
-                        System.err.println(e.getMessage());
+                        _logger.error(e1.getMessage(), e1);
                     }
                 }
                 _errorAck = false;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org