You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/07/03 16:33:11 UTC

svn commit: r673688 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/main/java/org/apache/qpid/client/state/ client/src/test/java/org/apache/qpid/test/un...

Author: aidan
Date: Thu Jul  3 07:33:10 2008
New Revision: 673688

URL: http://svn.apache.org/viewvc?rev=673688&view=rev
Log:
QPID-962 Exception handling was... unpleasing... Fix up of patch from rhs

AMQConnection.java: Refactor the listener and accumulate exceptions in a list. Add getLastException(), which can now return any Exception. Don't set _connected here; let the delegate decide.

AMQConnectionDelegate_8_0.java, AMQConnectionDelegate_0_10.java: set _connected to true if we succeed

AMQProtocolHandler.java: attainState can now throw any sort of Exception

AMQStateManager.java: attainState can now throw any Exception

ConnectionTest.java: check that exception cause is not null

AMQConnectionFailureException.java: Add ability to store a Collection of Exceptions in case there are multiple possible causes of the failure. Which there shouldn't be, but it can happen.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jul  3 07:33:10 2008
@@ -26,6 +26,7 @@
 import org.apache.qpid.AMQUnresolvedAddressException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -75,6 +76,7 @@
         private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
         private int _size = 0;
         private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+        
 
         public AMQSession get(int channelId)
         {
@@ -232,11 +234,6 @@
     protected boolean _connected;
 
     /*
-     * The last error code that occured on the connection. Used to return the correct exception to the client
-     */
-    protected AMQException _lastAMQException = null;
-
-    /*
      * The connection meta data
      */
     private QpidConnectionMetaData _connectionMetaData;
@@ -261,6 +258,9 @@
 
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;
+    
+    /** used to hold a list of all exceptions that have been thrown during connection construction. gross */
+    final ArrayList<Exception> _exceptions = new ArrayList<Exception>();
 
     /**
      * @param broker      brokerdetails
@@ -378,13 +378,12 @@
             _delegate = new AMQConnectionDelegate_0_10(this);
         }
 
-        final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
-
+       
         class Listener implements ExceptionListener
         {
             public void onException(JMSException e)
             {
-                exceptions.add(e);
+                _exceptions.add(e);
             }
         }
 
@@ -443,9 +442,6 @@
         // We are not currently connected
         _connected = false;
 
-        Exception lastException = new Exception();
-        lastException.initCause(new ConnectException());
-
         // TMG FIXME this seems... wrong...
         boolean retryAllowed = true;
         while (!_connected && retryAllowed )
@@ -453,8 +449,6 @@
             try
             {
                 makeBrokerConnection(brokerDetails);
-                lastException = null;
-                _connected = true;
             }
             catch (AMQProtocolException pe)
             {
@@ -470,17 +464,29 @@
             }
             catch (Exception e)
             {
-                lastException = e;
-
+                _exceptions.add(e);
                 if (_logger.isInfoEnabled())
                 {
-                    _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
-                            e.getCause());
+                    _logger.info("Unable to connect to broker at " + 
+                                _failoverPolicy.getCurrentBrokerDetails(),
+                                e);
                 }
+            }
+            
+            if (!_connected)
+            {
                 retryAllowed = _failoverPolicy.failoverAllowed();
                 brokerDetails = _failoverPolicy.getNextBrokerDetails();
             }
         }
+        try
+        {
+            setExceptionListener(null);
+        }
+        catch (JMSException e1)
+        {
+            // Can't happen
+        }
 
         if (_logger.isDebugEnabled())
         {
@@ -498,24 +504,11 @@
             {
                 // Eat it, we've hopefully got all the exceptions if this happened
             }
-            if (exceptions.size() > 0)
-            {
-                JMSException e = exceptions.get(0);
-                int code = -1;
-                try
-                {
-                    code = new Integer(e.getErrorCode()).intValue();
-                }
-                catch (NumberFormatException nfe)
-                {
-                    // Ignore this, we have some error codes and messages swapped around
-                }
-
-                throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
-                                                        e.getMessage(), e);
-            }
-            else if (lastException != null)
+            
+            Exception lastException = null;
+            if (_exceptions.size() > 0)
             {
+                lastException = _exceptions.get(_exceptions.size() - 1);
                 if (lastException.getCause() != null)
                 {
                     message = lastException.getCause().getMessage();
@@ -538,8 +531,8 @@
                 }
             }
 
-            AMQException e = new AMQConnectionFailureException(message, null);
-
+            AMQException e = new AMQConnectionFailureException(message, _exceptions);
+                        
             if (lastException != null)
             {
                 if (lastException instanceof UnresolvedAddressException)
@@ -547,13 +540,8 @@
                     e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
                                                           null);
                 }
-
-                if (e.getCause() != null)
-                {
-                    e.initCause(lastException);
-	        }
+                
             }
-
             throw e;
         }
 
@@ -1507,4 +1495,14 @@
     {
         return _syncPersistence;
     }
+
+    public Exception getLastException()
+    {
+        if (_exceptions.size() > 0)
+        {
+            return _exceptions.get(_exceptions.size() - 1);
+        }
+        return null;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Jul  3 07:33:10 2008
@@ -115,6 +115,7 @@
             _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
                                     _conn.getUsername(), _conn.getPassword());
             _qpidConnection.setClosedListener(this);
+            _conn._connected = true;
         }
         catch(ProtocolException pe)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Jul  3 07:33:10 2008
@@ -25,11 +25,14 @@
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Iterator;
+import java.util.Set;
 
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
+import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -76,24 +79,21 @@
         return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
     }
 
-    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
     {
-        try
+        final Set<AMQState> openOrClosedStates =
+                EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
+
+        TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+        // this blocks until the connection has been set up or when an error
+        // has prevented the connection being set up
+
+        AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
+        if(state == AMQState.CONNECTION_OPEN)
         {
-            TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
-            // this blocks until the connection has been set up or when an error
-            // has prevented the connection being set up
-            _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
             _conn._failoverPolicy.attainedConnection();
-
-            // Again this should be changed to a suitable notify
             _conn._connected = true;
         }
-        catch (AMQException e)
-        {
-            _conn._lastAMQException = e;
-            throw e;
-        }
     }
 
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jul  3 07:33:10 2008
@@ -559,7 +559,7 @@
           _frameListeners.remove(listener);
       }
      */
-    public void attainState(AMQState s) throws AMQException
+    public void attainState(AMQState s) throws Exception
     {
         getStateManager().attainState(s);
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Jul  3 07:33:10 2008
@@ -102,7 +102,7 @@
     }
 
 
-    public void attainState(final AMQState s) throws AMQException
+    public void attainState(final AMQState s) throws Exception
     {
         synchronized (_stateLock)
         {
@@ -118,6 +118,11 @@
                 catch (InterruptedException e)
                 {
                     _logger.warn("Thread interrupted");
+                    if (_protocolSession.getAMQConnection().getLastException() != null)
+                    {
+                        throw _protocolSession.getAMQConnection().getLastException();
+                    }
+
                 }
 
                 if (_currentState != s)
@@ -169,6 +174,11 @@
                 catch (InterruptedException e)
                 {
                     _logger.warn("Thread interrupted");
+                    if (_protocolSession.getAMQConnection().getLastException() != null)
+                    {
+                        throw new AMQException(null, "Could not attain state due to exception",
+                                _protocolSession.getAMQConnection().getLastException());
+                    }
                 }
 
                 if (!stateSet.contains(_currentState))

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Thu Jul  3 07:33:10 2008
@@ -134,6 +134,7 @@
         }
         catch (AMQException amqe)
         {
+            assertNotNull("No cause set", amqe.getCause());
             if (amqe.getCause().getClass() == Exception.class)
             {
                 System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java Thu Jul  3 07:33:10 2008
@@ -21,6 +21,10 @@
 
 package org.apache.qpid;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
 import org.apache.qpid.protocol.AMQConstant;
 
 /**
@@ -35,6 +39,8 @@
  */
 public class AMQConnectionFailureException extends AMQException
 {
+    Collection<Exception> _exceptions;
+    
 	public AMQConnectionFailureException(String message, Throwable cause)
 	{
 		super(null, message, cause);
@@ -44,4 +50,16 @@
     {
         super(errorCode, message, cause);
     }
+
+    public AMQConnectionFailureException(String message, Collection<Exception> exceptions)
+    {
+        // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry...
+        super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next());
+        this._exceptions = exceptions;
+    }
+    
+    public Collection<Exception> getLinkedExceptions()
+    {
+        return _exceptions;
+    }
 }