You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [10/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker...
Modified: incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp Thu Aug 14 20:40:49 2008
@@ -2,7 +2,7 @@
cpp=$CPP/request-response
server_java(){
-java -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server
+java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server
}
background "can receive messages" server_java
Modified: incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python Thu Aug 14 20:40:49 2008
@@ -1,8 +1,8 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=$PYTHON/request-response
+py=$PYTHON_EXAMPLES/request-response
server_java(){
-java -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server
+java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server
}
background "can receive messages" server_java
Modified: incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java Thu Aug 14 20:40:49 2008
@@ -1,8 +1,8 @@
# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify
-py=$PYTHON/request-response
+py=$PYTHON_EXAMPLES/request-response
client_java(){
-java -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Client
+java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Client
}
background "Request server running" $py/server.py
Modified: incubator/qpid/branches/qpid.0-10/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/pom.xml?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/pom.xml Thu Aug 14 20:40:49 2008
@@ -112,7 +112,7 @@
<configuration>
<sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
<outputDirectory>${basedir}/target/generated-sources</outputDirectory>
- <packageName>org.apache.qpidity.filter.selector</packageName>
+ <packageName>org.apache.qpid.filter.selector</packageName>
</configuration>
<goals>
<goal>javacc</goal>
@@ -237,16 +237,16 @@
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<excludePackageNames>
- org.apache.qpid.*:org.apache.qpidity.njms:org.apache.qpidity.njms.*:org.apache.qpidity.nclient.impl
+ org.apache.qpid.*:org.apache.qpid.njms:org.apache.qpid.njms.*:org.apache.qpid.nclient.impl
</excludePackageNames>
<groups>
<group>
<title>API</title>
- <packages>org.apache.qpidity.nclient</packages>
+ <packages>org.apache.qpid.nclient</packages>
</group>
<group>
<title>Utility Package</title>
- <packages>org.apache.qpidity.nclient.util</packages>
+ <packages>org.apache.qpid.nclient.util</packages>
</group>
</groups>
</configuration>
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/grammar/SelectorParser.jj?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/grammar/SelectorParser.jj Thu Aug 14 20:40:49 2008
@@ -61,20 +61,20 @@
*
*/
-package org.apache.qpidity.filter.selector;
+package org.apache.qpid.filter.selector;
import java.io.StringReader;
import java.util.ArrayList;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.filter.ArithmeticExpression;
-import org.apache.qpidity.filter.BooleanExpression;
-import org.apache.qpidity.filter.ComparisonExpression;
-import org.apache.qpidity.filter.ConstantExpression;
-import org.apache.qpidity.filter.Expression;
-import org.apache.qpidity.filter.LogicExpression;
-import org.apache.qpidity.filter.PropertyExpression;
-import org.apache.qpidity.filter.UnaryExpression;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.filter.ArithmeticExpression;
+import org.apache.qpid.filter.BooleanExpression;
+import org.apache.qpid.filter.ComparisonExpression;
+import org.apache.qpid.filter.ConstantExpression;
+import org.apache.qpid.filter.Expression;
+import org.apache.qpid.filter.LogicExpression;
+import org.apache.qpid.filter.PropertyExpression;
+import org.apache.qpid.filter.UnaryExpression;
/**
* JMS Selector Parser generated by JavaCC
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/client.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/client.log4j?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/client.log4j (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/client.log4j Thu Aug 14 20:40:49 2008
@@ -21,15 +21,10 @@
#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
#log4j.additivity.org.apache.qpid=false
-#log4j.logger.org.apache.qpidity.transport=TRACE, console
log4j.logger.org.apache.qpid=ERROR, console
log4j.additivity.org.apache.qpid=false
-log4j.logger.org.apache.qpidity=ERROR, console
-log4j.additivity.org.apache.qpidity=false
-
-#log4j.logger.org.apache.qpidity.transport=DEBUG, console
#log4j.logger.org.apache.qpid.client.message.AbstractBytesTypedMessage=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Aug 14 20:40:49 2008
@@ -36,7 +36,6 @@
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +57,9 @@
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
+import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.*;
@@ -78,7 +79,7 @@
public AMQSession get(int channelId)
{
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
return _fastAccessSessions[channelId];
}
@@ -91,7 +92,7 @@
public AMQSession put(int channelId, AMQSession session)
{
AMQSession oldVal;
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
oldVal = _fastAccessSessions[channelId];
_fastAccessSessions[channelId] = session;
@@ -100,11 +101,11 @@
{
oldVal = _slowAccessSessions.put(channelId, session);
}
- if((oldVal != null) && (session == null))
+ if ((oldVal != null) && (session == null))
{
_size--;
}
- else if((oldVal == null) && (session != null))
+ else if ((oldVal == null) && (session != null))
{
_size++;
}
@@ -113,13 +114,12 @@
}
-
public AMQSession remove(int channelId)
{
AMQSession session;
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
- session = _fastAccessSessions[channelId];
+ session = _fastAccessSessions[channelId];
_fastAccessSessions[channelId] = null;
}
else
@@ -127,7 +127,7 @@
session = _slowAccessSessions.remove(channelId);
}
- if(session != null)
+ if (session != null)
{
_size--;
}
@@ -139,9 +139,9 @@
{
ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
- for(int i = 0; i < 16; i++)
+ for (int i = 0; i < 16; i++)
{
- if(_fastAccessSessions[i] != null)
+ if (_fastAccessSessions[i] != null)
{
values.add(_fastAccessSessions[i]);
}
@@ -160,14 +160,13 @@
{
_size = 0;
_slowAccessSessions.clear();
- for(int i = 0; i<16; i++)
+ for (int i = 0; i < 16; i++)
{
_fastAccessSessions[i] = null;
}
}
}
-
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
protected AtomicInteger _idFactory = new AtomicInteger(0);
@@ -209,7 +208,6 @@
/** The virtual path to connect to on the AMQ server */
private String _virtualHost;
-
protected ExceptionListener _exceptionListener;
@@ -232,11 +230,6 @@
protected boolean _connected;
/*
- * The last error code that occured on the connection. Used to return the correct exception to the client
- */
- protected AMQException _lastAMQException = null;
-
- /*
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
@@ -255,7 +248,7 @@
private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
-
+
// this connection maximum number of prefetched messages
private long _maxPrefetch;
@@ -337,20 +330,20 @@
/**
* @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
- * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+ * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
{
- _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
}
else
{
// use the default value set for all connections
_maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
@@ -367,37 +360,13 @@
BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
if (brokerDetails.getTransport().equals(BrokerDetails.VM))
{
- _delegate = new AMQConnectionDelegate_0_8(this);
+ _delegate = new AMQConnectionDelegate_8_0(this);
}
else
{
- // We always assume that the broker supports the lates AMQ protocol verions
- // thie is currently 0.10
- // TODO: use this code once we have switch to 0.10
- // getDelegate();
_delegate = new AMQConnectionDelegate_0_10(this);
}
- final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
-
- class Listener implements ExceptionListener
- {
- public void onException(JMSException e)
- {
- exceptions.add(e);
- }
- }
-
- try
- {
- setExceptionListener(new Listener());
- }
- catch (JMSException e)
- {
- // Shouldn't happen
- throw new AMQException(null, null, e);
- }
-
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
@@ -437,47 +406,40 @@
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
-
- _protocolHandler = new AMQProtocolHandler(this);
+ _protocolHandler = new AMQProtocolHandler(this);
// We are not currently connected
_connected = false;
- Exception lastException = new Exception();
- lastException.initCause(new ConnectException());
-
- // TMG FIXME this seems... wrong...
boolean retryAllowed = true;
- while (!_connected && retryAllowed )
+ Exception connectionException = null;
+ while (!_connected && retryAllowed)
{
+ ProtocolVersion pe = null;
try
{
- makeBrokerConnection(brokerDetails);
- lastException = null;
- _connected = true;
+ pe = makeBrokerConnection(brokerDetails);
}
- catch (AMQProtocolException pe)
+ catch (Exception e)
{
if (_logger.isInfoEnabled())
{
- _logger.info(pe.getMessage());
- _logger.info("Trying broker supported protocol version: " +
- TransportConstants.getVersionMajor() + "." +
- TransportConstants.getVersionMinor());
+ _logger.info("Unable to connect to broker at " +
+ _failoverPolicy.getCurrentBrokerDetails(),
+ e);
}
- // we need to check whether we have a delegate for the supported protocol
- getDelegate();
+ connectionException = e;
}
- catch (Exception e)
- {
- lastException = e;
- if (_logger.isInfoEnabled())
- {
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
- e.getCause());
- }
- retryAllowed = _failoverPolicy.failoverAllowed();
+ if (pe != null)
+ {
+ // reset the delegate to the version returned by the
+ // broker
+ initDelegate(pe);
+ }
+ else if (!_connected)
+ {
+ retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
@@ -490,39 +452,17 @@
if (!_connected)
{
String message = null;
- try
- {
- Thread.sleep(150);
- }
- catch (InterruptedException e)
- {
- // Eat it, we've hopefully got all the exceptions if this happened
- }
- if (exceptions.size() > 0)
- {
- JMSException e = exceptions.get(0);
- int code = -1;
- try
- {
- code = new Integer(e.getErrorCode()).intValue();
- }
- catch (NumberFormatException nfe)
- {
- // Ignore this, we have some error codes and messages swapped around
- }
- throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
- e.getMessage(), e);
- }
- else if (lastException != null)
+ if (connectionException != null)
{
- if (lastException.getCause() != null)
+ if (connectionException.getCause() != null)
{
- message = lastException.getCause().getMessage();
+ message = connectionException.getCause().getMessage();
+ connectionException.getCause().printStackTrace();
}
else
{
- message = lastException.getMessage();
+ message = connectionException.getMessage();
}
}
@@ -534,27 +474,23 @@
}
else // can only be "" if getMessage() returned it, therefore connectionException != null
{
- message = "Unable to Connect:" + lastException.getClass();
+ message = "Unable to Connect:" + connectionException.getClass();
}
}
- AMQException e = new AMQConnectionFailureException(message, null);
-
- if (lastException != null)
+ for (Throwable th = connectionException; th != null; th = th.getCause())
{
- if (lastException instanceof UnresolvedAddressException)
+ if (th instanceof UnresolvedAddressException ||
+ th instanceof UnknownHostException)
{
- e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
- null);
+ throw new AMQUnresolvedAddressException
+ (message,
+ _failoverPolicy.getCurrentBrokerDetails().toString(),
+ connectionException);
}
-
- if (e.getCause() != null)
- {
- e.initCause(lastException);
- }
}
- throw e;
+ throw new AMQConnectionFailureException(message, connectionException);
}
_connectionMetaData = new QpidConnectionMetaData(this);
@@ -572,23 +508,41 @@
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- private void getDelegate() throws AMQProtocolException
+ private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
{
- Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
- TransportConstants.getVersionMajor() + "_" +
- TransportConstants.getVersionMinor());
- Class partypes[] = new Class[1];
+ Class c = Class.forName(String.format
+ ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
+ pe.getMajorVersion(), pe.getMinorVersion()));
+ Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
}
- catch (Exception e)
+ catch (ClassNotFoundException e)
+ {
+ throw new AMQProtocolException
+ (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
+ String.format("Protocol: %s.%s is rquired by the broker but is not " +
+ "currently supported by this client library implementation",
+ pe.getMajorVersion(), pe.getMinorVersion()),
+ e);
+ }
+ catch (NoSuchMethodException e)
{
- throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
- "Protocol: " + TransportConstants.getVersionMajor() + "."
- + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
- "currently supported by this client library implementation", e);
+ throw new RuntimeException("unable to locate constructor for delegate", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new RuntimeException("error instantiating delegate", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("error accessing delegate", e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new RuntimeException("error invoking delegate", e);
}
}
@@ -669,9 +623,9 @@
return false;
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _delegate.makeBrokerConnection(brokerDetail);
+ return _delegate.makeBrokerConnection(brokerDetail);
}
/**
@@ -879,7 +833,6 @@
}
}
-
}
}
@@ -904,14 +857,14 @@
}
}
- public void close() throws JMSException
+ public void close() throws JMSException
{
close(DEFAULT_TIMEOUT);
}
public void close(long timeout) throws JMSException
{
- close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+ close(new ArrayList<AMQSession>(_sessions.values()), timeout);
}
public void close(List<AMQSession> sessions, long timeout) throws JMSException
@@ -924,12 +877,12 @@
private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
{
- synchronized(_sessionCreationLock)
+ synchronized (_sessionCreationLock)
{
- if(!sessions.isEmpty())
+ if (!sessions.isEmpty())
{
AMQSession session = sessions.remove(0);
- synchronized(session.getMessageDeliveryLock())
+ synchronized (session.getMessageDeliveryLock())
{
doClose(sessions, timeout);
}
@@ -1132,7 +1085,7 @@
{
return _sessions;
}
-
+
public String getUsername()
{
return _username;
@@ -1309,6 +1262,8 @@
if (cause instanceof IOException)
{
closer = !_closed.getAndSet(true);
+
+ _protocolHandler.getProtocolSession().notifyError(je);
}
if (_exceptionListener != null)
@@ -1351,7 +1306,7 @@
{
if (cause instanceof AMQException)
{
- return ((AMQException)cause).isHardError();
+ return ((AMQException) cause).isHardError();
}
return true;
@@ -1367,24 +1322,6 @@
_sessions.remove(channelId);
}
- /**
- * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
- * The caller must hold the failover mutex before calling this method.
- */
- public void resubscribeSesssions() throws JMSException, AMQException, FailoverException
- {
- ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
- for (Iterator it = sessions.iterator(); it.hasNext();)
- {
- AMQSession s = (AMQSession) it.next();
- // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
- s.resubscribe();
- s.setFlowControl(true);
- }
- }
-
public String toString()
{
StringBuffer buf = new StringBuffer("AMQConnection:\n");
@@ -1495,7 +1432,7 @@
*/
public long getMaxPrefetch()
{
- return _maxPrefetch;
+ return _maxPrefetch;
}
/**
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Thu Aug 14 20:40:49 2008
@@ -27,12 +27,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
{
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
public Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Aug 14 20:40:49 2008
@@ -9,13 +9,14 @@
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import org.apache.qpidity.nclient.Client;
-import org.apache.qpidity.nclient.ClosedListener;
-import org.apache.qpidity.ErrorCode;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ProtocolException;
+import org.apache.qpid.nclient.Client;
+import org.apache.qpid.nclient.ClosedListener;
+import org.apache.qpid.ErrorCode;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.ProtocolVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@
/**
* The QpidConnection instance that is mapped to this JMS connection.
*/
- org.apache.qpidity.nclient.Connection _qpidConnection;
+ org.apache.qpid.nclient.Connection _qpidConnection;
//--- constructor
public AMQConnectionDelegate_0_10(AMQConnection conn)
@@ -101,7 +102,7 @@
* @throws IOException
* @throws AMQException
*/
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
_qpidConnection = Client.createConnection();
try
@@ -115,15 +116,18 @@
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
_qpidConnection.setClosedListener(this);
+ _conn._connected = true;
}
- catch(ProtocolException pe)
+ catch(ProtocolVersionException pe)
{
- throw new AMQProtocolException(null, pe.getMessage(), pe);
+ return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
catch (QpidException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
+
+ return null;
}
/**
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Thu Aug 14 20:40:49 2008
@@ -38,7 +38,6 @@
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
@@ -294,14 +293,23 @@
public Connection createConnection(String userName, String password) throws JMSException
{
+ return createConnection(userName, password, null);
+ }
+
+ public Connection createConnection(String userName, String password, String id) throws JMSException
+ {
try
{
if (_connectionDetails != null)
{
_connectionDetails.setUsername(userName);
_connectionDetails.setPassword(password);
-
- if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
+
+ if (id != null && !id.equals(""))
+ {
+ _connectionDetails.setClientName(id);
+ }
+ else if (_connectionDetails.getClientName() == null || _connectionDetails.getClientName().equals(""))
{
_connectionDetails.setClientName(getUniqueClientID());
}
@@ -309,7 +317,7 @@
}
else
{
- return new AMQConnection(_host, _port, userName, password, getUniqueClientID(), _virtualPath);
+ return new AMQConnection(_host, _port, userName, password, (id != null ? id : getUniqueClientID()), _virtualPath);
}
}
catch (Exception e)
@@ -434,23 +442,15 @@
*/
public XAConnection createXAConnection() throws JMSException
{
- if (TransportConstants.getVersionMajor() == 0 &&
- TransportConstants.getVersionMinor() == 8)
+ try
{
- throw new UnsupportedOperationException("This protocol version does not support XA operations");
+ return new XAConnectionImpl(_connectionDetails, _sslConfig);
}
- else
+ catch (Exception e)
{
- try
- {
- return new XAConnectionImpl(_connectionDetails, _sslConfig);
- }
- catch (Exception e)
- {
- JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
- jmse.setLinkedException(e);
- throw jmse;
- }
+ JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Thu Aug 14 20:40:49 2008
@@ -289,7 +289,7 @@
public static void main(String[] args) throws URLSyntaxException
{
String url2 =
- "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+ "amqp://ritchiem:bob@temp/testHost?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
// "amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Aug 14 20:40:49 2008
@@ -65,9 +65,9 @@
private static final int IS_EXCLUSIVE_MASK = 0x2;
private static final int IS_AUTODELETE_MASK = 0x4;
- public static final Integer QUEUE_TYPE = Integer.valueOf(1);
- public static final Integer TOPIC_TYPE = Integer.valueOf(2);
- public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
+ public static final int QUEUE_TYPE = 1;
+ public static final int TOPIC_TYPE = 2;
+ public static final int UNKNOWN_TYPE = 3;
protected AMQDestination(String url) throws URISyntaxException
{