You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/01/18 23:54:32 UTC
svn commit: r497616 - in
/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client:
handler/ message/ protocol/
Author: rajith
Date: Thu Jan 18 14:54:31 2007
New Revision: 497616
URL: http://svn.apache.org/viewvc?view=rev&rev=497616
Log:
implemented the logic for MessageTransfer and MessageAppend
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java Thu Jan 18 14:54:31 2007
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -30,6 +31,7 @@
public class MessageAppendMethodHandler implements StateAwareMethodListener
{
private static MessageAppendMethodHandler _instance = new MessageAppendMethodHandler();
+ private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
public static MessageAppendMethodHandler getInstance()
{
@@ -44,7 +46,11 @@
AMQMethodEvent evt)
throws AMQException
{
- // TODO
+ try {
+ protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+ } catch (Exception e) {
+ _logger.error("Unable to add data from MessageAppendBody",e);
+ }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java Thu Jan 18 14:54:31 2007
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageCloseBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -30,7 +31,8 @@
public class MessageCloseMethodHandler implements StateAwareMethodListener
{
private static MessageCloseMethodHandler _instance = new MessageCloseMethodHandler();
-
+ private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+
public static MessageCloseMethodHandler getInstance()
{
return _instance;
@@ -44,7 +46,10 @@
AMQMethodEvent evt)
throws AMQException
{
- // TODO
+ MessageCloseBody body = (MessageCloseBody)evt.getMethod();
+ String referenceId = new String(body.getReference());
+ protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
+ _logger.debug("Method Close Body received, notify session to accept unprocessed message");
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Thu Jan 18 14:54:31 2007
@@ -22,13 +22,14 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.message.MessageHeaders;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
public class MessageTransferMethodHandler implements StateAwareMethodListener
{
@@ -50,7 +51,7 @@
{
final UnprocessedMessage msg = new UnprocessedMessage();
MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
- msg.content = transferBody.getBody();
+
msg.channelId = evt.getChannelId();
msg.deliveryTag = evt.getRequestId();
_logger.debug("New JmsDeliver method received");
@@ -74,7 +75,16 @@
msg.contentHeader = messageHeaders;
- protocolSession.unprocessedMessageReceived(msg);
+ if(transferBody.getBody().contentType == Content.ContentTypeEnum.CONTENT_TYPE_INLINE)
+ {
+ msg.addContent(transferBody.getBody().getContentAsByteArray());
+ protocolSession.deliverMessageToAMQSession(evt.getChannelId(), msg);
+ }
+ else
+ {
+ String referenceId = new String(transferBody.getBody().getContentAsByteArray());
+ protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId);
+ }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Jan 18 14:54:31 2007
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.Content;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.jms.JMSException;
import java.util.Iterator;
import java.util.List;
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class);
@@ -38,11 +38,14 @@
ByteBuffer data, MessageHeaders contentHeader) throws AMQException;
protected AbstractJMSMessage createMessageWithBody(long messageNbr,
- MessageHeaders contentHeader, Content body) throws AMQException {
- ByteBuffer data;
-
- data = ByteBuffer.allocate(body.content.remaining());
- data.put(body.content);
+ MessageHeaders contentHeader, List contents) throws AMQException {
+
+ ByteBuffer data = ByteBuffer.allocate((int)contentHeader.getSize());
+ for (final Iterator it = contents.iterator();it.hasNext();)
+ {
+ byte[] bytes = (byte[]) it.next();
+ data.put(bytes);
+ }
data.flip();
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
@@ -52,9 +55,9 @@
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
MessageHeaders contentHeader,
- Content body) throws JMSException, AMQException
+ List contents) throws JMSException, AMQException
{
- final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, body);
+ final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, contents);
msg.setJMSRedelivered(redelivered);
return msg;
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Jan 18 14:54:31 2007
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.client.message;
+import java.util.List;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.Content;
import javax.jms.JMSException;
@@ -30,7 +31,7 @@
{
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
MessageHeaders contentHeader,
- Content body)
+ List contents)
throws JMSException, AMQException;
AbstractJMSMessage createMessage() throws JMSException;
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Jan 18 14:54:31 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.client.message;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
@@ -59,7 +60,7 @@
*/
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
MessageHeaders contentHeader,
- Content body) throws AMQException, JMSException
+ List contents) throws AMQException, JMSException
{
MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(contentHeader.getContentType());
if (mf == null)
@@ -68,7 +69,7 @@
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, body);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, contents);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java Thu Jan 18 14:54:31 2007
@@ -69,7 +69,17 @@
private String _routingKey;
- public MessageHeaders()
+ private int _size;
+
+ public int getSize() {
+ return _size;
+ }
+
+ public void setSize(int size) {
+ this._size = size;
+ }
+
+ public MessageHeaders()
{
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Jan 18 14:54:31 2007
@@ -20,10 +20,8 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.framing.*;
-
-import java.util.List;
import java.util.LinkedList;
+import java.util.List;
/**
* This class contains everything needed to process a JMS message. It assembles the
@@ -34,13 +32,20 @@
* the MINA dispatcher thread.
*
*/
-public class UnprocessedMessage
-{
- private long _bytesReceived = 0;
-
- public Content content;
- public int channelId;
- public long deliveryTag;
- public MessageHeaders contentHeader;
-
+public class UnprocessedMessage {
+ public int bytesReceived = 0;
+
+ public List contents = new LinkedList();
+
+ public int channelId;
+
+ public long deliveryTag;
+
+ public MessageHeaders contentHeader;
+
+ public void addContent(byte[] content) {
+ contents.add(content);
+ bytesReceived = bytesReceived + content.length;
+ }
+
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Jan 18 14:54:31 2007
@@ -42,6 +42,7 @@
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.RequestManager;
@@ -94,7 +95,7 @@
* Maps from a channel id to an unprocessed message. This is used to tie together the
* JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ protected ConcurrentMap _referenceId2UnprocessedMsgMap = new ConcurrentHashMap();
protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
@@ -244,15 +245,21 @@
}
/**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+ * This is invoked from MessageTransferMethodHandler if type is CONTENT_TYPE_REFERENCE
* This is invoked on the MINA dispatcher thread.
* @param message
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ public void unprocessedMessageReceived(String referenceId, UnprocessedMessage message) throws AMQException
{
- //_channelId2UnprocessedMsgMap.put(message.channelId, message);
- deliverMessageToAMQSession(message.channelId, message);
+ _referenceId2UnprocessedMsgMap.put(referenceId, message);
+ }
+
+ public void messageAppendBodyReceived(MessageAppendBody appendBody) throws Exception
+ {
+ String referenceId = new String(appendBody.getReference());
+ UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+ msg.addContent(appendBody.bytes);
}
public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
@@ -279,63 +286,30 @@
requestManager.responseReceived(responseBody);
}
-// public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
-// throws AMQException
-// {
-// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
-// if (msg == null)
-// {
-// throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
-// }
-// if (msg.contentHeader != null)
-// {
-// throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
-// }
-// msg.contentHeader = contentHeader;
-// if (contentHeader.bodySize == 0)
-// {
-// deliverMessageToAMQSession(channelId, msg);
-// }
-// }
-//
-// public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
-// {
-// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
-// if (msg == null)
-// {
-// throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
-// }
-// if (msg.contentHeader == null)
-// {
-// _channelId2UnprocessedMsgMap.remove(channelId);
-// throw new AMQException("Error: received content body without having received a ContentHeader frame first");
-// }
-// try
-// {
-// msg.receiveBody(contentBody);
-// }
-// catch (UnexpectedBodyReceivedException e)
-// {
-// _channelId2UnprocessedMsgMap.remove(channelId);
-// throw e;
-// }
-// if (msg.isAllBodyDataReceived())
-// {
-// deliverMessageToAMQSession(channelId, msg);
-// }
-// }
-
/**
+ * This is invoked from MessageTransferMethodHandler if type is CONTENT_TYPE_INLINE
* Deliver a message to the appropriate session, removing the unprocessed message
* from our map
* @param channelId the channel id the message should be delivered to
* @param msg the message
*/
- private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
+ public void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
{
AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+ msg.contentHeader.setSize(msg.bytesReceived);
session.messageReceived(msg);
- //_channelId2UnprocessedMsgMap.remove(channelId);
+ }
+
+ /**
+ * This is invoked from MessageCloseMethodHandler if type is CONTENT_TYPE_REFERENCE
+ * In this case we use the reference id to obtain the unprocessed message
+ * The channel id is used to retrieve a session
+ */
+ public void deliverMessageToAMQSession(int channelId, String referenceId)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+ deliverMessageToAMQSession(channelId,msg);
+ _referenceId2UnprocessedMsgMap.remove(referenceId);
}
public long writeRequest(int channelNum, AMQMethodBody methodBody,