You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/09/06 16:29:04 UTC
svn commit: r573282 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
./ handler/ message/ protocol/
Author: rajith
Date: Thu Sep 6 07:29:03 2007
New Revision: 573282
URL: http://svn.apache.org/viewvc?rev=573282&view=rev
Log:
UnprocessedMessage was made abstract, and 0-8 and 0-10 implementations are provided.
ReturnMessage extends UnprocessedMessage_0_8, as returned messages are only a 0-8 feature.
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Sep 6 07:29:03 2007
@@ -71,79 +71,22 @@
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -1269,19 +1212,17 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message["
- + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
- + "] received in session with channel id " + _channelId);
+ _logger.debug("Message[" + message.toString() + "] received in session");
}
- if (message.getDeliverBody() == null)
+ if (message instanceof ReturnMessage)
{
// Return of the bounced message.
- returnBouncedMessage(message);
+ returnBouncedMessage((ReturnMessage)message);
}
else
{
- _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
+ _highestDeliveryTag.set(message.getDeliveryTag());
_queue.add(message);
}
}
@@ -1374,10 +1315,10 @@
if (_logger.isTraceEnabled())
{
- _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ _logger.trace("Rejecting Unacked message:" + message.getDeliveryTag());
}
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message.getDeliveryTag(), requeue);
}
public void rejectMessage(AbstractJMSMessage message, boolean requeue)
@@ -2320,12 +2261,12 @@
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if ((consumerTag == null) || message.getDeliverBody().consumerTag.equals(consumerTag))
+ // getConsumerTag() returns a String, so compare against the string form of the tag being matched;
+ // String.equals() is always false for a non-String argument. NOTE(review): assumes consumerTag is
+ // an AMQShortString here, as the removed line implies - confirm against the method signature.
+ if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
{
if (_logger.isDebugEnabled())
{
_logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliverBody().deliveryTag);
+ + message.getDeliveryTag());
}
messages.remove();
@@ -2334,7 +2275,7 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+ _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag);
}
}
}
@@ -2363,7 +2304,7 @@
}
}
- private void returnBouncedMessage(final UnprocessedMessage message)
+ private void returnBouncedMessage(final ReturnMessage msg)
{
_connection.performConnectionTask(new Runnable()
{
@@ -2373,11 +2314,11 @@
{
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
- message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
+ _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
// @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -2565,7 +2506,7 @@
_lock.wait();
}
- if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+ if (message.getDeliveryTag() <= _rollbackMark.get())
{
rejectMessage(message, true);
}
@@ -2619,10 +2560,11 @@
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.getDeliverBody() != null)
- {
+ //This if block is not needed anymore as bounce messages are handled separately
+ //if (message.getDeliverBody() != null)
+ //{
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+ (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag()));
if ((consumer == null) || consumer.isClosed())
{
@@ -2631,13 +2573,13 @@
if (consumer == null)
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().deliveryTag + "] from queue "
- + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+ + message.getDeliveryTag() + "] from queue "
+ + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+ + message.getDeliveryTag() + "] from queue " + " consumer("
+ consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
@@ -2652,7 +2594,7 @@
consumer.notifyMessage(message, _channelId);
}
}
- }
+ //}
}
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Sep 6 07:29:03 2007
@@ -44,7 +44,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -74,7 +74,7 @@
*/
private final ArrayBlockingQueue _synchronousQueue;
- private MessageFactoryRegistry _messageFactory;
+ protected MessageFactoryRegistry _messageFactory;
private final AMQSession _session;
@@ -543,16 +543,12 @@
if (debug)
{
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
}
try
{
- AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
-
+ AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
if (debug)
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
@@ -589,6 +585,8 @@
}
}
}
+
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H,B> messageFrame)throws Exception;
/**
* @param jmsMessage this message has already been processed so can't redo preDeliver
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Sep 6 07:29:03 2007
@@ -22,16 +22,20 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -70,5 +74,14 @@
{
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
+ }
+
+    /**
+     * Converts a 0-8 unprocessed message into a JMS message by handing its delivery tag, redelivered
+     * flag, exchange, routing key, content header and body fragments to the message factory.
+     *
+     * @param messageFrame the unprocessed message to convert
+     * @return the JMS message produced by the message factory
+     * @throws Exception whatever the message factory throws
+     */
+    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody,ContentBody> messageFrame)throws Exception
+    {
+        final long deliveryTag = messageFrame.getDeliveryTag();
+        final boolean redelivered = messageFrame.isRedelivered();
+
+        final AbstractJMSMessage jmsMessage =
+            _messageFactory.createMessage(deliveryTag, redelivered,
+                messageFrame.getExchange(), messageFrame.getRoutingKey(),
+                messageFrame.getContentHeader(), messageFrame.getBodies());
+
+        return jmsMessage;
+    }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Thu Sep 6 07:29:03 2007
@@ -21,13 +21,12 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +44,14 @@
public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
+ BasicDeliverBody deliveryBody = (BasicDeliverBody) evt.getMethod();
+ final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
+ evt.getChannelId(),
+ deliveryBody.deliveryTag,
+ deliveryBody.consumerTag.asString(),
+ deliveryBody.getExchange(),
+ deliveryBody.getRoutingKey(),
+ deliveryBody.getRedelivered());
_logger.debug("New JmsDeliver method received");
protocolSession.unprocessedMessageReceived(msg);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Thu Sep 6 07:29:03 2007
@@ -21,13 +21,12 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +44,14 @@
public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
throws AMQException
{
+ BasicReturnBody returnBody = (BasicReturnBody)evt.getMethod();
_logger.debug("New JmsBounce method received");
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicReturnBody) evt.getMethod());
+ final ReturnMessage msg = new ReturnMessage(evt.getChannelId(),
+ returnBody.getExchange(),
+ returnBody.getRoutingKey(),
+ returnBody.getReplyText(),
+ returnBody.getReplyCode()
+ );
protocolSession.unprocessedMessageReceived(msg);
}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=573282&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Thu Sep 6 07:29:03 2007
@@ -0,0 +1,26 @@
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * A 0-8 unprocessed message that additionally carries a reply code and reply text.
+ *
+ * The superclass is initialised with a delivery tag of -1, an empty consumer tag and
+ * redelivered set to false; only the channel id, exchange and routing key come from the caller.
+ */
+public class ReturnMessage extends UnprocessedMessage_0_8
+{
+    private final AMQShortString _replyText;
+    private final int _replyCode;
+
+    /**
+     * @param channelId  channel the message arrived on
+     * @param exchange   exchange passed through to the superclass
+     * @param routingKey routing key passed through to the superclass
+     * @param replyText  reply text to expose via {@link #getReplyText()}
+     * @param replyCode  reply code to expose via {@link #getReplyCode()}
+     */
+    public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+    {
+        super(channelId,-1,"",exchange,routingKey,false);
+        this._replyText = replyText;
+        this._replyCode = replyCode;
+    }
+
+    /** @return the reply code given at construction */
+    public int getReplyCode()
+    {
+        return this._replyCode;
+    }
+
+    /** @return the reply text given at construction */
+    public AMQShortString getReplyText()
+    {
+        return this._replyText;
+    }
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Sep 6 07:29:03 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,10 @@
*/
package org.apache.qpid.client.message;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -36,96 +32,60 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage
+public abstract class UnprocessedMessage<H,B>
{
- private long _bytesReceived = 0;
-
- private final BasicDeliverBody _deliverBody;
- private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
private final int _channelId;
- private ContentHeaderBody _contentHeader;
-
- /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
- private List<ContentBody> _bodies;
+ private final long _deliveryId;
+ private final String _consumerTag;
+ protected AMQShortString _exchange;
+ protected AMQShortString _routingKey;
+ protected boolean _redelivered;
- public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+ public UnprocessedMessage(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
{
- _deliverBody = deliverBody;
_channelId = channelId;
- _bounceBody = null;
+ _deliveryId = deliveryId;
+ _consumerTag = consumerTag;
+ _exchange = exchange;
+ _routingKey = routingKey;
+ _redelivered = redelivered;
}
+ public abstract void receiveBody(B nativeMessageBody);
- public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
- {
- _deliverBody = null;
- _channelId = channelId;
- _bounceBody = bounceBody;
- }
-
- public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
- {
-
- if (body.payload != null)
- {
- final long payloadSize = body.payload.remaining();
-
- if (_bodies == null)
- {
- if (payloadSize == getContentHeader().bodySize)
- {
- _bodies = Collections.singletonList(body);
- }
- else
- {
- _bodies = new ArrayList<ContentBody>();
- _bodies.add(body);
- }
-
- }
- else
- {
- _bodies.add(body);
- }
- _bytesReceived += payloadSize;
- }
- }
+ public abstract void setContentHeader(H nativeMessageHeader);
- public boolean isAllBodyDataReceived()
+ public int getChannelId()
{
- return _bytesReceived == getContentHeader().bodySize;
+ return _channelId;
}
- public BasicDeliverBody getDeliverBody()
+ public long getDeliveryTag()
{
- return _deliverBody;
+ return _deliveryId;
}
- public BasicReturnBody getBounceBody()
+ public String getConsumerTag()
{
- return _bounceBody;
+ return _consumerTag;
}
-
- public int getChannelId()
+ public AMQShortString getExchange()
{
- return _channelId;
+ return _exchange;
}
-
- public ContentHeaderBody getContentHeader()
+ public AMQShortString getRoutingKey()
{
- return _contentHeader;
+ return _routingKey;
}
- public void setContentHeader(ContentHeaderBody contentHeader)
+ public boolean isRedelivered()
{
- this._contentHeader = contentHeader;
+ return _redelivered;
}
- public List<ContentBody> getBodies()
- {
- return _bodies;
- }
+ public abstract List<B> getBodies();
+ public abstract H getContentHeader();
}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=573282&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Sep 6 07:29:03 2007
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.Struct;
+
+/**
+ * Unprocessed message for the 0-10 code path: the header is an array of {@link Struct} and the body
+ * is collected as a list of {@link ByteBuffer} fragments in arrival order.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
+ */
+public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+{
+    /** Headers as last passed to {@link #setContentHeader(Struct[])}; null until then. */
+    private Struct[] _headers;
+
+    /** Body fragments in the order received. Due to fragmentation you don't know how big this will be in general */
+    private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
+
+    public UnprocessedMessage_0_10(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    {
+        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+    }
+
+    /** Appends one body fragment to the list returned by {@link #getBodies()}. */
+    public void receiveBody(ByteBuffer body)
+    {
+        _bodies.add(body);
+    }
+
+    /**
+     * Stores the headers and, for every {@link DeliveryProperties} entry among them, overwrites the
+     * exchange, routing key and redelivered flag held by the superclass with that entry's values.
+     */
+    public void setContentHeader(Struct[] headers)
+    {
+        this._headers = headers;
+        for (int i = 0; i < headers.length; i++)
+        {
+            final Struct candidate = headers[i];
+            if (!(candidate instanceof DeliveryProperties))
+            {
+                continue;
+            }
+
+            final DeliveryProperties deliveryProps = (DeliveryProperties) candidate;
+            _exchange = new AMQShortString(deliveryProps.getExchange());
+            _routingKey = new AMQShortString(deliveryProps.getRoutingKey());
+            _redelivered = deliveryProps.getRedelivered();
+        }
+    }
+
+    /** @return the headers last passed to {@link #setContentHeader(Struct[])} */
+    public Struct[] getContentHeader()
+    {
+        return _headers;
+    }
+
+    /** @return the internal (live) list of body fragments */
+    public List<ByteBuffer> getBodies()
+    {
+        return _bodies;
+    }
+
+}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=573282&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Thu Sep 6 07:29:03 2007
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * Unprocessed message for the 0-8 code path. It holds the content header and the content body
+ * fragments received for a message and tracks how many payload bytes have arrived so far.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
+ */
+public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+{
+    /** Running total of payload bytes seen by {@link #receiveBody(ContentBody)}. */
+    private long _bytesReceived = 0;
+
+    /** Optional deliver method body; stays null unless {@link #setMethodBody(BasicDeliverBody)} is called. */
+    private BasicDeliverBody _deliverBody;
+    private ContentHeaderBody _contentHeader;
+
+    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
+    private List<ContentBody> _bodies;
+
+    public UnprocessedMessage_0_8(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    {
+        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+    }
+
+    /**
+     * Records one content body fragment. Fragments with a null payload are ignored.
+     *
+     * The content header must already have been set, because the first fragment's size is compared
+     * with the header's body size: when they match, an immutable single-element list is used.
+     * NOTE(review): a further non-null fragment after that case would make {@code add} throw
+     * UnsupportedOperationException - confirm the broker can never send one.
+     */
+    public void receiveBody(ContentBody body)
+    {
+
+        if (body.payload != null)
+        {
+            final long payloadSize = body.payload.remaining();
+
+            if (_bodies == null)
+            {
+                if (payloadSize == getContentHeader().bodySize)
+                {
+                    _bodies = Collections.singletonList(body);
+                }
+                else
+                {
+                    _bodies = new ArrayList<ContentBody>();
+                    _bodies.add(body);
+                }
+
+            }
+            else
+            {
+                _bodies.add(body);
+            }
+            _bytesReceived += payloadSize;
+        }
+    }
+
+    /** Attaches the deliver method body; it is only used by {@link #getDeliverBody()} and {@link #toString()}. */
+    public void setMethodBody(BasicDeliverBody deliverBody)
+    {
+        _deliverBody = deliverBody;
+    }
+
+    public void setContentHeader(ContentHeaderBody contentHeader)
+    {
+        this._contentHeader = contentHeader;
+    }
+
+    /**
+     * @return true when the payload bytes received so far equal the body size in the content header.
+     *         The content header must have been set before this is called.
+     */
+    public boolean isAllBodyDataReceived()
+    {
+        return _bytesReceived == getContentHeader().bodySize;
+    }
+
+    /** @return the deliver method body, or null if {@link #setMethodBody(BasicDeliverBody)} was never called */
+    public BasicDeliverBody getDeliverBody()
+    {
+        return _deliverBody;
+    }
+
+    /** @return the content header, or null if it has not been set yet */
+    public ContentHeaderBody getContentHeader()
+    {
+        return _contentHeader;
+    }
+
+    /** @return the body fragments received so far, or null if none has been recorded */
+    public List<ContentBody> getBodies()
+    {
+        return _bodies;
+    }
+
+    /**
+     * Describes the message for log output.
+     *
+     * The delivery tag and consumer tag are taken from the superclass getters rather than from the
+     * optional deliver body, so they are reported even when no deliver body was attached.
+     */
+    @Override
+    public String toString()
+    {
+        StringBuilder buf = new StringBuilder();
+        buf.append("Channel Id : ").append(getChannelId());
+        buf.append(", Delivery tag : ").append(getDeliveryTag());
+        buf.append(", Consumer tag : ").append(getConsumerTag());
+        if (_contentHeader != null)
+        {
+            buf.append(", ContentHeader : ").append(_contentHeader);
+        }
+        if (_deliverBody != null)
+        {
+            buf.append(", Deliver Body : ").append(_deliverBody);
+        }
+
+        return buf.toString();
+    }
+
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=573282&r1=573281&r2=573282&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Sep 6 07:29:03 2007
@@ -32,7 +32,9 @@
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
// import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
+import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
@@ -93,7 +95,7 @@
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ protected ConcurrentMap<Integer,UnprocessedMessage_0_8> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage_0_8>();
/** Counter to ensure unique queue names */
protected int _queueId = 1;
@@ -228,14 +230,14 @@
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ public void unprocessedMessageReceived(UnprocessedMessage_0_8 message) throws AMQException
{
_channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
}
public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
if (msg == null)
{
throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
@@ -255,7 +257,7 @@
public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage_0_8 msg = _channelId2UnprocessedMsgMap.get(channelId);
if (msg == null)
{
throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);