You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/01/18 23:54:32 UTC

svn commit: r497616 - in /incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client: handler/ message/ protocol/

Author: rajith
Date: Thu Jan 18 14:54:31 2007
New Revision: 497616

URL: http://svn.apache.org/viewvc?view=rev&rev=497616
Log:
implemented the logic for MessageTransfer and MessageAppend

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java Thu Jan 18 14:54:31 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -30,6 +31,7 @@
 public class MessageAppendMethodHandler implements StateAwareMethodListener
 {
     private static MessageAppendMethodHandler _instance = new MessageAppendMethodHandler();
+    private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
 
     public static MessageAppendMethodHandler getInstance()
     {
@@ -44,7 +46,11 @@
                                	AMQMethodEvent evt)
                                 throws AMQException
     {
-		// TODO
+    	try {
+			protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+		} catch (Exception e) {
+			_logger.error("Unable to add data from MessageAppendBody",e); 
+		}
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java Thu Jan 18 14:54:31 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageCloseBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -30,7 +31,8 @@
 public class MessageCloseMethodHandler implements StateAwareMethodListener
 {
     private static MessageCloseMethodHandler _instance = new MessageCloseMethodHandler();
-
+    private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+    
     public static MessageCloseMethodHandler getInstance()
     {
         return _instance;
@@ -44,7 +46,10 @@
                                	AMQMethodEvent evt)
                                 throws AMQException
     {
-		// TODO
+		MessageCloseBody body = (MessageCloseBody)evt.getMethod();
+		String referenceId = new String(body.getReference());
+		protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
+		_logger.debug("Method Close Body received, notify session to accept unprocessed message");
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Thu Jan 18 14:54:31 2007
@@ -22,13 +22,14 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.message.MessageHeaders;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class MessageTransferMethodHandler implements StateAwareMethodListener
 {
@@ -50,7 +51,7 @@
     {
     	final UnprocessedMessage msg = new UnprocessedMessage();
     	MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
-        msg.content = transferBody.getBody();
+        
         msg.channelId = evt.getChannelId();
         msg.deliveryTag = evt.getRequestId();
         _logger.debug("New JmsDeliver method received");
@@ -74,7 +75,16 @@
         
         msg.contentHeader = messageHeaders;
         
-        protocolSession.unprocessedMessageReceived(msg);
+        if(transferBody.getBody().contentType == Content.ContentTypeEnum.CONTENT_TYPE_INLINE)
+        {
+        	msg.addContent(transferBody.getBody().getContentAsByteArray());
+        	protocolSession.deliverMessageToAMQSession(evt.getChannelId(), msg);
+        }
+        else
+        {
+        	String referenceId = new String(transferBody.getBody().getContentAsByteArray());
+        	protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId);
+        }
         
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Jan 18 14:54:31 2007
@@ -20,15 +20,15 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.Content;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.jms.JMSException;
 import java.util.Iterator;
 import java.util.List;
 
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
 public abstract class AbstractJMSMessageFactory implements MessageFactory
 {
     private static final Logger _logger = Logger.getLogger(AbstractJMSMessageFactory.class);
@@ -38,11 +38,14 @@
 			ByteBuffer data, MessageHeaders contentHeader) throws AMQException;
 
 	protected AbstractJMSMessage createMessageWithBody(long messageNbr,
-			MessageHeaders contentHeader, Content body) throws AMQException {
-        ByteBuffer data;
-
-        data = ByteBuffer.allocate(body.content.remaining());
-        data.put(body.content);
+			MessageHeaders contentHeader, List contents) throws AMQException {
+		
+		ByteBuffer data = ByteBuffer.allocate((int)contentHeader.getSize());        
+        for (final Iterator it = contents.iterator();it.hasNext();)
+        {
+            byte[] bytes = (byte[]) it.next();
+            data.put(bytes);
+        }
         data.flip();
         
         _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data.remaining());
@@ -52,9 +55,9 @@
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered,
     										MessageHeaders contentHeader,
-    										Content body) throws JMSException, AMQException
+    										List contents) throws JMSException, AMQException
     {
-        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, body);
+        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, contents);
         msg.setJMSRedelivered(redelivered);
         return msg;
     }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Jan 18 14:54:31 2007
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.List;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.Content;
 
 import javax.jms.JMSException;
 
@@ -30,7 +31,7 @@
 {
     AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                      MessageHeaders contentHeader,
-                                     Content body)
+                                     List contents)
         throws JMSException, AMQException;
 
     AbstractJMSMessage createMessage() throws JMSException;

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Jan 18 14:54:31 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.message;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.JMSException;
@@ -59,7 +60,7 @@
      */
     public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                             MessageHeaders contentHeader,
