You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/07/03 16:33:11 UTC
svn commit: r673688 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/protocol/
client/src/main/java/org/apache/qpid/client/state/
client/src/test/java/org/apache/qpid/test/un...
Author: aidan
Date: Thu Jul 3 07:33:10 2008
New Revision: 673688
URL: http://svn.apache.org/viewvc?rev=673688&view=rev
Log:
QPID-962 Exception handling was... unpleasing... Fix up of patch from rhs
AMQConnection.java: Refactor listener and stack exceptions in a list. Add getLastException, which can now be any Exception. Don't set connected, let the delegate decide.
AMQConnectionDelegate_8_0.java, AMQConnectionDelegate_0_10.java: set _connected to true if we succeed
AMQProtocolHandler.java: attainState can now throw any sort of Exception
AMQStateManager.java: attainState can now throw any Exception
ConnectionTest.java: check that exception cause is not null
AMQConnectionFailureException.java: Add ability to store a Collection of Exceptions in case there are multiple possible causes of the failure. Which there shouldn't be, but it can happen.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jul 3 07:33:10 2008
@@ -26,6 +26,7 @@
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -75,6 +76,7 @@
private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
private int _size = 0;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
public AMQSession get(int channelId)
{
@@ -232,11 +234,6 @@
protected boolean _connected;
/*
- * The last error code that occured on the connection. Used to return the correct exception to the client
- */
- protected AMQException _lastAMQException = null;
-
- /*
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
@@ -261,6 +258,9 @@
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+
+ /** used to hold a list of all exceptions that have been thrown during connection construction. gross */
+ final ArrayList<Exception> _exceptions = new ArrayList<Exception>();
/**
* @param broker brokerdetails
@@ -378,13 +378,12 @@
_delegate = new AMQConnectionDelegate_0_10(this);
}
- final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
-
+
class Listener implements ExceptionListener
{
public void onException(JMSException e)
{
- exceptions.add(e);
+ _exceptions.add(e);
}
}
@@ -443,9 +442,6 @@
// We are not currently connected
_connected = false;
- Exception lastException = new Exception();
- lastException.initCause(new ConnectException());
-
// TMG FIXME this seems... wrong...
boolean retryAllowed = true;
while (!_connected && retryAllowed )
@@ -453,8 +449,6 @@
try
{
makeBrokerConnection(brokerDetails);
- lastException = null;
- _connected = true;
}
catch (AMQProtocolException pe)
{
@@ -470,17 +464,29 @@
}
catch (Exception e)
{
- lastException = e;
-
+ _exceptions.add(e);
if (_logger.isInfoEnabled())
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
- e.getCause());
+ _logger.info("Unable to connect to broker at " +
+ _failoverPolicy.getCurrentBrokerDetails(),
+ e);
}
+ }
+
+ if (!_connected)
+ {
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
+ try
+ {
+ setExceptionListener(null);
+ }
+ catch (JMSException e1)
+ {
+ // Can't happen
+ }
if (_logger.isDebugEnabled())
{
@@ -498,24 +504,11 @@
{
// Eat it, we've hopefully got all the exceptions if this happened
}
- if (exceptions.size() > 0)
- {
- JMSException e = exceptions.get(0);
- int code = -1;
- try
- {
- code = new Integer(e.getErrorCode()).intValue();
- }
- catch (NumberFormatException nfe)
- {
- // Ignore this, we have some error codes and messages swapped around
- }
-
- throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
- e.getMessage(), e);
- }
- else if (lastException != null)
+
+ Exception lastException = null;
+ if (_exceptions.size() > 0)
{
+ lastException = _exceptions.get(_exceptions.size() - 1);
if (lastException.getCause() != null)
{
message = lastException.getCause().getMessage();
@@ -538,8 +531,8 @@
}
}
- AMQException e = new AMQConnectionFailureException(message, null);
-
+ AMQException e = new AMQConnectionFailureException(message, _exceptions);
+
if (lastException != null)
{
if (lastException instanceof UnresolvedAddressException)
@@ -547,13 +540,8 @@
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
null);
}
-
- if (e.getCause() != null)
- {
- e.initCause(lastException);
- }
+
}
-
throw e;
}
@@ -1507,4 +1495,14 @@
{
return _syncPersistence;
}
+
+ public Exception getLastException()
+ {
+ if (_exceptions.size() > 0)
+ {
+ return _exceptions.get(_exceptions.size() - 1);
+ }
+ return null;
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Jul 3 07:33:10 2008
@@ -115,6 +115,7 @@
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
_qpidConnection.setClosedListener(this);
+ _conn._connected = true;
}
catch(ProtocolException pe)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Jul 3 07:33:10 2008
@@ -25,11 +25,14 @@
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Iterator;
+import java.util.Set;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -76,24 +79,21 @@
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
- try
+ final Set<AMQState> openOrClosedStates =
+ EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
+
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ // this blocks until the connection has been set up or when an error
+ // has prevented the connection being set up
+
+ AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
+ if(state == AMQState.CONNECTION_OPEN)
{
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- // this blocks until the connection has been set up or when an error
- // has prevented the connection being set up
- _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
_conn._failoverPolicy.attainedConnection();
-
- // Again this should be changed to a suitable notify
_conn._connected = true;
}
- catch (AMQException e)
- {
- _conn._lastAMQException = e;
- throw e;
- }
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jul 3 07:33:10 2008
@@ -559,7 +559,7 @@
_frameListeners.remove(listener);
}
*/
- public void attainState(AMQState s) throws AMQException
+ public void attainState(AMQState s) throws Exception
{
getStateManager().attainState(s);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Jul 3 07:33:10 2008
@@ -102,7 +102,7 @@
}
- public void attainState(final AMQState s) throws AMQException
+ public void attainState(final AMQState s) throws Exception
{
synchronized (_stateLock)
{
@@ -118,6 +118,11 @@
catch (InterruptedException e)
{
_logger.warn("Thread interrupted");
+ if (_protocolSession.getAMQConnection().getLastException() != null)
+ {
+ throw _protocolSession.getAMQConnection().getLastException();
+ }
+
}
if (_currentState != s)
@@ -169,6 +174,11 @@
catch (InterruptedException e)
{
_logger.warn("Thread interrupted");
+ if (_protocolSession.getAMQConnection().getLastException() != null)
+ {
+ throw new AMQException(null, "Could not attain state due to exception",
+ _protocolSession.getAMQConnection().getLastException());
+ }
}
if (!stateSet.contains(_currentState))
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Thu Jul 3 07:33:10 2008
@@ -134,6 +134,7 @@
}
catch (AMQException amqe)
{
+ assertNotNull("No cause set", amqe.getCause());
if (amqe.getCause().getClass() == Exception.class)
{
System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?rev=673688&r1=673687&r2=673688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java Thu Jul 3 07:33:10 2008
@@ -21,6 +21,10 @@
package org.apache.qpid;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
import org.apache.qpid.protocol.AMQConstant;
/**
@@ -35,6 +39,8 @@
*/
public class AMQConnectionFailureException extends AMQException
{
+ Collection<Exception> _exceptions;
+
public AMQConnectionFailureException(String message, Throwable cause)
{
super(null, message, cause);
@@ -44,4 +50,16 @@
{
super(errorCode, message, cause);
}
+
+ public AMQConnectionFailureException(String message, Collection<Exception> exceptions)
+ {
+ // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry...
+ super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next());
+ this._exceptions = exceptions;
+ }
+
+ public Collection<Exception> getLinkedExceptions()
+ {
+ return _exceptions;
+ }
}