You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2006/12/22 18:00:31 UTC
svn commit: r489691 [1/2] - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid...
Author: kpvdr
Date: Fri Dec 22 09:00:28 2006
New Revision: 489691
URL: http://svn.apache.org/viewvc?view=rev&rev=489691
Log:
AMQP version using new generator - Part 1. In these changes, all places where version-specific info is required, it has been hard-wired to major=8, minor=0. The next phase of changes will connect the version info to that obtained from ProtocolInitiation for the current session.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
incubator/qpid/trunk/qpid/java/common/pom.xml
incubator/qpid/trunk/qpid/java/common/protocol-version.xml
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/xsl/cluster.asl
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Fri Dec 22 09:00:28 2006
@@ -81,7 +81,11 @@
public CompositeAMQDataBlock getReturnMessage(int channel)
{
- BasicReturnBody returnBody = new BasicReturnBody();
+ // AMQP version change: All generated *Body classes are now version-aware.
+ // Shortcut: hardwire version to 0-8 (major=8, minor=0) for now.
+ // TODO: Connect the version to that returned by the ProtocolInitiation
+ // for this session.
+ BasicReturnBody returnBody = new BasicReturnBody((byte)8, (byte)0);
returnBody.exchange = _publishBody.exchange;
returnBody.replyCode = getReplyCode();
returnBody.replyText = _message;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -54,7 +54,12 @@
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
if(!body.nowait)
{
- final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.consumerTag); // consumerTag
protocolSession.writeFrame(responseFrame);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -81,7 +81,12 @@
body.arguments, body.noLocal);
if (!body.nowait)
{
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
}
//now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage(), BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
+ ise.getMessage())); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
- BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -64,7 +64,15 @@
protocolSession.closeChannel(evt.getChannelId());
// TODO: modify code gen to make getClazz and getMethod public methods rather than protected
// then we can remove the hardcoded 0,0
- AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ChannelCloseBody.getClazz((byte)8, (byte)0), // classId
+ ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId
+ 500, // replyCode
+ "Unknown exchange name"); // replyText
protocolSession.writeFrame(cf);
}
else
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Fri Dec 22 09:00:28 2006
@@ -44,6 +44,9 @@
AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
- session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Fri Dec 22 09:00:28 2006
@@ -55,7 +55,10 @@
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
protocolSession.closeChannel(evt.getChannelId());
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Fri Dec 22 09:00:28 2006
@@ -58,6 +58,12 @@
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
- AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.active); // active
protocolSession.writeFrame(response);
- }}
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Fri Dec 22 09:00:28 2006
@@ -55,7 +55,10 @@
final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
exchangeRegistry);
protocolSession.addChannel(channel);
- AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -62,7 +62,10 @@
{
_logger.error("Error closing protocol session: " + e, e);
}
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -64,7 +64,12 @@
contextKey = generateClientID();
}
protocolSession.setContextKey(contextKey);
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ contextKey); // knownHosts
stateManager.changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeFrame(response);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -75,25 +75,43 @@
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
- AMQConstant.NOT_ALLOWED.getName(),
- ConnectionCloseBody.CLASS_ID,
- ConnectionCloseBody.METHOD_ID);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId
+ ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName()); // replyText
protocolSession.writeFrame(close);
disposeSaslServer(protocolSession);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ // TODO: Check the value of channelMax here: This should be the max
+ // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
+ // not Integer.MAX_VALUE (which is signed 4 bytes).
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeFrame(tune);
disposeSaslServer(protocolSession);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeFrame(challenge);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -92,13 +92,24 @@
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeFrame(tune);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeFrame(challenge);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Fri Dec 22 09:00:28 2006
@@ -64,6 +64,11 @@
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ byte major = (byte)8;
+ byte minor = (byte)0;
+
ExchangeBoundBody body = evt.getMethod();
String exchangeName = body.exchange;
@@ -77,8 +82,11 @@
AMQFrame response;
if (exchange == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
- "Exchange " + exchangeName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ EXCHANGE_NOT_FOUND, // replyCode
+ "Exchange " + exchangeName + " not found"); // replyText
}
else if (routingKey == null)
{
@@ -86,11 +94,19 @@
{
if (exchange.hasBindings())
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ NO_BINDINGS, // replyCode
+ null); // replyText
}
}
else
@@ -98,20 +114,29 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
- "Queue " + queueName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(queue))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
- "Queue " + queueName + " not bound to exchange " +
- exchangeName);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_BOUND, // replyCode
+ "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText
}
}
}
@@ -121,24 +146,30 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
- "Queue " + queueName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(body.routingKey, queue))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
- null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
- "Queue " + queueName +
- " not bound with routing key " +
- body.routingKey + " to exchange " +
- exchangeName);
+ major, minor, // AMQP version (major, minor)
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ "Queue " + queueName + " not bound with routing key " +
+ body.routingKey + " to exchange " + exchangeName); // replyText
}
}
}
@@ -146,16 +177,20 @@
{
if (exchange.isBound(body.routingKey))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
- null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- NO_QUEUE_BOUND_WITH_RK,
- "No queue bound with routing key " +
- body.routingKey + " to exchange " +
- exchangeName);
+ major, minor, // AMQP version (major, minor)
+ NO_QUEUE_BOUND_WITH_RK, // replyCode
+ "No queue bound with routing key " + body.routingKey +
+ " to exchange " + exchangeName); // replyText
}
}
protocolSession.writeFrame(response);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Dec 22 09:00:28 2006
@@ -75,7 +75,10 @@
}
if(!body.nowait)
{
- AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Fri Dec 22 09:00:28 2006
@@ -53,7 +53,10 @@
try
{
exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
- AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
catch (ExchangeInUseException e)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri Dec 22 09:00:28 2006
@@ -90,7 +90,10 @@
}
if (!body.nowait)
{
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Dec 22 09:00:28 2006
@@ -102,7 +102,14 @@
}
if (!body.nowait)
{
- AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0L, // consumerCount
+ 0L, // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeFrame(response);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Dec 22 09:00:28 2006
@@ -81,7 +81,12 @@
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
_store.removeQueue(queue.getName());
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
}
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Fri Dec 22 09:00:28 2006
@@ -52,7 +52,10 @@
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
channel.processReturns(protocolSession);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Dec 22 09:00:28 2006
@@ -51,7 +51,10 @@
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.rollback();
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(protocolSession);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Fri Dec 22 09:00:28 2006
@@ -48,6 +48,9 @@
AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Dec 22 09:00:28 2006
@@ -165,8 +165,17 @@
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
- mechanisms.getBytes(), locales.getBytes());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short)8, // versionMajor
+ (short)0 // versionMinor
+ );
_minaProtocolSession.write(response);
}
catch (AMQException e)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Fri Dec 22 09:00:28 2006
@@ -168,11 +168,20 @@
}
else if(throwable instanceof IOException)
{
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
}
else
{
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ 200, // replyCode
+ throwable.getMessage() // replyText
+ ));
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Dec 22 09:00:28 2006
@@ -193,8 +193,16 @@
public void closeConnection() throws JMException
{
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
- "Broker Management Console has closing the connection.", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "Broker Management Console has closing the connection." // replyText
+ );
_session.writeFrame(response);
try
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Dec 22 09:00:28 2006
@@ -157,10 +157,20 @@
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
- allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered,
- getExchangeName(), getRoutingKey());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Dec 22 09:00:28 2006
@@ -379,7 +379,13 @@
if (!_closed)
{
_logger.info("Closing autoclose subscription:" + this);
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
@@ -392,9 +398,17 @@
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
- deliveryTag, false, exchange,
- routingKey);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ exchange, // exchange
+ false, // redelivered
+ routingKey // routingKey
+ );
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Dec 22 09:00:28 2006
@@ -465,12 +465,25 @@
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+ ChannelOpenBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null), // outOfBand
+ ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+ BasicQosBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
BasicQosOkBody.class);
if (transacted)
@@ -479,7 +492,10 @@
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Dec 22 09:00:28 2006
@@ -477,7 +477,10 @@
}
// Commits outstanding messages sent and outstanding acknowledgements.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -492,8 +495,11 @@
checkTransacted();
try
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+ TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -516,8 +522,15 @@
try
{
_connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client closing channel"); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -707,7 +720,12 @@
{
consumer.clearUnackedMessages();
}
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -1039,7 +1057,20 @@
public void declareExchangeSynch(String name, String type) throws AMQException
{
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ false, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
_connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1050,7 +1081,20 @@
private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1072,9 +1116,19 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
- false, amqd.isDurable(), amqd.isExclusive(),
- amqd.isAutoDelete(), true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ true, // nowait
+ false, // passive
+ amqd.getQueueName(), // queue
+ 0); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getQueueName();
@@ -1082,9 +1136,17 @@
private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
- queueName, amqd.getExchangeName(),
- amqd.getRoutingKey(), true, ft);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ft, // arguments
+ amqd.getExchangeName(), // exchange
+ true, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ 0); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1122,10 +1184,19 @@
try
{
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, consumer.isNoLocal(),
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait, arguments);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ 0); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1302,8 +1373,16 @@
{
try
{
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
- false, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ 0); // ticket
_connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1389,8 +1468,14 @@
boolean isQueueBound(String queueName, String routingKey) throws JMSException
{
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- routingKey, queueName);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ queueName, // queue
+ routingKey); // routingKey
AMQMethodEvent response = null;
try
{
@@ -1447,7 +1532,13 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ deliveryTag, // deliveryTag
+ multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1606,14 +1697,24 @@
private void suspendChannel()
{
_logger.warn("Suspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
private void unsuspendChannel()
{
_logger.warn("Unsuspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ true); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Dec 22 09:00:28 2006
@@ -448,7 +448,13 @@
{
if(sendClose)
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ _consumerTag, // consumerTag
+ false); // nowait
try
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Dec 22 09:00:28 2006
@@ -134,9 +134,20 @@
{
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getExchangeClass(), false,
- false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -512,8 +523,16 @@
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getRoutingKey(), mandatory, immediate);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -555,7 +574,9 @@
}
// weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
contentHeaderProperties,
size);
if (_logger.isDebugEnabled())
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -57,7 +57,10 @@
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
evt.getProtocolSession().writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -59,7 +59,10 @@
String reason = method.replyText;
// TODO: check whether channel id of zero is appropriate
- evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0));
if (errorCode != 200)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -54,7 +54,12 @@
{
// Evaluate server challenge
byte[] response = client.evaluateChallenge(body.challenge);
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ response); // response
evt.getProtocolSession().writeFrame(responseFrame);
}
catch (SaslException e)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=489691&r1=489690&r2=489691
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Fri Dec 22 09:00:28 2006
@@ -126,8 +126,15 @@
clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
- ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
- saslResponse, selectedLocale));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ clientProperties, // clientProperties
+ selectedLocale, // locale
+ mechanism, // mechanism
+ saslResponse)); // response
}
catch (UnsupportedEncodingException e)
{