-                                            Content body) throws AMQException, JMSException
+                                            List contents) throws AMQException, JMSException
     {
         MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(contentHeader.getContentType());
         if (mf == null)
@@ -68,7 +69,7 @@
         }
         else
         {
-            return mf.createMessage(deliveryTag, redelivered, contentHeader, body);
+            return mf.createMessage(deliveryTag, redelivered, contentHeader, contents);
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java Thu Jan 18 14:54:31 2007
@@ -69,7 +69,17 @@
 
     private String _routingKey;
     
-    public MessageHeaders()
+    private int _size;
+    
+    public int getSize() {
+		return _size;
+	}
+
+	public void setSize(int size) {
+		this._size = size;
+	}
+
+	public MessageHeaders()
     {
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Jan 18 14:54:31 2007
@@ -20,10 +20,8 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.qpid.framing.*;
-
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 
 /**
  * This class contains everything needed to process a JMS message. It assembles the
@@ -34,13 +32,20 @@
  * the MINA dispatcher thread.
  *
  */
-public class UnprocessedMessage
-{
-    private long _bytesReceived = 0;
-
-    public Content content;
-    public int channelId;
-    public long deliveryTag;
-    public MessageHeaders contentHeader;
-    
+public class UnprocessedMessage {
+	public int bytesReceived = 0;
+
+	public List contents = new LinkedList();
+
+	public int channelId;
+
+	public long deliveryTag;
+
+	public MessageHeaders contentHeader;
+
+	public void addContent(byte[] content) {
+		contents.add(content);
+		bytesReceived = bytesReceived + content.length;
+	}
+
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Jan 18 14:54:31 2007
@@ -42,6 +42,7 @@
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.RequestManager;
@@ -94,7 +95,7 @@
      * Maps from a channel id to an unprocessed message. This is used to tie together the
      * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
      */
-    protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+    protected ConcurrentMap _referenceId2UnprocessedMsgMap = new ConcurrentHashMap();
 
     protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
     protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
@@ -244,15 +245,21 @@
     }
 
     /**
-     * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+     * This is invoked from MessageTransferMethodHandler if type is CONTENT_TYPE_REFERENCE
      * This is invoked on the MINA dispatcher thread.
      * @param message
      * @throws AMQException if this was not expected
      */
-    public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+    public void unprocessedMessageReceived(String referenceId, UnprocessedMessage message) throws AMQException
     {
-        //_channelId2UnprocessedMsgMap.put(message.channelId, message);
-    	deliverMessageToAMQSession(message.channelId, message);
+        _referenceId2UnprocessedMsgMap.put(referenceId, message);
+    }
+    
+    public void messageAppendBodyReceived(MessageAppendBody appendBody) throws Exception
+    {
+    	String referenceId = new String(appendBody.getReference());
+    	UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+    	msg.addContent(appendBody.bytes);
     }
     
     public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
@@ -279,63 +286,30 @@
         requestManager.responseReceived(responseBody);
     }
 
-//     public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
-//             throws AMQException
-//     {
-//         UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
-//         if (msg == null)
-//         {
-//             throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
-//         }
-//         if (msg.contentHeader != null)
-//         {
-//             throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
-//         }
-//         msg.contentHeader = contentHeader;
-//         if (contentHeader.bodySize == 0)
-//         {
-//             deliverMessageToAMQSession(channelId, msg);
-//         }
-//     }
-// 
-//     public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
-//     {
-//         UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
-//         if (msg == null)
-//         {
-//             throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
-//         }
-//         if (msg.contentHeader == null)
-//         {
-//             _channelId2UnprocessedMsgMap.remove(channelId);
-//             throw new AMQException("Error: received content body without having received a ContentHeader frame first");
-//         }
-//         try
-//         {
-//             msg.receiveBody(contentBody);
-//         }
-//         catch (UnexpectedBodyReceivedException e)
-//         {
-//             _channelId2UnprocessedMsgMap.remove(channelId);
-//             throw e;
-//         }
-//         if (msg.isAllBodyDataReceived())
-//         {
-//             deliverMessageToAMQSession(channelId, msg);
-//         }
-//     }
-
     /**
+     * This is invoked from MessageTransferMethodHandler if type is CONTENT_TYPE_INLINE
      * Deliver a message to the appropriate session, removing the unprocessed message
      * from our map
      * @param channelId the channel id the message should be delivered to
      * @param msg the message
      */
-    private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
+    public void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
     {
         AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+        msg.contentHeader.setSize(msg.bytesReceived);
         session.messageReceived(msg);
-        //_channelId2UnprocessedMsgMap.remove(channelId);
+    }
+    
+    /**
+     * This is invoked from MessageCloseMethodHandler if type is CONTENT_TYPE_REFERENCE
+     * In this case we use the reference id to obtain the unprocessed message
+     * The channel id is used to retrieve a session
+     */
+    public void deliverMessageToAMQSession(int channelId, String referenceId)
+    {
+    	UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+    	deliverMessageToAMQSession(channelId,msg);
+    	_referenceId2UnprocessedMsgMap.remove(referenceId);
     }
     
     public long writeRequest(int channelNum, AMQMethodBody methodBody,