You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/01/17 17:20:04 UTC
svn commit: r497062 - in
/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client:
./ handler/ message/ protocol/
Author: rajith
Date: Wed Jan 17 08:20:02 2007
New Revision: 497062
URL: http://svn.apache.org/viewvc?view=rev&rev=497062
Log:
Filled in the MessageTransferMethodHandler and added a few fields to the MessageHeaders class.
Minor modifications to AMQProtocolSession to directly fire the unprocessed message to AMQSession instead of storing it in a map.
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 17 08:20:02 2007
@@ -221,13 +221,13 @@
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.deliverBody != null)
+ if (message.content != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag);
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " + System.identityHashCode(this));
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jan 17 08:20:02 2007
@@ -501,12 +501,12 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.content.deliveryTag);
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
- messageFrame.deliverBody.redelivered,
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.content.deliveryTag,
+ messageFrame.content.redelivered,
messageFrame.contentHeader,
messageFrame.bodies);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Jan 17 08:20:02 2007
@@ -20,9 +20,12 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.message.MessageHeaders;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -30,7 +33,8 @@
public class MessageTransferMethodHandler implements StateAwareMethodListener
{
private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
-
+ private static final Logger _logger = Logger.getLogger(MessageTransferMethodHandler.class);
+
public static MessageTransferMethodHandler getInstance()
{
return _instance;
@@ -44,7 +48,33 @@
AMQMethodEvent evt)
throws AMQException
{
- // TODO
+ final UnprocessedMessage msg = new UnprocessedMessage();
+ MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
+ msg.content = transferBody.getBody();
+ msg.channelId = evt.getChannelId();
+ _logger.debug("New JmsDeliver method received");
+
+ MessageHeaders messageHeaders = new MessageHeaders();
+ messageHeaders.setMessageId(transferBody.getMessageId());
+ messageHeaders.setAppId(transferBody.getAppId());
+ messageHeaders.setContentType(transferBody.getContentType());
+ messageHeaders.setEncoding(transferBody.getContentEncoding());
+ messageHeaders.setCorrelationId(transferBody.getCorrelationId());
+ messageHeaders.setDestination(transferBody.getDestination());
+ messageHeaders.setExchange(transferBody.getExchange());
+ messageHeaders.setExpiration(transferBody.getExpiration());
+ messageHeaders.setReplyTo(transferBody.getReplyTo());
+ messageHeaders.setRoutingKey(transferBody.getRoutingKey());
+ messageHeaders.setTransactionId(transferBody.getTransactionId());
+ messageHeaders.setUserId(transferBody.getUserId());
+ messageHeaders.setPriority(transferBody.getPriority());
+ messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
+ messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
+
+ msg.contentHeader = messageHeaders;
+
+ protocolSession.unprocessedMessageReceived(msg);
+
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java Wed Jan 17 08:20:02 2007
@@ -34,16 +34,20 @@
public class MessageHeaders
{
private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
-
+
private String _contentType;
private String _encoding;
+
+ private String _destination;
+
+ private String _exchange;
private FieldTable _jmsHeaders;
- private byte _deliveryMode;
+ private short _deliveryMode;
- private byte _priority;
+ private short _priority;
private String _correlationId;
@@ -63,6 +67,8 @@
private String _transactionId;
+ private String _routingKey;
+
public MessageHeaders()
{
}
@@ -108,22 +114,22 @@
}
- public byte getDeliveryMode()
+ public short getDeliveryMode()
{
return _deliveryMode;
}
- public void setDeliveryMode(byte deliveryMode)
+ public void setDeliveryMode(short deliveryMode)
{
_deliveryMode = deliveryMode;
}
- public byte getPriority()
+ public short getPriority()
{
return _priority;
}
- public void setPriority(byte priority)
+ public void setPriority(short priority)
{
_priority = priority;
}
@@ -161,12 +167,12 @@
public String getMessageId()
{
- return _messageId == null ? null : _messageId.toString();
+ return _messageId;
}
public void setMessageId(String messageId)
{
- _messageId = messageId == null ? null : new String(messageId);
+ _messageId = messageId;
}
public long getTimestamp()
@@ -621,12 +627,36 @@
}
- public String get_transactionId() {
+ public String getTransactionId() {
return _transactionId;
}
- public void set_transactionId(String id) {
+ public void setTransactionId(String id) {
_transactionId = id;
+ }
+
+ public String getDestination() {
+ return _destination;
+ }
+
+ public void setDestination(String destination) {
+ this._destination = destination;
+ }
+
+ public String getExchange() {
+ return _exchange;
+ }
+
+ public void setExchange(String exchange) {
+ this._exchange = exchange;
+ }
+
+ public String getRoutingKey() {
+ return _routingKey;
+ }
+
+ public void setRoutingKey(String routingKey) {
+ this._routingKey = routingKey;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Wed Jan 17 08:20:02 2007
@@ -38,27 +38,8 @@
{
private long _bytesReceived = 0;
- public BasicDeliverBody deliverBody;
- public BasicReturnBody bounceBody; // TODO: check change (gustavo)
+ public Content content;
public int channelId;
- public ContentHeaderBody contentHeader;
-
- /**
- * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
- */
- public List bodies = new LinkedList();
-
- public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
- {
- bodies.add(body);
- if (body.payload != null)
- {
- _bytesReceived += body.payload.remaining();
- }
- }
-
- public boolean isAllBodyDataReceived()
- {
- return _bytesReceived == contentHeader.bodySize;
- }
+ public MessageHeaders contentHeader;
+
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Jan 17 08:20:02 2007
@@ -309,7 +309,7 @@
HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
if (frame.bodyFrame instanceof AMQRequestBody)
- {
+ {
_protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
}
else if (frame.bodyFrame instanceof AMQResponseBody)
@@ -327,15 +327,7 @@
// try
// {
// boolean wasAnyoneInterested = false;
-// while (it.hasNext())
-// {
-// final AMQMethodListener listener = (AMQMethodListener) it.next();
-// wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-// }
-// if (!wasAnyoneInterested)
-// {
-// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
-// }
+// Q
// }
// catch (AMQException e)
// {
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=497062&r1=497061&r2=497062
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Jan 17 08:20:02 2007
@@ -240,7 +240,8 @@
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.channelId, message);
+ //_channelId2UnprocessedMsgMap.put(message.channelId, message);
+ deliverMessageToAMQSession(message.channelId, message);
}
public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
@@ -323,7 +324,7 @@
{
AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ //_channelId2UnprocessedMsgMap.remove(channelId);
}
public long writeRequest(int channelNum, AMQMethodBody methodBody,