You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/01/17 17:20:04 UTC

svn commit: r497062 - in /incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client: ./ handler/ message/ protocol/

Author: rajith
Date: Wed Jan 17 08:20:02 2007
New Revision: 497062

URL: http://svn.apache.org/viewvc?view=rev&rev=497062
Log:
Filled in the MessageTransferMethodHandler and added a few fields to the MessageHeaders class.
Minor modifications to AMQProtocolSession to directly fire the unprocessed message to AMQSession instead of strong in a map.


Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 17 08:20:02 2007
@@ -221,13 +221,13 @@
 
         private void dispatchMessage(UnprocessedMessage message)
         {
-            if (message.deliverBody != null)
+            if (message.content != null)
             {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag);
 
                 if (consumer == null)
                 {
-                    _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+                    _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring...");
                     _logger.warn("Consumers that exist: " + _consumers);
                     _logger.warn("Session hashcode: " + System.identityHashCode(this));
                 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jan 17 08:20:02 2007
@@ -501,12 +501,12 @@
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
+            _logger.debug("notifyMessage called with message number " + messageFrame.content.deliveryTag);
         }
         try
         {
-            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
-                                                                          messageFrame.deliverBody.redelivered,
+            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.content.deliveryTag,
+                                                                          messageFrame.content.redelivered,
                                                                           messageFrame.contentHeader,
                                                                           messageFrame.bodies);
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Jan 17 08:20:02 2007
@@ -20,9 +20,12 @@
  */
 package org.apache.qpid.client.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.message.MessageHeaders;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -30,7 +33,8 @@
 public class MessageTransferMethodHandler implements StateAwareMethodListener
 {
     private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
-
+    private static final Logger _logger = Logger.getLogger(MessageTransferMethodHandler.class);
+    
     public static MessageTransferMethodHandler getInstance()
     {
         return _instance;
@@ -44,7 +48,33 @@
                                	AMQMethodEvent evt)
                                 throws AMQException
     {
-		// TODO
+    	final UnprocessedMessage msg = new UnprocessedMessage();
+    	MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
+        msg.content = transferBody.getBody();
+        msg.channelId = evt.getChannelId();
+        _logger.debug("New JmsDeliver method received");
+        
+        MessageHeaders messageHeaders = new MessageHeaders();
+        messageHeaders.setMessageId(transferBody.getMessageId());
+        messageHeaders.setAppId(transferBody.getAppId());
+        messageHeaders.setContentType(transferBody.getContentType());
+        messageHeaders.setEncoding(transferBody.getContentEncoding());
+        messageHeaders.setCorrelationId(transferBody.getCorrelationId());
+        messageHeaders.setDestination(transferBody.getDestination());
+        messageHeaders.setExchange(transferBody.getExchange());
+        messageHeaders.setExpiration(transferBody.getExpiration());
+        messageHeaders.setReplyTo(transferBody.getReplyTo());
+        messageHeaders.setRoutingKey(transferBody.getRoutingKey());
+        messageHeaders.setTransactionId(transferBody.getTransactionId());
+        messageHeaders.setUserId(transferBody.getUserId());
+        messageHeaders.setPriority(transferBody.getPriority());
+        messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
+        messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
+        
+        msg.contentHeader = messageHeaders;
+        
+        protocolSession.unprocessedMessageReceived(msg);
+        
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java Wed Jan 17 08:20:02 2007
@@ -34,16 +34,20 @@
 public class MessageHeaders
 {
     private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
-
+    
     private String _contentType;
 
     private String _encoding;
+    
+    private String _destination;
+    
+    private String _exchange;
 
     private FieldTable _jmsHeaders;
 
-    private byte _deliveryMode;
+    private short _deliveryMode;
 
-    private byte _priority;
+    private short _priority;
 
     private String _correlationId;
 
@@ -63,6 +67,8 @@
 
     private String _transactionId;
 
+    private String _routingKey;
+    
     public MessageHeaders()
     {
     }
@@ -108,22 +114,22 @@
     }
 
 
-    public byte getDeliveryMode()
+    public short getDeliveryMode()
     {
         return _deliveryMode;
     }
 
-    public void setDeliveryMode(byte deliveryMode)
+    public void setDeliveryMode(short deliveryMode)
     {
         _deliveryMode = deliveryMode;
     }
 
-    public byte getPriority()
+    public short getPriority()
     {
         return _priority;
     }
 
-    public void setPriority(byte priority)
+    public void setPriority(short priority)
     {
         _priority = priority;
     }
@@ -161,12 +167,12 @@
 
     public String getMessageId()
     {
-        return _messageId == null ? null : _messageId.toString();
+        return _messageId;
     }
 
     public void setMessageId(String messageId)
     {
-        _messageId = messageId == null ? null : new String(messageId);
+        _messageId = messageId;
     }
 
     public long getTimestamp()
@@ -621,12 +627,36 @@
 
     }
 
-	public String get_transactionId() {
+	public String getTransactionId() {
 		return _transactionId;
 	}
 
-	public void set_transactionId(String id) {
+	public void setTransactionId(String id) {
 		_transactionId = id;
+	}
+
+	public String getDestination() {
+		return _destination;
+	}
+
+	public void setDestination(String destination) {
+		this._destination = destination;
+	}
+
+	public String getExchange() {
+		return _exchange;
+	}
+
+	public void setExchange(String exchange) {
+		this._exchange = exchange;
+	}
+
+	public String getRoutingKey() {
+		return _routingKey;
+	}
+
+	public void setRoutingKey(String routingKey) {
+		this._routingKey = routingKey;
 	}
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Wed Jan 17 08:20:02 2007
@@ -38,27 +38,8 @@
 {
     private long _bytesReceived = 0;
 
-    public BasicDeliverBody deliverBody;
-    public BasicReturnBody bounceBody; // TODO: check change (gustavo)
+    public Content content;
     public int channelId;
-    public ContentHeaderBody contentHeader;
-
-    /**
-     * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
-     */
-    public List bodies = new LinkedList();
-
-    public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
-    {
-        bodies.add(body);
-        if (body.payload != null)
-        {
-            _bytesReceived += body.payload.remaining();
-        }
-    }
-
-    public boolean isAllBodyDataReceived()
-    {
-        return _bytesReceived == contentHeader.bodySize;
-    }
+    public MessageHeaders contentHeader;
+    
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Jan 17 08:20:02 2007
@@ -309,7 +309,7 @@
         HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
 
         if (frame.bodyFrame instanceof AMQRequestBody)
-        {
+        {   
             _protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
         }
         else if (frame.bodyFrame instanceof AMQResponseBody)
@@ -327,15 +327,7 @@
 //             try
 //             {
 //                 boolean wasAnyoneInterested = false;
-//                 while (it.hasNext())
-//                 {
-//                     final AMQMethodListener listener = (AMQMethodListener) it.next();
-//                     wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-//                 }
-//                 if (!wasAnyoneInterested)
-//                 {
-//                     throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"  + _frameListeners);
-//                 }
+//                 Q
 //             }
 //             catch (AMQException e)
 //             {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Jan 17 08:20:02 2007
@@ -240,7 +240,8 @@
      */
     public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
     {
-        _channelId2UnprocessedMsgMap.put(message.channelId, message);
+        //_channelId2UnprocessedMsgMap.put(message.channelId, message);
+    	deliverMessageToAMQSession(message.channelId, message);
     }
     
     public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
@@ -323,7 +324,7 @@
     {
         AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
         session.messageReceived(msg);
-        _channelId2UnprocessedMsgMap.remove(channelId);
+        //_channelId2UnprocessedMsgMap.remove(channelId);
     }
     
     public long writeRequest(int channelNum, AMQMethodBody methodBody,