You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/11/26 02:41:32 UTC
svn commit: r598105 - in /incubator/qpid/branches/M2.1:
gentools/src/org/apache/qpid/gentools/
java/broker/src/main/java/org/apache/qpid/server/exchange/
java/broker/src/main/java/org/apache/qpid/server/handler/
java/client/src/main/java/org/apache/qpi...
Author: rgodfrey
Date: Sun Nov 25 17:41:31 2007
New Revision: 598105
URL: http://svn.apache.org/viewvc?rev=598105&view=rev
Log:
QPID-567 : Add mutliversion support to Qpid/Java, fixed client support when server returns Protocol header.
Added QueueUnbind
Added ability to select protocol version in ConnectionURL or with -Dorg.apache.qpid.amqp_version
Added:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
Modified:
incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm
Modified: incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java (original)
+++ incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java Sun Nov 25 17:41:31 2007
@@ -479,7 +479,11 @@
Template velocityTemplate = Velocity.getTemplate(template.getName());
velocityTemplate.merge(context, sw);
String filename = String.valueOf(context.get("filename"));
- FileWriter outputFileWriter = new FileWriter(getOutputDirectory() + Utils.FILE_SEPARATOR + filename);
+
+ File outputFile = new File(getOutputDirectory() + Utils.FILE_SEPARATOR + filename);
+ outputFile.getParentFile().mkdirs();
+ FileWriter outputFileWriter = new FileWriter(outputFile);
+
outputFileWriter.append(sw.toString());
outputFileWriter.close();
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Sun Nov 25 17:41:31 2007
@@ -34,6 +34,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -150,7 +151,7 @@
if (!_index.remove(routingKey, queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey + ". No queue was registered with that _routing key");
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Sun Nov 25 17:41:31 2007
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -277,7 +278,7 @@
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
if (queues == null)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
+ " with routing key " + routingKey + ". No queue was registered with that _routing key");
}
@@ -285,7 +286,7 @@
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
+ " with routing key " + routingKey);
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sun Nov 25 17:41:31 2007
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -146,7 +147,7 @@
if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sun Nov 25 17:41:31 2007
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQTypedValue;
@@ -200,7 +201,11 @@
public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
_logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
- _bindings.remove(new Registration(new HeadersBinding(args), queue));
+ if(!_bindings.remove(new Registration(new HeadersBinding(args), queue)))
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
+ + " with headers args " + args);
+ }
}
public void route(AMQMessage payload) throws AMQException
Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=598105&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Sun Nov 25 17:41:31 2007
@@ -0,0 +1,110 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+ private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+ public static QueueUnbindHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueUnbindHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ }
+
+
+ try
+ {
+ queue.unBind(routingKey, body.getArguments(), exch);
+ }
+ catch (AMQInvalidRoutingKeyException rke)
+ {
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
+ }
+ catch (AMQException e)
+ {
+ if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
+ }
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java Sun Nov 25 17:41:31 2007
@@ -36,6 +36,9 @@
private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
BasicRecoverSyncMethodHandler.getInstance();
+ private static final QueueUnbindHandler _queueUnbindHandler =
+ QueueUnbindHandler.getInstance();
+
public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
{
@@ -155,6 +158,7 @@
public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
- return false;
+ _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ return true;
}
}
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Nov 25 17:41:31 2007
@@ -32,13 +32,7 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
@@ -161,6 +155,7 @@
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
+ private ProtocolVersion _protocolVersion;
/**
* @param broker brokerdetails
@@ -253,6 +248,9 @@
_clientName = connectionURL.getClientName();
_username = connectionURL.getUsername();
_password = connectionURL.getPassword();
+
+ _protocolVersion = connectionURL.getProtocolVersion();
+
setVirtualHost(connectionURL.getVirtualHost());
if (connectionURL.getDefaultQueueExchangeName() != null)
@@ -393,16 +391,24 @@
private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
+ final Set<AMQState> openOrClosedStates =
+ EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
try
{
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
- _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
- _failoverPolicy.attainedConnection();
- // Again this should be changed to a suitable notify
- _connected = true;
+ //_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+ AMQState state = _protocolHandler.attainState(openOrClosedStates);
+ if(state == AMQState.CONNECTION_OPEN)
+ {
+
+ _failoverPolicy.attainedConnection();
+
+ // Again this should be changed to a suitable notify
+ _connected = true;
+ }
}
catch (AMQException e)
{
@@ -1285,4 +1291,16 @@
{
return _sessions.get(channelId);
}
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public void setProtocolVersion(ProtocolVersion protocolVersion)
+ {
+ _protocolVersion = protocolVersion;
+ _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+ }
+
}
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Sun Nov 25 17:41:31 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.URLHelper;
@@ -52,6 +53,7 @@
private AMQShortString _defaultTopicExchangeName;
private AMQShortString _temporaryTopicExchangeName;
private AMQShortString _temporaryQueueExchangeName;
+ private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion();
public AMQConnectionURL(String fullURL) throws URLSyntaxException
{
@@ -255,6 +257,15 @@
{
_temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE));
}
+ if(_options.containsKey(OPTIONS_PROTOCOL_VERSION))
+ {
+ ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION));
+ if(pv != null)
+ {
+ _protocolVersion = pv;
+ }
+ }
+
}
public String getURL()
@@ -375,6 +386,11 @@
public AMQShortString getTemporaryTopicExchangeName()
{
return _temporaryTopicExchangeName;
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
}
public String toString()
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sun Nov 25 17:41:31 2007
@@ -52,6 +52,7 @@
import org.slf4j.LoggerFactory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -387,94 +388,109 @@
public void messageReceived(IoSession session, Object message) throws Exception
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && ((msgNumber % 1000) == 0))
+ if(message instanceof AMQFrame)
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
- AMQFrame frame = (AMQFrame) message;
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
- final AMQBody bodyFrame = frame.getBodyFrame();
+ AMQFrame frame = (AMQFrame) message;
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ final AMQBody bodyFrame = frame.getBodyFrame();
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
+ switch (bodyFrame.getFrameType())
+ {
+ case AMQMethodBody.TYPE:
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+ }
- try
- {
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
}
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners);
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
}
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ catch (AMQException e)
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
+
+ exceptionCaught(session, e);
}
- exceptionCaught(session, e);
- }
+ break;
- break;
+ case ContentHeaderBody.TYPE:
- case ContentHeaderBody.TYPE:
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+ break;
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
+ case ContentBody.TYPE:
- case ContentBody.TYPE:
+ _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+ break;
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
+ case HeartbeatBody.TYPE:
- case HeartbeatBody.TYPE:
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
+ break;
- break;
+ default:
- default:
+ }
+ _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
}
private static int _messagesOut;
@@ -514,6 +530,12 @@
{
getStateManager().attainState(s);
}
+
+ public AMQState attainState(Set<AMQState> states) throws AMQException
+ {
+ return getStateManager().attainState(states);
+ }
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sun Nov 25 17:41:31 2007
@@ -121,7 +121,7 @@
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
- _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+ _protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
stateManager);
_connection = connection;
@@ -133,7 +133,7 @@
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java Sun Nov 25 17:41:31 2007
@@ -24,8 +24,22 @@
* States used in the AMQ protocol. Used by the finite state machine to determine
* valid responses.
*/
-public class AMQState
+public enum AMQState
{
+
+ CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"),
+
+ CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"),
+
+ CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"),
+
+ CONNECTION_OPEN(4, "CONNECTION_OPEN"),
+
+ CONNECTION_CLOSING(5, "CONNECTION_CLOSING"),
+
+ CONNECTION_CLOSED(6, "CONNECTION_CLOSED");
+
+
private final int _id;
private final String _name;
@@ -41,16 +55,6 @@
return "AMQState: id = " + _id + " name: " + _name;
}
- public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED");
-
- public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED");
-
- public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED");
-
- public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN");
-
- public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING");
-
- public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED");
-
+
+
}
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Sun Nov 25 17:41:31 2007
@@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
@@ -164,5 +165,42 @@
public MethodRegistry getMethodRegistry()
{
return getProtocolSession().getMethodRegistry();
+ }
+
+ public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+ {
+ synchronized (_stateLock)
+ {
+ final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+ long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+ while (!stateSet.contains(_currentState) && (waitTime > 0))
+ {
+ try
+ {
+ _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Thread interrupted");
+ }
+
+ if (!stateSet.contains(_currentState))
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+
+ if (!stateSet.contains(_currentState))
+ {
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + stateSet);
+ throw new AMQException("State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + stateSet);
+ }
+ return _currentState;
+ }
+
+
}
}
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Sun Nov 25 17:41:31 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.jms;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
import java.util.List;
@@ -41,6 +42,7 @@
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+ public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion";
String getURL();
@@ -83,4 +85,6 @@
AMQShortString getTemporaryQueueExchangeName();
AMQShortString getTemporaryTopicExchangeName();
+
+ ProtocolVersion getProtocolVersion();
}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Sun Nov 25 17:41:31 2007
@@ -56,6 +56,7 @@
/** Flag to indicate whether this decoder needs to handle protocol initiation. */
private boolean _expectProtocolInitiation;
+ private boolean firstDecode = true;
/**
* Creates a new AMQP decoder.
@@ -81,14 +82,24 @@
*/
protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- if (_expectProtocolInitiation)
+
+ boolean decoded;
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (in.remaining() > 0)
+ && (in.get(in.position()) == (byte)'A')))
{
- return doDecodePI(session, in, out);
+ decoded = doDecodePI(session, in, out);
}
else
{
- return doDecodeDataBlock(session, in, out);
+ decoded = doDecodeDataBlock(session, in, out);
+ }
+ if(firstDecode && decoded)
+ {
+ firstDecode = false;
}
+ return decoded;
}
/**
Modified: incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm (original)
+++ incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm Sun Nov 25 17:41:31 2007
@@ -33,6 +33,8 @@
import java.util.SortedSet;
import java.util.Collections;
import java.util.TreeSet;
+import java.util.Map;
+import java.util.HashMap;
public class ProtocolVersion implements Comparable
@@ -124,6 +126,9 @@
}
private static final SortedSet<ProtocolVersion> _supportedVersions;
+ private static final Map<String, ProtocolVersion> _nameToVersionMap =
+ new HashMap<String, ProtocolVersion>();
+ private static final ProtocolVersion _defaultVersion;
#foreach( $version in $model.getVersionSet() )
@@ -138,8 +143,17 @@
#foreach( $version in $model.getVersionSet() )
#set( $versionId = "v$version.getMajor()_$version.getMinor()" )
versions.add($versionId);
+ _nameToVersionMap.put("${version.getMajor()}-${version.getMinor()}", $versionId);
#end
_supportedVersions = Collections.unmodifiableSortedSet(versions);
+
+
+ ProtocolVersion systemDefinedVersion =
+ _nameToVersionMap.get(System.getProperty("org.apache.qpid.amqp_version"));
+
+ _defaultVersion = (systemDefinedVersion == null)
+ ? getLatestSupportedVersion()
+ : systemDefinedVersion;
}
@@ -149,7 +163,16 @@
}
+
+ public static ProtocolVersion parse(String name)
+ {
+ return _nameToVersionMap.get(name);
+ }
+ public static ProtocolVersion defaultProtocolVersion()
+ {
+ return _defaultVersion;
+ }
}