You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 12:04:48 UTC
svn commit: r501005 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
./ handler/ message/ protocol/
Author: rgreig
Date: Mon Jan 29 03:04:43 2007
New Revision: 501005
URL: http://svn.apache.org/viewvc?view=rev&rev=501005
Log:
QPID-324 : Patch supplied by Rob Godfrey - Only send byte indicating topic / queue / other in properties field table, not whole destination
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Mon Jan 29 03:04:43 2007
@@ -48,6 +48,12 @@
this(name, false);
}
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+ false, queueName, false); }
+
+
/**
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
* @param name the name of the queue
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Mon Jan 29 03:04:43 2007
@@ -44,6 +44,12 @@
this(new AMQShortString(name));
}
+ public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
+ }
+
+
public AMQTopic(AMQShortString name)
{
this(name, true, null, false);
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java?view=auto&rev=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java Mon Jan 29 03:04:43 2007
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public class AMQUndefinedDestination extends AMQDestination
+{
+
+ private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown");
+
+
+ public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName)
+ {
+ super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return getDestinationName();
+ }
+
+ public boolean isNameRequired()
+ {
+ return getAMQQueueName() == null;
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Jan 29 03:04:43 2007
@@ -441,8 +441,24 @@
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- message.getJmsHeaders().setBytes(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.
- getShortStringName(), destination.toByteEncoding());
+
+ byte type;
+ if(destination instanceof Topic)
+ {
+ type = AMQDestination.TOPIC_TYPE;
+ }
+ else if(destination instanceof Queue)
+ {
+ type = AMQDestination.QUEUE_TYPE;
+ }
+ else
+ {
+ type = AMQDestination.UNKNOWN_TYPE;
+ }
+
+ message.getJmsHeaders().setByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName(),
+ type);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Mon Jan 29 03:04:43 2007
@@ -26,7 +26,7 @@
public enum CustomJMSXProperty
{
- JMSX_QPID_JMSDESTINATIONURL,
+ JMSZ_QPID_DESTTYPE,
JMSXGroupID,
JMSXGroupSeq;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Mon Jan 29 03:04:43 2007
@@ -42,9 +42,7 @@
public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = (BasicDeliverBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
_logger.debug("New JmsDeliver method received");
protocolSession.unprocessedMessageReceived(msg);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Mon Jan 29 03:04:43 2007
@@ -43,10 +43,7 @@
public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.debug("New JmsBounce method received");
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = null;
- msg.bounceBody = (BasicReturnBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod());
protocolSession.unprocessedMessageReceived(msg);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Mon Jan 29 03:04:43 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
@@ -69,11 +70,11 @@
_data.setAutoExpand(true);
}
- AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Mon Jan 29 03:04:43 2007
@@ -2,6 +2,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.*;
@@ -59,10 +60,10 @@
}
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Jan 29 03:04:43 2007
@@ -26,8 +26,8 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
@@ -66,9 +66,33 @@
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
}
- protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
+ protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
this(contentHeader, deliveryTag);
+
+
+ byte type = contentHeader.getHeaders().getByte(CustomJMSXProperty.JMSZ_QPID_DESTTYPE.getShortStringName());
+
+ AMQDestination dest;
+
+ switch(type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
+ }
+ //Destination dest = AMQDestination.createDestination(url);
+ setJMSDestination(dest);
+
+
+
_data = data;
if (_data != null)
{
@@ -181,7 +205,7 @@
return _destination;
}
- public void setJMSDestination(Destination destination) throws JMSException
+ public void setJMSDestination(Destination destination)
{
_destination = destination;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -36,10 +37,12 @@
protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data,
- ContentHeaderBody contentHeader) throws AMQException;
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException;
protected AbstractJMSMessage createMessageWithBody(long messageNbr,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
List bodies) throws AMQException
{
ByteBuffer data;
@@ -54,7 +57,7 @@
}
data = ((ContentBody)bodies.get(0)).payload;
}
- else
+ else if (bodies != null)
{
if(debug)
{
@@ -70,19 +73,24 @@
}
data.flip();
}
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
+ }
if(debug)
{
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
}
- return createMessage(messageNbr, data, contentHeader);
+ return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
List bodies) throws JMSException, AMQException
{
- final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, bodies);
+ final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
return msg;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Mon Jan 29 03:04:43 2007
@@ -56,10 +56,10 @@
super(data); // this instanties a content header
}
- JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
public void reset()
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -23,14 +23,17 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSBytesMessage(deliveryTag, contentHeader, data);
+ return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
public AbstractJMSMessage createMessage() throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Mon Jan 29 03:04:43 2007
@@ -32,7 +32,7 @@
public final class JMSHeaderAdapter
{
- FieldTable _headers;
+ private final FieldTable _headers;
public JMSHeaderAdapter(FieldTable headers)
{
@@ -319,6 +319,13 @@
getHeaders().setByte(string, b);
}
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setByte(string, b);
+ }
+
+
public void setShort(String string, short i) throws JMSException
{
checkPropertyName(string);
@@ -326,6 +333,12 @@
}
public void setInteger(String string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getHeaders().setInteger(string, i);
+ }
+
+ public void setInteger(AMQShortString string, int i) throws JMSException
{
checkPropertyName(string);
getHeaders().setInteger(string, i);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Mon Jan 29 03:04:43 2007
@@ -54,10 +54,10 @@
}
- JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
try
{
populateMapFromData();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -22,6 +22,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.JMSException;
@@ -33,8 +34,10 @@
return new JMSMapMessage();
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSMapMessage(deliveryTag, contentHeader, data);
+ return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Mon Jan 29 03:04:43 2007
@@ -62,9 +62,10 @@
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException
+ JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
}
public void clearBodyImpl() throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -23,14 +23,17 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSObjectMessage(deliveryTag, contentHeader, data);
+ return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
public AbstractJMSMessage createMessage() throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Mon Jan 29 03:04:43 2007
@@ -61,10 +61,10 @@
}
- JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
- throws AMQException
+ JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
public void reset()
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -22,16 +22,18 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
import javax.jms.JMSException;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws
- AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSStreamMessage(deliveryTag, contentHeader, data);
+ return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
public AbstractJMSMessage createMessage() throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Mon Jan 29 03:04:43 2007
@@ -42,7 +42,7 @@
* This constant represents the name of a property that is set when the message payload is null.
*/
private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
- private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
+ private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public JMSTextMessage() throws JMSException
{
@@ -56,10 +56,11 @@
getContentHeaderProperties().setEncoding(encoding);
}
- JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey, ByteBuffer data)
throws AMQException
{
- super(deliveryTag, contentHeader, data);
+ super(deliveryTag, contentHeader, exchange, routingKey, data);
contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
_data = data;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Mon Jan 29 03:04:43 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
@@ -35,8 +36,11 @@
return new JMSTextMessage();
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+ AMQShortString exchange, AMQShortString routingKey,
+ ContentHeaderBody contentHeader) throws AMQException
{
- return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, data);
+ return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties,
+ exchange, routingKey, data);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Mon Jan 29 03:04:43 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import java.util.List;
@@ -31,6 +32,7 @@
{
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
List bodies)
throws JMSException, AMQException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Mon Jan 29 03:04:43 2007
@@ -63,6 +63,8 @@
* @throws JMSException
*/
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey,
ContentHeaderBody contentHeader,
List bodies) throws AMQException, JMSException
{
@@ -74,7 +76,7 @@
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Mon Jan 29 03:04:43 2007
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Collections;
/**
* This class contains everything needed to process a JMS message. It assembles the
@@ -38,27 +40,93 @@
{
private long _bytesReceived = 0;
- public BasicDeliverBody deliverBody;
- public BasicReturnBody bounceBody; // TODO: check change (gustavo)
- public int channelId;
- public ContentHeaderBody contentHeader;
+ private final BasicDeliverBody _deliverBody;
+ private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
+ private final int _channelId;
+ private ContentHeaderBody _contentHeader;
/**
* List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
*/
- public List bodies = new LinkedList();
+ private List<ContentBody> _bodies;
+
+ public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+ {
+ _deliverBody = deliverBody;
+ _channelId = channelId;
+ _bounceBody = null;
+ }
+
+
+ public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
+ {
+ _deliverBody = null;
+ _channelId = channelId;
+ _bounceBody = bounceBody;
+ }
public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
{
- bodies.add(body);
+
if (body.payload != null)
{
- _bytesReceived += body.payload.remaining();
+ final long payloadSize = body.payload.remaining();
+
+ if(_bodies == null)
+ {
+ if(payloadSize == getContentHeader().bodySize)
+ {
+ _bodies = Collections.singletonList(body);
+ }
+ else
+ {
+ _bodies = new ArrayList<ContentBody>();
+ }
+
+ }
+ else
+ {
+ _bodies.add(body);
+ }
+ _bytesReceived += payloadSize;
}
}
public boolean isAllBodyDataReceived()
{
- return _bytesReceived == contentHeader.bodySize;
+ return _bytesReceived == getContentHeader().bodySize;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _deliverBody;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _bounceBody;
}
+
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+
+ public ContentHeaderBody getContentHeader()
+ {
+ return _contentHeader;
+ }
+
+ public void setContentHeader(ContentHeaderBody contentHeader)
+ {
+ this._contentHeader = contentHeader;
+ }
+
+ public List<ContentBody> getBodies()
+ {
+ return _bodies;
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 29 03:04:43 2007
@@ -283,72 +283,87 @@
public void messageReceived(IoSession session, Object message) throws Exception
{
+ final boolean debug = _logger.isDebugEnabled();
final long msgNumber = ++_messageReceivedCount;
- if (_logger.isDebugEnabled() && (msgNumber % 1000 == 0))
+ if (debug && (msgNumber % 1000 == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
AMQFrame frame = (AMQFrame) message;
- HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- if (frame.bodyFrame instanceof AMQMethodBody)
+ switch(bodyFrame.getFrameType())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Method frame received: " + frame);
- }
+ case AMQMethodBody.TYPE:
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
+ if (debug)
+ {
+ _logger.debug("Method frame received: " + frame);
+ }
- try
- {
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ catch (AMQException e)
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
+ exceptionCaught(session, e);
}
- exceptionCaught(session, e);
- }
- }
- else if (frame.bodyFrame instanceof ContentHeaderBody)
- {
- _protocolSession.messageContentHeaderReceived(frame.channel,
- (ContentHeaderBody) frame.bodyFrame);
- }
- else if (frame.bodyFrame instanceof ContentBody)
- {
- _protocolSession.messageContentBodyReceived(frame.channel,
- (ContentBody) frame.bodyFrame);
- }
- else if (frame.bodyFrame instanceof HeartbeatBody)
- {
- _logger.debug("Received heartbeat");
+ break;
+
+ case ContentHeaderBody.TYPE:
+
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(),
+ (ContentHeaderBody) bodyFrame);
+ break;
+
+ case ContentBody.TYPE:
+
+ _protocolSession.messageContentBodyReceived(frame.getChannel(),
+ (ContentBody) bodyFrame);
+ break;
+
+ case HeartbeatBody.TYPE:
+
+ if(debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
+ break;
+
+ default:
+
}
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -467,7 +482,7 @@
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
{
return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.channel, responseClass), timeout);
+ new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
}
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=501005&r1=501004&r2=501005
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 29 03:04:43 2007
@@ -32,7 +32,7 @@
import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.commons.lang.StringUtils;
@@ -47,7 +47,7 @@
* The underlying protocol session is still available but clients should not
* use it to obtain session attributes.
*/
-public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionList
+public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
{
protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -95,8 +95,7 @@
private byte _protocolMinorVersion;
private byte _protocolMajorVersion;
-
-
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
/**
@@ -125,6 +124,7 @@
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
+ _minaProtocolSession.setAttachment(this);
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
//fixme - real value needed
@@ -239,7 +239,7 @@
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.channelId, message);
+ _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
}
public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
@@ -250,11 +250,11 @@
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
}
- if (msg.contentHeader != null)
+ if (msg.getContentHeader() != null)
{
throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
}
- msg.contentHeader = contentHeader;
+ msg.setContentHeader(contentHeader);
if (contentHeader.bodySize == 0)
{
deliverMessageToAMQSession(channelId, msg);
@@ -268,7 +268,7 @@
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
}
- if (msg.contentHeader == null)
+ if (msg.getContentHeader() == null)
{
_channelId2UnprocessedMsgMap.remove(channelId);
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
@@ -465,11 +465,11 @@
session.confirmConsumerCancelled(consumerTag);
}
- public void setProtocolVersion(byte versionMajor, byte versionMinor)
+ public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
{
_protocolMajorVersion = versionMajor;
_protocolMinorVersion = versionMinor;
-
+ _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
@@ -480,6 +480,11 @@
public byte getProtocolMajorVersion()
{
return _protocolMajorVersion;
+ }
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return _registry;
}
}