You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/21 12:12:52 UTC

svn commit: r639598 - in /incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client: failover/FailoverHandler.java protocol/AMQProtocolHandler.java

Author: ritchiem
Date: Fri Mar 21 04:12:29 2008
New Revision: 639598

URL: http://svn.apache.org/viewvc?rev=639598&view=rev
Log:
QPID-866 : Based on Patch from ASkinner. Only the FailoverException makes sense to process this way, so remove the list and synchronize so that we either do an add or throw the set FailoverException.

Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=639598&r1=639597&r2=639598&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Fri Mar 21 04:12:29 2008
@@ -120,13 +120,17 @@
         // We wake up listeners. If they can handle failover, they will extend the
         // FailoverRetrySupport class and will in turn block on the latch until failover
         // has completed before retrying the operation.
-        _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+        _amqProtocolHandler.notifyFailoverStarting();
 
         // Since failover impacts several structures we protect them all with a single mutex. These structures
         // are also in child objects of the connection. This allows us to manipulate them without affecting
         // client code which runs in a separate thread.
         synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
         {
+            // Now that we hold the failover mutex, no one else can be waiting for a frame, so it is safe to
+            // clear the failover exception.
+            _amqProtocolHandler.failoverInProgress();
+
             // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
             // state works, without us having to terminate any existing "state waiters". We could theoretically
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=639598&r1=639597&r2=639598&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Mar 21 04:12:29 2008
@@ -153,6 +153,10 @@
     /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
     private CountDownLatch _failoverLatch;
 
+
+    /** The last failover exception that occurred */
+    private FailoverException _lastFailoverException;
+
     /** Defines the default timeout to use for synchronous protocol commands. */
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
 
@@ -419,6 +423,24 @@
         }
     }
 
+    public void notifyFailoverStarting()
+    {
+        // Set the last exception in the sync block to ensure the ordering with add.
+        // either this gets done and the add does the ml.error
+        // or the add completes first and the iterator below will do ml.error
+        synchronized (_frameListeners)
+        {
+            _lastFailoverException = new FailoverException("Failing over about to start");
+        }
+
+        propagateExceptionToWaiters(_lastFailoverException);
+    }
+
+    public void failoverInProgress()
+    {
+        _lastFailoverException = null;
+    }
+
     private static int _messageReceivedCount;
 
     public void messageReceived(IoSession session, Object message) throws Exception
@@ -471,11 +493,13 @@
                 new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
 
         try
-                    {
+        {
 
-                        boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+            boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
             if (!_frameListeners.isEmpty())
             {
+                // This iterator is safe from the error state: frame listeners always add before they send, so they
+                // will be ready and waiting for this response.
                 Iterator it = _frameListeners.iterator();
                 while (it.hasNext())
                 {
@@ -592,7 +616,15 @@
     {
         try
         {
-            _frameListeners.add(listener);
+            synchronized (_frameListeners)
+            {
+                if (_lastFailoverException != null)
+                {
+                    throw _lastFailoverException;
+                }
+                
+                _frameListeners.add(listener);
+            }
             _protocolSession.writeFrame(frame);
 
             AMQMethodEvent e = listener.blockForFrame(timeout);
@@ -600,10 +632,6 @@
             return e;
             // When control resumes before this line, a reply will have been received
             // that matches the criteria defined in the blocking listener
-        }
-        catch (AMQException e)
-        {
-            throw e;
         }
         finally
         {