You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 16:16:55 UTC
svn commit: r496725 [4/11] - in /incubator/qpid/branches/perftesting/qpid:
gentools/ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
gentools/templ.java/ java/ java/broker/ java/broker/etc/
java/broker/src/main/java/org/apache/qpid/server/ j...
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 16 07:16:39 2007
@@ -477,7 +477,10 @@
}
// Commits outstanding messages sent and outstanding acknowledgements.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -492,8 +495,11 @@
checkTransacted();
try
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
+ TxRollbackBody.createAMQFrame(_channelId, (byte)8, (byte)0), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -516,8 +522,15 @@
try
{
_connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client closing channel"); // replyText
_connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -707,7 +720,12 @@
{
consumer.clearUnackedMessages();
}
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -968,7 +986,7 @@
// ft.put("headers", rawSelector.getDataAsBytes());
if (rawSelector != null)
{
- ft.putAll(rawSelector);
+ ft.addAll(rawSelector);
}
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
@@ -1039,7 +1057,20 @@
public void declareExchangeSynch(String name, String type) throws AMQException
{
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ false, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
_connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1050,7 +1081,20 @@
private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1072,9 +1116,19 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
- false, amqd.isDurable(), amqd.isExclusive(),
- amqd.isAutoDelete(), true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ true, // nowait
+ false, // passive
+ amqd.getQueueName(), // queue
+ 0); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getQueueName();
@@ -1082,9 +1136,17 @@
private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
- queueName, amqd.getExchangeName(),
- amqd.getRoutingKey(), true, ft);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ft, // arguments
+ amqd.getExchangeName(), // exchange
+ true, // nowait
+ queueName, // queue
+ amqd.getRoutingKey(), // routingKey
+ 0); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1122,10 +1184,19 @@
try
{
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, consumer.isNoLocal(),
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait, arguments);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ 0); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1302,8 +1373,16 @@
{
try
{
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, 0, queueName, false,
- false, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ 0); // ticket
_connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1389,8 +1468,14 @@
boolean isQueueBound(String queueName, String routingKey) throws JMSException
{
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
- routingKey, queueName);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
+ queueName, // queue
+ routingKey); // routingKey
AMQMethodEvent response = null;
try
{
@@ -1447,7 +1532,13 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ deliveryTag, // deliveryTag
+ multiple); // multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
@@ -1606,14 +1697,24 @@
private void suspendChannel()
{
_logger.warn("Suspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
private void unsuspendChannel()
{
_logger.warn("Unsuspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ true); // active
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 16 07:16:39 2007
@@ -448,7 +448,13 @@
{
if(sendClose)
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ _consumerTag, // consumerTag
+ false); // nowait
try
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Jan 16 07:16:39 2007
@@ -134,9 +134,20 @@
{
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getExchangeClass(), false,
- false, false, false, true, null);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ 0, // ticket
+ destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -512,8 +523,16 @@
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getJmsContentHeaderProperties().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
- destination.getRoutingKey(), mandatory, immediate);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -555,7 +574,9 @@
}
// weight argument of zero indicates no child content headers, just bodies
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.CLASS_ID, 0,
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)8, (byte)0), 0,
contentHeaderProperties,
size);
if (_logger.isDebugEnabled())
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Tue Jan 16 07:16:39 2007
@@ -22,10 +22,9 @@
import org.apache.qpid.common.QpidProperties;
-import java.util.Enumeration;
-
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
+import java.util.Enumeration;
public class QpidConnectionMetaData implements ConnectionMetaData
{
@@ -90,7 +89,7 @@
public String getClientVersion()
{
- return QpidProperties.getBuildVerision();
+ return QpidProperties.getBuildVersion();
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Tue Jan 16 07:16:39 2007
@@ -132,10 +132,6 @@
throw new javax.jms.IllegalStateException("Publisher is closed");
}
- if(queue == null){
- throw new UnsupportedOperationException("Queue is null");
- }
-
AMQSession session = ((BasicMessageProducer) _delegate).getSession();
if(session == null || session.isClosed()){
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -57,7 +57,10 @@
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
evt.getProtocolSession().writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -59,7 +59,10 @@
String reason = method.replyText;
// TODO: check whether channel id of zero is appropriate
- evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)8, (byte)0));
if (errorCode != 200)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -54,7 +54,12 @@
{
// Evaluate server challenge
byte[] response = client.evaluateChallenge(body.challenge);
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), response);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ response); // response
evt.getProtocolSession().writeFrame(responseFrame);
}
catch (SaslException e)
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -124,10 +124,17 @@
clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
- clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision());
+ clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
- ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
- saslResponse, selectedLocale));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ clientProperties, // clientProperties
+ selectedLocale, // locale
+ mechanism, // mechanism
+ saslResponse)); // response
}
catch (UnsupportedEncodingException e)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -72,11 +72,25 @@
protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
{
- return ConnectionOpenBody.createAMQFrame(channel, path, capabilities, insist);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ return ConnectionOpenBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ capabilities, // capabilities
+ insist, // insist
+ path); // virtualHost
}
protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
{
- return ConnectionTuneOkBody.createAMQFrame(channel, params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ return ConnectionTuneOkBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ params.getChannelMax(), // channelMax
+ params.getFrameMax(), // frameMax
+ params.getHeartbeat()); // heartbeat
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Tue Jan 16 07:16:39 2007
@@ -63,7 +63,7 @@
}
}
- private void allocateInitialBuffer()
+ protected void allocateInitialBuffer()
{
_data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
_data.setAutoExpand(true);
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Jan 16 07:16:39 2007
@@ -452,10 +452,6 @@
}
}
- public Map getUnderlyingMessagePropertiesMap()
- {
- return getJmsContentHeaderProperties().getHeaders();
- }
public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Tue Jan 16 07:16:39 2007
@@ -22,23 +22,23 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.JMSPropertyFieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
-import java.util.Enumeration;
+import javax.jms.MessageFormatException;
+import java.util.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
-public class JMSMapMessage extends JMSBytesMessage implements javax.jms.MapMessage
+public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
{
private static final Logger _logger = Logger.getLogger(JMSMapMessage.class);
public static final String MIME_TYPE = "jms/map-message";
- private JMSPropertyFieldTable _properties;
+ private Map<String,Object> _map = new HashMap<String, Object>();
JMSMapMessage() throws JMSException
{
@@ -48,45 +48,30 @@
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- _properties = new JMSPropertyFieldTable();
+ populateMapFromData();
}
+
JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
throws AMQException
{
super(messageNbr, contentHeader, data);
-
- if (data != null)
+ try
{
-
- long tableSize = EncodingUtils.readInteger(_data);
- try
- {
- _properties = new JMSPropertyFieldTable(_data, tableSize);
- }
- catch (JMSException e)
- {
- Exception error = e.getLinkedException();
- if (error instanceof AMQFrameDecodingException)
- {
- throw(AMQFrameDecodingException) error;
- }
- else
- {
- throw new AMQException(e.getMessage(), e);
- }
- }
- }
- else
+ populateMapFromData();
+ }
+ catch (JMSException je)
{
- _properties = new JMSPropertyFieldTable();
+ throw new AMQException("Error populating MapMessage from ByteBuffer", je);
+
}
+
}
public String toBodyString() throws JMSException
{
- return _properties.toString();
+ return _map.toString();
}
public String getMimeType()
@@ -98,161 +83,437 @@
public ByteBuffer getData()
{
//What if _data is null?
- _properties.writeToBuffer(_data);
+ writeMapToData();
return super.getData();
}
+
+
@Override
public void clearBodyImpl() throws JMSException
{
super.clearBodyImpl();
- _properties.clear();
+ _map.clear();
}
- public boolean getBoolean(String string) throws JMSException
+ public boolean getBoolean(String propName) throws JMSException
{
- return _properties.getBoolean(string);
+ Object value = _map.get(propName);
+
+ if(value instanceof Boolean)
+ {
+ return ((Boolean)value).booleanValue();
+ }
+ else if((value instanceof String) || (value == null))
+ {
+ return Boolean.valueOf((String)value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to boolean.");
+ }
+
}
- public byte getByte(String string) throws JMSException
+ public byte getByte(String propName) throws JMSException
{
- return _properties.getByte(string);
+ Object value = _map.get(propName);
+
+ if(value instanceof Byte)
+ {
+ return ((Byte)value).byteValue();
+ }
+ else if((value instanceof String) || (value==null))
+ {
+ return Byte.valueOf((String)value).byteValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to byte.");
+ }
}
- public short getShort(String string) throws JMSException
+ public short getShort(String propName) throws JMSException
{
- return _properties.getShort(string);
+ Object value = _map.get(propName);
+
+ if(value instanceof Short)
+ {
+ return ((Short)value).shortValue();
+ }
+ else if(value instanceof Byte)
+ {
+ return ((Byte)value).shortValue();
+ }
+ else if((value instanceof String) || (value==null))
+ {
+ return Short.valueOf((String)value).shortValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to short.");
+ }
+
}
- public char getChar(String string) throws JMSException
+
+ public int getInt(String propName) throws JMSException
{
- Character result = _properties.getCharacter(string);
+ Object value = _map.get(propName);
- if (result == null)
+ if(value instanceof Integer)
+ {
+ return ((Integer)value).intValue();
+ }
+ else if(value instanceof Short)
+ {
+ return ((Short)value).intValue();
+ }
+ else if(value instanceof Byte)
{
- throw new NullPointerException("getChar couldn't find " + string + " item.");
+ return ((Byte)value).intValue();
+ }
+ else if((value instanceof String) || (value==null))
+ {
+ return Integer.valueOf((String)value).intValue();
}
else
{
- return result;
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to int.");
}
+
}
- public int getInt(String string) throws JMSException
+ public long getLong(String propName) throws JMSException
{
- return _properties.getInteger(string);
+ Object value = _map.get(propName);
+
+ if(value instanceof Long)
+ {
+ return ((Long)value).longValue();
+ }
+ else if(value instanceof Integer)
+ {
+ return ((Integer)value).longValue();
+ }
+ if(value instanceof Short)
+ {
+ return ((Short)value).longValue();
+ }
+ if(value instanceof Byte)
+ {
+ return ((Byte)value).longValue();
+ }
+ else if((value instanceof String) || (value==null))
+ {
+ return Long.valueOf((String)value).longValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to long.");
+ }
+
}
- public long getLong(String string) throws JMSException
+ public char getChar(String propName) throws JMSException
{
- return _properties.getLong(string);
+ Object value = _map.get(propName);
+
+ if(!_map.containsKey(propName))
+ {
+ throw new MessageFormatException("Property " + propName + " not present");
+ }
+ else if(value instanceof Character)
+ {
+ return ((Character)value).charValue();
+ }
+ else if (value == null)
+ {
+ throw new NullPointerException("Property " + propName + " has null value and therefore cannot " +
+ "be converted to char.");
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to boolan.");
+ }
+
}
- public float getFloat(String string) throws JMSException
+
+
+ public float getFloat(String propName) throws JMSException
{
- return _properties.getFloat(string);
+ Object value = _map.get(propName);
+
+ if(value instanceof Float)
+ {
+ return ((Float)value).floatValue();
+ }
+ else if((value instanceof String) || (value==null))
+ {
+ return Float.valueOf((String)value).floatValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to float.");
+ }
}
- public double getDouble(String string) throws JMSException
+ public double getDouble(String propName) throws JMSException
{
- return _properties.getDouble(string);
+ Object value = _map.get(propName);
+
+ if(value instanceof Double)
+ {
+ return ((Double)value).doubleValue();
+ }
+ else if(value instanceof Float)
+ {
+ return ((Float)value).doubleValue();
+ }
+ else if((value instanceof String) || (value==null))
+ {
+ return Double.valueOf((String)value).doubleValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to double.");
+ }
}
- public String getString(String string) throws JMSException
+ public String getString(String propName) throws JMSException
{
- return _properties.getString(string);
+ Object value = _map.get(propName);
+
+ if((value instanceof String) || (value == null))
+ {
+ return (String) value;
+ }
+ else if(value instanceof byte[])
+ {
+ throw new MessageFormatException("Property " + propName + " of type byte[] " +
+ "cannot be converted to String.");
+ }
+ else
+ {
+ return value.toString();
+ }
+
}
- public byte[] getBytes(String string) throws JMSException
+ public byte[] getBytes(String propName) throws JMSException
{
- return _properties.getBytes(string);
+ Object value = _map.get(propName);
+
+ if(!_map.containsKey(propName))
+ {
+ throw new MessageFormatException("Property " + propName + " not present");
+ }
+ else if((value instanceof byte[]) || (value == null))
+ {
+ return (byte[])value;
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + propName + " of type " +
+ value.getClass().getName() + " cannot be converted to byte[].");
+ }
}
- public Object getObject(String string) throws JMSException
+ public Object getObject(String propName) throws JMSException
{
- return _properties.getObject(string);
+ return _map.get(propName);
}
public Enumeration getMapNames() throws JMSException
{
- return _properties.getMapNames();
+ return Collections.enumeration(_map.keySet());
}
- public void setBoolean(String string, boolean b) throws JMSException
+ public void setBoolean(String propName, boolean b) throws JMSException
{
checkWritable();
- _properties.setBoolean(string, b);
+ checkPropertyName(propName);
+ _map.put(propName, b);
}
- public void setByte(String string, byte b) throws JMSException
+ public void setByte(String propName, byte b) throws JMSException
{
checkWritable();
- _properties.setByte(string, b);
+ checkPropertyName(propName);
+ _map.put(propName, b);
}
- public void setShort(String string, short i) throws JMSException
+ public void setShort(String propName, short i) throws JMSException
{
checkWritable();
- _properties.setShort(string, i);
+ checkPropertyName(propName);
+ _map.put(propName, i);
}
- public void setChar(String string, char c) throws JMSException
+ public void setChar(String propName, char c) throws JMSException
{
checkWritable();
- _properties.setChar(string, c);
+ checkPropertyName(propName);
+ _map.put(propName, c);
}
- public void setInt(String string, int i) throws JMSException
+ public void setInt(String propName, int i) throws JMSException
{
checkWritable();
- _properties.setInteger(string, i);
+ checkPropertyName(propName);
+ _map.put(propName, i);
}
- public void setLong(String string, long l) throws JMSException
+ public void setLong(String propName, long l) throws JMSException
{
checkWritable();
- _properties.setLong(string, l);
+ checkPropertyName(propName);
+ _map.put(propName, l);
}
- public void setFloat(String string, float v) throws JMSException
+ public void setFloat(String propName, float v) throws JMSException
{
checkWritable();
- _properties.setFloat(string, v);
+ checkPropertyName(propName);
+ _map.put(propName, v);
}
- public void setDouble(String string, double v) throws JMSException
+ public void setDouble(String propName, double v) throws JMSException
{
checkWritable();
- _properties.setDouble(string, v);
+ checkPropertyName(propName);
+ _map.put(propName, v);
}
- public void setString(String string, String string1) throws JMSException
+ public void setString(String propName, String string1) throws JMSException
{
checkWritable();
- _properties.setString(string, string1);
+ checkPropertyName(propName);
+ _map.put(propName, string1);
}
- public void setBytes(String string, byte[] bytes) throws JMSException
+ public void setBytes(String propName, byte[] bytes) throws JMSException
{
- this.setBytes(string, bytes, 0, bytes.length);
+ checkWritable();
+ checkPropertyName(propName);
+ _map.put(propName, bytes);
}
- public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException
+ public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
{
+ if((offset == 0) && (length == bytes.length))
+ {
+ setBytes(propName,bytes);
+ }
+ else
+ {
+ byte[] newBytes = new byte[length];
+ System.arraycopy(bytes,offset,newBytes,0,length);
+ setBytes(propName,newBytes);
+ }
+ }
+
+ public void setObject(String propName, Object value) throws JMSException
+ {
checkWritable();
- _properties.setBytes(string, bytes, i, i1);
+ checkPropertyName(propName);
+ if(value instanceof Boolean
+ || value instanceof Byte
+ || value instanceof Short
+ || value instanceof Integer
+ || value instanceof Long
+ || value instanceof Character
+ || value instanceof Float
+ || value instanceof Double
+ || value instanceof String
+ || value instanceof byte[]
+ || value == null)
+ {
+ _map.put(propName, value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot set property " + propName + " to value " + value +
+ "of type " + value.getClass().getName() + ".");
+ }
}
- public void setObject(String string, Object object) throws JMSException
+ private void checkPropertyName(String propName)
{
- checkWritable();
- _properties.setObject(string, object);
+ if(propName == null || propName.equals(""))
+ {
+ throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+ }
+ }
+
+ public boolean itemExists(String propName) throws JMSException
+ {
+ return _map.containsKey(propName);
+ }
+
+
+ private void populateMapFromData() throws JMSException
+ {
+ if(_data != null)
+ {
+ _data.rewind();
+
+ final int entries = readIntImpl();
+ for(int i = 0; i < entries; i++)
+ {
+ String propName = readStringImpl();
+ Object value = readObject();
+ _map.put(propName,value);
+ }
+ }
+ else
+ {
+ _map.clear();
+ }
}
- public boolean itemExists(String string) throws JMSException
+ private void writeMapToData()
{
- return _properties.itemExists(string);
+ allocateInitialBuffer();
+ final int size = _map.size();
+ writeIntImpl(size);
+ for(Map.Entry<String, Object> entry : _map.entrySet())
+ {
+ try
+ {
+ writeStringImpl(entry.getKey());
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(),e);
+
+
+ }
+ try
+ {
+ writeObject(entry.getValue());
+ }
+ catch (JMSException e)
+ {
+ Object value = entry.getValue();
+ throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() +
+ " value : " + value + " (type: " + value.getClass().getName() + ").",e);
+ }
+ }
+
}
+
+
+
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Tue Jan 16 07:16:39 2007
@@ -31,31 +31,10 @@
/**
* @author Apache Software Foundation
*/
-public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
+public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
{
public static final String MIME_TYPE="jms/stream-message";
- private static final byte BOOLEAN_TYPE = (byte) 1;
-
- private static final byte BYTE_TYPE = (byte) 2;
-
- private static final byte BYTEARRAY_TYPE = (byte) 3;
-
- private static final byte SHORT_TYPE = (byte) 4;
-
- private static final byte CHAR_TYPE = (byte) 5;
-
- private static final byte INT_TYPE = (byte) 6;
-
- private static final byte LONG_TYPE = (byte) 7;
-
- private static final byte FLOAT_TYPE = (byte) 8;
-
- private static final byte DOUBLE_TYPE = (byte) 9;
-
- private static final byte STRING_TYPE = (byte) 10;
-
- private static final byte NULL_STRING_TYPE = (byte) 11;
/**
* This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
@@ -97,128 +76,22 @@
return MIME_TYPE;
}
- private byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
- {
- checkReadable();
- checkAvailable(1);
- return _data.get();
- }
- private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
- {
- checkWritable();
- _data.put(type);
- _changedData = true;
- }
public boolean readBoolean() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- boolean result;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
+ return super.readBoolean();
}
- private boolean readBooleanImpl()
- {
- return _data.get() != 0;
- }
public byte readByte() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- byte result;
- try
- {
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- private byte readByteImpl()
- {
- return _data.get();
+ return super.readByte();
}
public short readShort() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- short result;
- try
- {
- switch (wireType)
- {
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
-
- private short readShortImpl()
- {
- return _data.getShort();
+ return super.readShort();
}
/**
@@ -229,564 +102,102 @@
*/
public char readChar() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- try
- {
- if(wireType == NULL_STRING_TYPE){
- throw new NullPointerException();
- }
-
- if (wireType != CHAR_TYPE)
- {
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
- }
- else
- {
- checkAvailable(2);
- return readCharImpl();
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private char readCharImpl()
- {
- return _data.getChar();
+ return super.readChar();
}
public int readInt() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- int result;
- try
- {
- switch (wireType)
- {
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private int readIntImpl()
- {
- return _data.getInt();
+ return super.readInt();
}
public long readLong() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- long result;
- try
- {
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private long readLongImpl()
- {
- return _data.getLong();
+ return super.readLong();
}
public float readFloat() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- float result;
- try
- {
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private float readFloatImpl()
- {
- return _data.getFloat();
+ return super.readFloat();
}
public double readDouble() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- double result;
- try
- {
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private double readDoubleImpl()
- {
- return _data.getDouble();
+ return super.readDouble();
}
public String readString() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- String result;
- try
- {
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- throw new NullPointerException("data is null");
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private String readStringImpl() throws JMSException
- {
- try
- {
- return _data.getString(Charset.forName("UTF-8").newDecoder());
- }
- catch (CharacterCodingException e)
- {
- JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- je.setLinkedException(e);
- throw je;
- }
+ return super.readString();
}
public int readBytes(byte[] bytes) throws JMSException
{
- if (bytes == null)
- {
- throw new IllegalArgumentException("byte array must not be null");
- }
- checkReadable();
- // first call
- if (_byteArrayRemaining == -1)
- {
- // type discriminator checked separately so you get a MessageFormatException rather than
- // an EOF even in the case where both would be applicable
- checkAvailable(1);
- byte wireType = readWireType();
- if (wireType != BYTEARRAY_TYPE)
- {
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
- }
- checkAvailable(4);
- int size = _data.getInt();
- // size of -1 indicates null
- if (size == -1)
- {
- return -1;
- }
- else
- {
- if (size > _data.remaining())
- {
- throw new MessageEOFException("Byte array has stated size " + size + " but message only contains " +
- _data.remaining() + " bytes");
- }
- else
- {
- _byteArrayRemaining = size;
- }
- }
- }
- else if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- return -1;
- }
-
- int returnedSize = readBytesImpl(bytes);
- if (returnedSize < bytes.length)
- {
- _byteArrayRemaining = -1;
- }
- return returnedSize;
- }
-
- private int readBytesImpl(byte[] bytes)
- {
- int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
- _byteArrayRemaining -= count;
-
- if (count == 0)
- {
- return 0;
- }
- else
- {
- _data.get(bytes, 0, count);
- return count;
- }
+ return super.readBytes(bytes);
}
+
public Object readObject() throws JMSException
{
- int position = _data.position();
- byte wireType = readWireType();
- Object result = null;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
- result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- result = new byte[size];
- readBytesImpl(new byte[size]);
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
+ return super.readObject();
}
public void writeBoolean(boolean b) throws JMSException
{
- writeTypeDiscriminator(BOOLEAN_TYPE);
- _data.put(b ? (byte) 1 : (byte) 0);
+ super.writeBoolean(b);
}
public void writeByte(byte b) throws JMSException
{
- writeTypeDiscriminator(BYTE_TYPE);
- _data.put(b);
+ super.writeByte(b);
}
public void writeShort(short i) throws JMSException
{
- writeTypeDiscriminator(SHORT_TYPE);
- _data.putShort(i);
+ super.writeShort(i);
}
public void writeChar(char c) throws JMSException
{
- writeTypeDiscriminator(CHAR_TYPE);
- _data.putChar(c);
+ super.writeChar(c);
}
public void writeInt(int i) throws JMSException
{
- writeTypeDiscriminator(INT_TYPE);
- _data.putInt(i);
+ super.writeInt(i);
}
public void writeLong(long l) throws JMSException
{
- writeTypeDiscriminator(LONG_TYPE);
- _data.putLong(l);
+ super.writeLong(l);
}
public void writeFloat(float v) throws JMSException
{
- writeTypeDiscriminator(FLOAT_TYPE);
- _data.putFloat(v);
+ super.writeFloat(v);
}
public void writeDouble(double v) throws JMSException
{
- writeTypeDiscriminator(DOUBLE_TYPE);
- _data.putDouble(v);
+ super.writeDouble(v);
}
public void writeString(String string) throws JMSException
{
- if (string == null)
- {
- writeTypeDiscriminator(NULL_STRING_TYPE);
- }
- else
- {
- writeTypeDiscriminator(STRING_TYPE);
- try
- {
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte) 0);
- }
- catch (CharacterCodingException e)
- {
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
- }
- }
+ super.writeString(string);
}
public void writeBytes(byte[] bytes) throws JMSException
{
- writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+ super.writeBytes(bytes);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
- writeTypeDiscriminator(BYTEARRAY_TYPE);
- if (bytes == null)
- {
- _data.putInt(-1);
- }
- else
- {
- _data.putInt(length);
- _data.put(bytes, offset, length);
- }
+ super.writeBytes(bytes,offset,length);
}
public void writeObject(Object object) throws JMSException
{
- checkWritable();
- Class clazz = null;
- if (object == null)
- {
- // string handles the output of null values
- clazz = String.class;
- }
- else
- {
- clazz = object.getClass();
- }
-
- if (clazz == Byte.class)
- {
- writeByte((Byte) object);
- }
- else if (clazz == Boolean.class)
- {
- writeBoolean((Boolean) object);
- }
- else if (clazz == byte[].class)
- {
- writeBytes((byte[]) object);
- }
- else if (clazz == Short.class)
- {
- writeShort((Short) object);
- }
- else if (clazz == Character.class)
- {
- writeChar((Character) object);
- }
- else if (clazz == Integer.class)
- {
- writeInt((Integer) object);
- }
- else if (clazz == Long.class)
- {
- writeLong((Long) object);
- }
- else if (clazz == Float.class)
- {
- writeFloat((Float) object);
- }
- else if (clazz == Double.class)
- {
- writeDouble((Double) object);
- }
- else if (clazz == String.class)
- {
- writeString((String) object);
- }
- else
- {
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
- }
+ super.writeObject(object);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jan 16 07:16:39 2007
@@ -478,8 +478,15 @@
{
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
- 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client is closing the connection."); // replyText
syncWrite(frame, ConnectionCloseOkBody.class);
_protocolSession.closeProtocolSession();
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Tue Jan 16 07:16:39 2007
@@ -49,10 +49,10 @@
final VmPipeConnector ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
"AsynchronousReadFilter");
cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
- PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
"AsynchronousWriteFilter");
cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java Tue Jan 16 07:16:39 2007
@@ -81,7 +81,7 @@
{
_connection = connection;
_destination = destination;
- _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set up a slow consumer
_session.createConsumer(destination).setMessageListener(this);
@@ -1111,6 +1111,9 @@
Assert.fail("Message should be writeable");
}
}
+
+
+
private void testMapValues(JMSMapMessage m, int count) throws JMSException
{
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java Tue Jan 16 07:16:39 2007
@@ -11,7 +11,6 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.PropertyFieldTable;
import javax.jms.*;
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Tue Jan 16 07:16:39 2007
@@ -17,7 +17,6 @@
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.PropertyFieldTable;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -53,7 +52,7 @@
AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
- FieldTable ft = new PropertyFieldTable();
+ FieldTable ft = new FieldTable();
ft.setString("F1000","1");
MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Tue Jan 16 07:16:39 2007
@@ -112,7 +112,9 @@
private void ping(Broker b) throws AMQException
{
- ClusterPingBody ping = new ClusterPingBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
ping.broker = _group.getLocal().getDetails();
ping.responseRequired = true;
ping.load = _loadTable.getLocalLoad();
@@ -158,7 +160,9 @@
Broker leader = connectToLeader(member);
_logger.info(new LogMessage("Connected to {0}. joining", leader));
- ClusterJoinBody join = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
join.broker = _group.getLocal().getDetails();
send(leader, new SimpleSendable(join));
}
@@ -177,7 +181,9 @@
public void leave() throws AMQException
{
- ClusterLeaveBody leave = new ClusterLeaveBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
leave.broker = _group.getLocal().getDetails();
send(getLeader(), new SimpleSendable(leave));
}
@@ -198,7 +204,9 @@
}
else
{
- ClusterSuspectBody suspect = new ClusterSuspectBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
suspect.broker = broker.getDetails();
send(getLeader(), new SimpleSendable(suspect));
}
@@ -220,7 +228,9 @@
else
{
//pass request on to leader:
- ClusterJoinBody request = new ClusterJoinBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
request.broker = member.getDetails();
Broker leader = getLeader();
send(leader, new SimpleSendable(request));
@@ -265,7 +275,9 @@
private ClusterMembershipBody createAnnouncement(String membership)
{
- ClusterMembershipBody announce = new ClusterMembershipBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0);
//TODO: revise this way of converting String to bytes...
announce.members = membership.getBytes();
return announce;
Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java Tue Jan 16 07:16:39 2007
@@ -48,7 +48,13 @@
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ evt.getMethod().queue // consumerTag
+ ));
}
else
{
Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java Tue Jan 16 07:16:39 2007
@@ -51,7 +51,9 @@
{
for(String queue : _counts.keySet())
{
- BasicConsumeBody m = new BasicConsumeBody();
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0);
m.queue = queue;
m.consumerTag = queue;
replay(m, messages);