You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/29 04:07:22 UTC
svn commit: r680602 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpidity/nclient/
common/src/main/java/org/apache/qpidity/
common/src/main/java/org/apache/qpidity/transport/
Author: rhs
Date: Mon Jul 28 19:07:20 2008
New Revision: 680602
URL: http://svn.apache.org/viewvc?rev=680602&view=rev
Log:
QPID-1201: fixed up version of aidan's patch, there are still failures when running against an external java broker, however we seem to get past basic connection negotiation now
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java (with props)
Removed:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolException.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Jul 28 19:07:20 2008
@@ -36,7 +36,6 @@
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +57,7 @@
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
@@ -364,10 +364,6 @@
}
else
{
- // We always assume that the broker supports the lates AMQ protocol verions
- // thie is currently 0.10
- // TODO: use this code once we have switch to 0.10
- // getDelegate();
_delegate = new AMQConnectionDelegate_0_10(this);
}
@@ -420,21 +416,10 @@
Exception connectionException = null;
while (!_connected && retryAllowed)
{
+ ProtocolVersion pe = null;
try
{
- makeBrokerConnection(brokerDetails);
- }
- catch (AMQProtocolException pe)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info(pe.getMessage());
- _logger.info("Trying broker supported protocol version: " +
- TransportConstants.getVersionMajor() + "." +
- TransportConstants.getVersionMinor());
- }
- // we need to check whether we have a delegate for the supported protocol
- getDelegate();
+ pe = makeBrokerConnection(brokerDetails);
}
catch (Exception e)
{
@@ -447,6 +432,13 @@
connectionException = e;
}
+ if (pe != null)
+ {
+ // reset the delegate to the version returned by the
+ // broker
+ initDelegate(pe);
+ }
+
if (!_connected)
{
retryAllowed = _failoverPolicy.failoverAllowed();
@@ -518,23 +510,41 @@
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- private void getDelegate() throws AMQProtocolException
+ private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
{
- Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
- TransportConstants.getVersionMajor() + "_" +
- TransportConstants.getVersionMinor());
+ Class c = Class.forName(String.format
+ ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
+ pe.getMajorVersion(), pe.getMinorVersion()));
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
}
- catch (Exception e)
+ catch (ClassNotFoundException e)
+ {
+ throw new AMQProtocolException
+ (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
+ String.format("Protocol: %s.%s is rquired by the broker but is not " +
+ "currently supported by this client library implementation",
+ pe.getMajorVersion(), pe.getMinorVersion()),
+ e);
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new RuntimeException("unable to locate constructor for delegate", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new RuntimeException("error instantiating delegate", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("error accessing delegate", e);
+ }
+ catch (InvocationTargetException e)
{
- throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
- "Protocol: " + TransportConstants.getVersionMajor() + "."
- + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
- "currently supported by this client library implementation", e);
+ throw new RuntimeException("error invoking delegate", e);
}
}
@@ -615,9 +625,9 @@
return false;
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _delegate.makeBrokerConnection(brokerDetail);
+ return _delegate.makeBrokerConnection(brokerDetail);
}
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Mon Jul 28 19:07:20 2008
@@ -27,12 +27,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
{
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
public Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Jul 28 19:07:20 2008
@@ -9,13 +9,14 @@
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ProtocolException;
+import org.apache.qpidity.transport.ProtocolVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +102,7 @@
* @throws IOException
* @throws AMQException
*/
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
_qpidConnection = Client.createConnection();
try
@@ -117,14 +118,16 @@
_qpidConnection.setClosedListener(this);
_conn._connected = true;
}
- catch(ProtocolException pe)
+ catch(ProtocolVersionException pe)
{
- throw new AMQProtocolException(null, pe.getMessage(), pe);
+ return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
catch (QpidException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
+
+ return null;
}
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Jul 28 19:07:20 2008
@@ -43,6 +43,7 @@
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
@@ -79,7 +80,7 @@
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
@@ -98,6 +99,8 @@
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
}
+
+ return null;
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java Mon Jul 28 19:07:20 2008
@@ -38,7 +38,6 @@
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
@@ -434,23 +433,15 @@
*/
public XAConnection createXAConnection() throws JMSException
{
- if (TransportConstants.getVersionMajor() == 0 &&
- TransportConstants.getVersionMinor() == 8)
+ try
{
- throw new UnsupportedOperationException("This protocol version does not support XA operations");
+ return new XAConnectionImpl(_connectionDetails, _sslConfig);
}
- else
+ catch (Exception e)
{
- try
- {
- return new XAConnectionImpl(_connectionDetails, _sslConfig);
- }
- catch (Exception e)
- {
- JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
- jmse.setLinkedException(e);
- throw jmse;
- }
+ JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Mon Jul 28 19:07:20 2008
@@ -31,7 +31,6 @@
import org.apache.qpid.url.QpidURL;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ProtocolException;
import org.apache.qpidity.nclient.impl.ClientSession;
import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
import org.apache.qpidity.transport.Channel;
@@ -40,8 +39,8 @@
import org.apache.qpidity.transport.ConnectionClose;
import org.apache.qpidity.transport.ConnectionCloseCode;
import org.apache.qpidity.transport.ConnectionCloseOk;
-import org.apache.qpidity.transport.TransportConstants;
import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.ProtocolVersionException;
import org.apache.qpidity.transport.SessionDelegate;
import org.apache.qpidity.transport.network.io.IoTransport;
import org.apache.qpidity.transport.network.mina.MinaHandler;
@@ -60,6 +59,8 @@
private boolean closed = false;
private long timeout = 60000;
+ private ProtocolHeader header = null;
+
/**
*
* @return returns a new connection to the broker.
@@ -79,7 +80,6 @@
ClientDelegate connectionDelegate = new ClientDelegate()
{
private boolean receivedClose = false;
- private String _unsupportedProtocol;
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
@@ -138,28 +138,18 @@
this.receivedClose = true;
}
-
@Override public void init(Channel ch, ProtocolHeader hdr)
{
// TODO: once the merge is done we'll need to update this code
- // for handling 0.8 protocol version type i.e. major=8 and minor=0 :(
- if (hdr.getMajor() != TransportConstants.getVersionMajor()
- || hdr.getMinor() != TransportConstants.getVersionMinor())
+ // for handling 0.8 protocol version type i.e. major=8 and mino
+ if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
{
- _unsupportedProtocol = TransportConstants.getVersionMajor() + "." +
- TransportConstants.getVersionMinor();
- TransportConstants.setVersionMajor( hdr.getMajor() );
- TransportConstants.setVersionMinor( hdr.getMinor() );
+ Client.this.header = hdr;
_lock.lock();
negotiationComplete.signalAll();
_lock.unlock();
}
}
-
- @Override public String getUnsupportedProtocol()
- {
- return _unsupportedProtocol;
- }
};
connectionDelegate.setCondition(_lock,negotiationComplete);
@@ -186,18 +176,15 @@
}
// XXX: hardcoded version numbers
- _conn.send(new ProtocolHeader(1, TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor()));
+ _conn.send(new ProtocolHeader(1, 0, 10));
try
{
negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
- if( connectionDelegate.getUnsupportedProtocol() != null )
+ if (header != null)
{
_conn.close();
- throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol()
- , ErrorCode.UNSUPPORTED_PROTOCOL, null);
-
+ throw new ProtocolVersionException(header.getMajor(), header.getMinor());
}
}
catch (InterruptedException e)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java Mon Jul 28 19:07:20 2008
@@ -76,9 +76,7 @@
public void closed() {}
});
conn.send(new ProtocolHeader
- (1,
- TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor()));
+ (1, 0, 10));
Channel ch = conn.getChannel(0);
Session ssn = new Session("my-session".getBytes());
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java Mon Jul 28 19:07:20 2008
@@ -31,10 +31,9 @@
public void init(Channel ch, ProtocolHeader hdr)
{
- if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
- hdr.getMinor() != TransportConstants.getVersionMinor())
+ if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
{
- throw new RuntimeException("version missmatch: " + hdr);
+ throw new ProtocolVersionException(hdr.getMajor(), hdr.getMinor());
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?rev=680602&r1=680601&r2=680602&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java Mon Jul 28 19:07:20 2008
@@ -82,28 +82,12 @@
public void init(Channel ch, ProtocolHeader hdr)
{
- ch.getConnection().send(new ProtocolHeader
- (1,
- TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor()));
- if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
- hdr.getMinor() != TransportConstants.getVersionMinor())
- {
- // XXX
- ch.getConnection().send(new ProtocolHeader
- (1,
- TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor()));
- ch.getConnection().close();
- }
- else
- {
- List<Object> plain = new ArrayList<Object>();
- plain.add("PLAIN");
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- ch.connectionStart(null, plain, utf8);
- }
+ ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor()));
+ List<Object> plain = new ArrayList<Object>();
+ plain.add("PLAIN");
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, plain, utf8);
}
// ----------------------------------------------
@@ -294,8 +278,4 @@
_virtualHost = host;
}
- public String getUnsupportedProtocol()
- {
- return null;
- }
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java?rev=680602&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java Mon Jul 28 19:07:20 2008
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ProtocolVersionException
+ *
+ */
+
+public final class ProtocolVersionException extends TransportException
+{
+
+ private final byte major;
+ private final byte minor;
+
+ public ProtocolVersionException(byte major, byte minor)
+ {
+ super(String.format("version missmatch: %s-%s", major, minor));
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public byte getMajor()
+ {
+ return this.major;
+ }
+
+ public byte getMinor()
+ {
+ return this.minor;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolVersionException.java
------------------------------------------------------------------------------
svn:eol-style = native