You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [10/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/...
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -21,14 +21,18 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageRejectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+//import org.apache.log4j.Logger;
public class MessageRejectMethodHandler implements StateAwareMethodListener
{
+ //private static final Logger _logger = Logger.getLogger(MessageRejectMethodHandler.class);
+
private static MessageRejectMethodHandler _instance = new MessageRejectMethodHandler();
public static MessageRejectMethodHandler getInstance()
@@ -37,12 +41,8 @@
}
private MessageRejectMethodHandler() {}
-
-
- public void methodReceived (AMQStateManager stateManager,
- AMQProtocolSession protocolSession,
- AMQMethodEvent evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -21,14 +21,18 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+//import org.apache.log4j.Logger;
public class MessageResumeMethodHandler implements StateAwareMethodListener
{
+ //private static final Logger _logger = Logger.getLogger(MessageResumeMethodHandler.class);
+
private static MessageResumeMethodHandler _instance = new MessageResumeMethodHandler();
public static MessageResumeMethodHandler getInstance()
@@ -37,14 +41,10 @@
}
private MessageResumeMethodHandler() {}
-
-
- public void methodReceived (AMQStateManager stateManager,
- AMQProtocolSession protocolSession,
- AMQMethodEvent evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- System.out.println("");
+ // TODO
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.MessageHeaders;
import org.apache.qpid.client.message.UnprocessedMessage;
@@ -31,10 +30,13 @@
import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.log4j.Logger;
+
public class MessageTransferMethodHandler implements StateAwareMethodListener
{
- private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
private static final Logger _logger = Logger.getLogger(MessageTransferMethodHandler.class);
+
+ private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
public static MessageTransferMethodHandler getInstance()
{
@@ -42,18 +44,11 @@
}
private MessageTransferMethodHandler() {}
-
-
- public void methodReceived (AMQStateManager stateManager,
- AMQProtocolSession protocolSession,
- AMQMethodEvent evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage();
MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
- msg.deliveryTag = evt.getRequestId();
_logger.debug("New JmsDeliver method received");
MessageHeaders messageHeaders = new MessageHeaders();
@@ -73,7 +68,7 @@
messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
- msg.contentHeader = messageHeaders;
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders);
if(transferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -17,38 +17,35 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+
import org.apache.log4j.Logger;
-/**
- * @author Apache Software Foundation
- */
public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
- private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+ private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
+
+ private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+
+ public static QueueDeleteOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueDeleteOkMethodHandler() {}
- public static QueueDeleteOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private QueueDeleteOkMethodHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ if (_logger.isDebugEnabled())
+ {
QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
_logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
- }
- }
+ }
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.charset.Charset;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
@@ -54,7 +55,7 @@
AbstractBytesMessage(ByteBuffer data)
{
super(data); // this instanties a content header
- getMessageHeaders().setContentType(getMimeType());
+ getMessageHeaders().setContentType(getMimeTypeAsShortString());
if (_data == null)
{
@@ -68,13 +69,12 @@
_data.setAutoExpand(true);
}
-
AbstractBytesMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data)
throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
super(messageNbr, contentHeader, data);
- getMessageHeaders().setContentType(getMimeType());
+ getMessageHeaders().setContentType(getMimeTypeAsShortString());
}
public void clearBodyImpl() throws JMSException
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Wed Feb 14 12:02:03 2007
@@ -10,6 +10,7 @@
import javax.jms.MessageNotWriteableException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
/**
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Feb 14 12:02:03 2007
@@ -34,15 +34,22 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMXProperty;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import javax.jms.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
protected boolean _redelivered;
@@ -51,6 +58,7 @@
protected boolean _readableMessage = false;
protected boolean _changedData;
private Destination _destination;
+ private JMSHeaderAdapter _headerAdapter;
private BasicMessageConsumer _consumer;
protected AbstractJMSMessage(ByteBuffer data)
@@ -84,17 +92,29 @@
_readableProperties = (_messageHeaders != null);
}
- public String getJMSMessageID() throws JMSException
+ public AMQShortString getJMSMessageIDAsShortString() throws JMSException
{
if (getMessageHeaders().getMessageId() == null)
{
- getMessageHeaders().setMessageId("ID:" + _deliveryTag);
+ getMessageHeaders().setMessageId(new AMQShortString("ID:" + _deliveryTag));
}
return getMessageHeaders().getMessageId();
}
+
+ // The String version is required for javax.jms.Message
+ public String getJMSMessageID() throws JMSException
+ {
+ return getJMSMessageIDAsShortString().asString();
+ }
+ // The String version is required for javax.jms.Message
public void setJMSMessageID(String messageId) throws JMSException
{
+ setJMSMessageID(new AMQShortString(messageId));
+ }
+
+ public void setJMSMessageID(AMQShortString messageId) throws JMSException
+ {
getMessageHeaders().setMessageId(messageId);
}
@@ -115,22 +135,35 @@
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
{
- getMessageHeaders().setCorrelationId(new String(bytes));
+ getMessageHeaders().setCorrelationId(new AMQShortString(bytes));
}
+ // The String version is required for javax.jms.Message
public void setJMSCorrelationID(String correlationId) throws JMSException
{
+ setJMSCorrelationID(new AMQShortString(correlationId));
+ }
+
+ public void setJMSCorrelationID(AMQShortString correlationId) throws JMSException
+ {
getMessageHeaders().setCorrelationId(correlationId);
}
- public String getJMSCorrelationID() throws JMSException
+ public AMQShortString getJMSCorrelationIDAsShortString() throws JMSException
{
return getMessageHeaders().getCorrelationId();
}
+ // The String version is required for javax.jms.Message
+ public String getJMSCorrelationID() throws JMSException
+ {
+ AMQShortString ss = getMessageHeaders().getCorrelationId();
+ return ss == null ? null : ss.asString();
+ }
+
public Destination getJMSReplyTo() throws JMSException
{
- String replyToEncoding = getMessageHeaders().getReplyTo();
+ AMQShortString replyToEncoding = getMessageHeaders().getReplyTo();
if (replyToEncoding == null)
{
return null;
@@ -142,7 +175,7 @@
{
try
{
- BindingURL binding = new AMQBindingURL(replyToEncoding);
+ BindingURL binding = new AMQBindingURL(replyToEncoding.asString());
dest = AMQDestination.createDestination(binding);
}
catch (URLSyntaxException e)
@@ -165,11 +198,11 @@
if (!(destination instanceof AMQDestination))
{
throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " +
- destination.getClass());
+ destination.getClass());
}
final AMQDestination amqd = (AMQDestination) destination;
- final String encodedDestination = amqd.getEncodedName();
+ final AMQShortString encodedDestination = amqd.getEncodedName();
_destinationCache.put(encodedDestination, destination);
getMessageHeaders().setReplyTo(encodedDestination);
}
@@ -179,7 +212,7 @@
return _destination;
}
- public void setJMSDestination(Destination destination) throws JMSException
+ public void setJMSDestination(Destination destination)
{
_destination = destination;
}
@@ -209,7 +242,7 @@
// Since the type field is not a part of message.transport and is used only for
// JMS messages, this change to JMS Headers solves the problem.
// return getMessageHeaders().getType();
- return getStringProperty(CustomJMXProperty.JMSXType.toString());
+ return getStringProperty(CustomJMSXProperty.JMSXType.toString());
}
public void setJMSType(String string) throws JMSException
@@ -217,7 +250,7 @@
// Since the type field is not a part of message.transport and is used only for
// JMS messages, this change to JMS Headers solves the problem.
// getMessageHeaders().setType(string);
- setStringProperty(CustomJMXProperty.JMSXType.toString(), string);
+ setStringProperty(CustomJMSXProperty.JMSXType.toString(), string);
}
public long getJMSExpiration() throws JMSException
@@ -243,7 +276,6 @@
public void clearProperties() throws JMSException
{
getMessageHeaders().getJMSHeaders().clear();
-
_readableProperties = false;
}
@@ -253,11 +285,24 @@
_readableMessage = false;
}
+ public boolean propertyExists(AMQShortString propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ return getMessageHeaders().getJMSHeaders().propertyExists(propertyName);
+ }
public boolean propertyExists(String propertyName) throws JMSException
{
+ return propertyExists(new AMQShortString(propertyName));
+ }
+
+ public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+ {
checkPropertyName(propertyName);
- return getMessageHeaders().getJMSHeaders().propertyExists(propertyName);
+ Boolean b = getMessageHeaders().getJMSHeaders().getBoolean(propertyName);
+ if (b != null)
+ return b;
+ return false;
}
public boolean getBooleanProperty(String propertyName) throws JMSException
@@ -267,7 +312,6 @@
if (b != null)
return b;
return false;
-// return getMessageHeaders().getJMSHeaders().getBoolean(propertyName);
}
public byte getByteProperty(String propertyName) throws JMSException
@@ -277,7 +321,13 @@
if (b == null)
throw new NumberFormatException("Byte value null");
return b;
-// return getMessageHeaders().getJMSHeaders().getByte(propertyName);
+ }
+
+ public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+ {
+ checkPropertyName(propertyName);
+ return getMessageHeaders().getJMSHeaders().getBytes(propertyName);
+
}
public short getShortProperty(String propertyName) throws JMSException
@@ -291,7 +341,6 @@
if (b == null)
throw new NumberFormatException("Short value null");
return (short)b;
-// return getMessageHeaders().getJMSHeaders().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
@@ -309,7 +358,6 @@
if (b == null)
throw new NumberFormatException("Int value null");
return (short)b;
-// return getMessageHeaders().getJMSHeaders().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
@@ -318,6 +366,7 @@
Long l = getMessageHeaders().getJMSHeaders().getLong(propertyName);
if (l != null)
return l;
+ // try Integer
Integer i = getMessageHeaders().getJMSHeaders().getInteger(propertyName);
if (i != null)
return i;
@@ -330,7 +379,6 @@
if (b == null)
throw new NumberFormatException("Long value null");
return (short)b;
-// return getMessageHeaders().getJMSHeaders().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
@@ -374,13 +422,18 @@
return getMessageHeaders().getJMSHeaders().getPropertyNames();
}
- public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
{
checkWritableProperties();
checkPropertyName(propertyName);
getMessageHeaders().getJMSHeaders().setBoolean(propertyName, b);
}
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ setBooleanProperty(new AMQShortString(propertyName), b);
+ }
+
public void setByteProperty(String propertyName, byte b) throws JMSException
{
checkWritableProperties();
@@ -437,7 +490,7 @@
getMessageHeaders().getJMSHeaders().setObject(propertyName, object);
}
- protected void removeProperty(String propertyName) throws JMSException
+ protected void removeProperty(AMQShortString propertyName) throws JMSException
{
checkPropertyName(propertyName);
getMessageHeaders().getJMSHeaders().remove(propertyName);
@@ -468,7 +521,7 @@
public void acknowledge() throws JMSException
{
- if(_session != null)
+ if (_session != null)
{
_session.acknowledge();
}
@@ -488,7 +541,12 @@
*/
public abstract String toBodyString() throws JMSException;
- public abstract String getMimeType();
+ public String getMimeType()
+ {
+ return getMimeTypeAsShortString().toString();
+ }
+
+ public abstract AMQShortString getMimeTypeAsShortString();
public String toString()
{
@@ -496,6 +554,7 @@
{
StringBuffer buf = new StringBuffer("Body:\n");
buf.append(toBodyString());
+ buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID());
buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
buf.append("\nJMS expiration: ").append(getJMSExpiration());
buf.append("\nJMS priority: ").append(getJMSPriority());
@@ -525,13 +584,13 @@
getMessageHeaders().setJMSHeaders(messageProperties);
}
- private void checkPropertyName(String propertyName)
+ private void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
{
throw new IllegalArgumentException("Property name must not be null");
}
- else if ("".equals(propertyName))
+ else if (propertyName.length() == 0)
{
throw new IllegalArgumentException("Property name must not be the empty string");
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.message;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.Iterator;
import java.util.List;
@@ -27,7 +30,6 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
@@ -61,5 +63,4 @@
msg.setJMSRedelivered(redelivered);
return msg;
}
-
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -35,9 +36,11 @@
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
- private static final String MIME_TYPE = "application/octet-stream";
+ public static final String MIME_TYPE = "application/octet-stream";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
- JMSBytesMessage()
+
+ public JMSBytesMessage()
{
this(null);
}
@@ -65,9 +68,9 @@
_readableMessage = true;
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
public long getBodyLength() throws JMSException
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Wed Feb 14 12:02:03 2007
@@ -25,13 +25,14 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
{
@@ -39,10 +40,11 @@
public static final String MIME_TYPE = "jms/map-message";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
private Map<String,Object> _map = new HashMap<String, Object>();
- JMSMapMessage() throws JMSException
+ public JMSMapMessage() throws JMSException
{
this(null);
}
@@ -67,15 +69,14 @@
}
}
-
public String toBodyString() throws JMSException
{
return _map.toString();
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Wed Feb 14 12:02:03 2007
@@ -27,6 +27,7 @@
import java.io.Serializable;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -37,14 +38,15 @@
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
- static final String MIME_TYPE = "application/java-object-stream";
+ public static final String MIME_TYPE = "application/java-object-stream";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
*/
- JMSObjectMessage()
+ public JMSObjectMessage()
{
this(null);
}
@@ -57,7 +59,7 @@
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
- getMessageHeaders().setContentType(MIME_TYPE);
+ getMessageHeaders().setContentType(MIME_TYPE_SHORT_STRING);
}
/**
@@ -83,9 +85,9 @@
return toString(_data);
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
public void setObject(Serializable serializable) throws JMSException
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
/**
* @author Apache Software Foundation
@@ -32,6 +33,7 @@
public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
{
public static final String MIME_TYPE="jms/stream-message";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
/**
@@ -40,7 +42,7 @@
*/
private int _byteArrayRemaining = -1;
- JMSStreamMessage()
+ public JMSStreamMessage()
{
this(null);
}
@@ -69,9 +71,9 @@
_readableMessage = true;
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.message;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
import java.io.UnsupportedEncodingException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -27,42 +30,49 @@
import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
{
private static final String MIME_TYPE = "text/plain";
+ private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
private String _decodedValue;
/**
* This constant represents the name of a property that is set when the message payload is null.
*/
- private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
+ private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
+ private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
- JMSTextMessage() throws JMSException
+ public JMSTextMessage() throws JMSException
{
- this(null, null);
+ this(null, (AMQShortString)null);
}
- JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+ JMSTextMessage(ByteBuffer data, AMQShortString encoding) throws JMSException
{
super(data); // this instantiates a content header
- getMessageHeaders().setContentType(MIME_TYPE);
+ getMessageHeaders().setContentType(MIME_TYPE_SHORT_STRING);
getMessageHeaders().setEncoding(encoding);
}
+ JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+ {
+ this(data, new AMQShortString(encoding));
+ }
+
JMSTextMessage(long deliveryTag, MessageHeaders contentHeader, ByteBuffer data)
throws AMQException
{
super(deliveryTag, contentHeader, data);
- contentHeader.setContentType(MIME_TYPE);
+ contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
_data = data;
}
JMSTextMessage(ByteBuffer data) throws JMSException
{
- this(data, null);
+ this(data, (AMQShortString)null);
}
JMSTextMessage(String text) throws JMSException
@@ -91,9 +101,9 @@
_data = data;
}
- public String getMimeType()
+ public AMQShortString getMimeTypeAsShortString()
{
- return MIME_TYPE;
+ return MIME_TYPE_SHORT_STRING;
}
public void setText(String text) throws JMSException
@@ -115,7 +125,7 @@
}
else
{
- _data.put(text.getBytes(getMessageHeaders().getEncoding()));
+ _data.put(text.getBytes(getMessageHeaders().getEncoding().asString()));
}
_changedData=true;
}
@@ -151,7 +161,7 @@
{
try
{
- _decodedValue = _data.getString(Charset.forName(getMessageHeaders().getEncoding()).newDecoder());
+ _decodedValue = _data.getString(Charset.forName(getMessageHeaders().getEncoding().asString()).newDecoder());
}
catch (CharacterCodingException e)
{
@@ -164,7 +174,7 @@
{
try
{
- _decodedValue = _data.getString(Charset.defaultCharset().newDecoder());
+ _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
}
catch (CharacterCodingException e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
public class JMSTextMessageFactory extends AbstractJMSMessageFactory
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.JMSException;
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Feb 14 12:02:03 2007
@@ -28,10 +28,12 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.AMQShortString;
public class MessageFactoryRegistry
{
- private final Map _mimeToFactoryMap = new HashMap();
+ private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
+ private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>();
public void registerFactory(String mimeType, MessageFactory mf)
{
@@ -39,12 +41,14 @@
{
throw new IllegalArgumentException("Message factory must not be null");
}
- _mimeToFactoryMap.put(mimeType, mf);
+ _mimeStringToFactoryMap.put(mimeType, mf);
+ _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
}
public MessageFactory deregisterFactory(String mimeType)
{
- return (MessageFactory) _mimeToFactoryMap.remove(mimeType);
+ _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+ return _mimeStringToFactoryMap.remove(mimeType);
}
/**
@@ -62,7 +66,7 @@
MessageHeaders contentHeader,
List contents) throws AMQException, JMSException
{
- MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(contentHeader.getContentType());
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(contentHeader.getContentType());
if (mf == null)
{
throw new AMQException("Unsupport MIME type of " + contentHeader.getContentType());
@@ -79,7 +83,7 @@
{
throw new IllegalArgumentException("Mime type must not be null");
}
- MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(mimeType);
+ MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
if (mf == null)
{
throw new AMQException("Unsupport MIME type of " + mimeType);
@@ -100,7 +104,7 @@
mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
mf.registerFactory("text/plain", new JMSTextMessageFactory());
mf.registerFactory("text/xml", new JMSTextMessageFactory());
- mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
+ mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
mf.registerFactory(null, new JMSBytesMessageFactory());
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -35,13 +36,13 @@
{
private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
- private String _contentType;
+ private AMQShortString _contentType;
- private String _encoding;
+ private AMQShortString _encoding;
- private String _destination;
+ private AMQShortString _destination;
- private String _exchange;
+ private AMQShortString _exchange;
private FieldTable _jmsHeaders;
@@ -49,33 +50,35 @@
private short _priority;
- private String _correlationId;
+ private AMQShortString _correlationId;
- private String _replyTo;
+ private AMQShortString _replyTo;
private long _expiration;
- private String _messageId;
+ private AMQShortString _messageId;
private long _timestamp;
- private String _type;
+ private AMQShortString _type;
- private String _userId;
+ private AMQShortString _userId;
- private String _appId;
+ private AMQShortString _appId;
- private String _transactionId;
+ private AMQShortString _transactionId;
- private String _routingKey;
+ private AMQShortString _routingKey;
private int _size;
- public int getSize() {
+ public int getSize()
+ {
return _size;
}
- public void setSize(int size) {
+ public void setSize(int size)
+ {
this._size = size;
}
@@ -83,29 +86,24 @@
{
}
- public String getContentTypeShortString()
+ public AMQShortString getContentType()
{
return _contentType;
}
- public String getContentType()
+ public void setContentType(AMQShortString contentType)
{
- return _contentType == null ? null : _contentType.toString();
+ _contentType = contentType;
}
- public void setContentType(String contentType)
+ public AMQShortString getEncoding()
{
- _contentType = contentType == null ? null : new String(contentType);
+ return _encoding;
}
- public String getEncoding()
+ public void setEncoding(AMQShortString encoding)
{
- return _encoding == null ? null : _encoding.toString();
- }
-
- public void setEncoding(String encoding)
- {
- _encoding = encoding == null ? null : new String(encoding);
+ _encoding = encoding;
}
public FieldTable getJMSHeaders()
@@ -144,22 +142,22 @@
_priority = priority;
}
- public String getCorrelationId()
+ public AMQShortString getCorrelationId()
{
- return _correlationId == null ? null : _correlationId.toString();
+ return _correlationId;
}
- public void setCorrelationId(String correlationId)
+ public void setCorrelationId(AMQShortString correlationId)
{
- _correlationId = correlationId == null ? null : new String(correlationId);
+ _correlationId = correlationId;
}
- public String getReplyTo()
+ public AMQShortString getReplyTo()
{
- return _replyTo == null ? null : _replyTo.toString();
+ return _replyTo;
}
- public void setReplyTo(String replyTo)
+ public void setReplyTo(AMQShortString replyTo)
{
_replyTo = replyTo;
}
@@ -175,12 +173,12 @@
}
- public String getMessageId()
+ public AMQShortString getMessageId()
{
return _messageId;
}
- public void setMessageId(String messageId)
+ public void setMessageId(AMQShortString messageId)
{
_messageId = messageId;
}
@@ -195,39 +193,39 @@
_timestamp = timestamp;
}
- public String getType()
+ public AMQShortString getType()
{
- return _type == null ? null : _type.toString();
+ return _type;
}
- public void setType(String type)
+ public void setType(AMQShortString type)
{
- _type = type == null ? null : new String(type);
+ _type = type;
}
- public String getUserId()
+ public AMQShortString getUserId()
{
- return _userId == null ? null : _userId.toString();
+ return _userId;
}
- public void setUserId(String userId)
+ public void setUserId(AMQShortString userId)
{
- _userId = userId == null ? null : new String(userId);
+ _userId = userId;
}
- public String getAppId()
+ public AMQShortString getAppId()
{
- return _appId == null ? null : _appId.toString();
+ return _appId;
}
- public void setAppId(String appId)
+ public void setAppId(AMQShortString appId)
{
- _appId = appId == null ? null : new String(appId);
+ _appId = appId;
}
// MapMessage Interface
- public boolean getBoolean(String string) throws JMSException
+ public boolean getBoolean(AMQShortString string) throws JMSException
{
Boolean b = getJMSHeaders().getBoolean(string);
@@ -237,13 +235,13 @@
{
Object str = getJMSHeaders().getObject(string);
- if (str == null || !(str instanceof String))
+ if (str == null || !(str instanceof AMQShortString))
{
throw new MessageFormatException("getBoolean can't use " + string + " item.");
}
else
{
- return Boolean.valueOf((String) str);
+ return Boolean.valueOf(((AMQShortString)str).asString());
}
}
else
@@ -255,13 +253,13 @@
return b;
}
- public char getCharacter(String string) throws JMSException
+ public char getCharacter(AMQShortString string) throws JMSException
{
Character c = getJMSHeaders().getCharacter(string);
if (c == null)
{
- if (getJMSHeaders().isNullStringValue(string))
+ if (getJMSHeaders().isNullStringValue(string.asString()))
{
throw new NullPointerException("Cannot convert null char");
}
@@ -276,7 +274,7 @@
}
}
- public byte[] getBytes(String string) throws JMSException
+ public byte[] getBytes(AMQShortString string) throws JMSException
{
byte[] bs = getJMSHeaders().getBytes(string);
@@ -290,7 +288,7 @@
}
}
- public byte getByte(String string) throws JMSException
+ public byte getByte(AMQShortString string) throws JMSException
{
Byte b = getJMSHeaders().getByte(string);
if (b == null)
@@ -299,13 +297,13 @@
{
Object str = getJMSHeaders().getObject(string);
- if (str == null || !(str instanceof String))
+ if (str == null || !(str instanceof AMQShortString))
{
throw new MessageFormatException("getByte can't use " + string + " item.");
}
else
{
- return Byte.valueOf((String) str);
+ return Byte.valueOf(((AMQShortString)str).asString());
}
}
else
@@ -317,7 +315,7 @@
return b;
}
- public short getShort(String string) throws JMSException
+ public short getShort(AMQShortString string) throws JMSException
{
Short s = getJMSHeaders().getShort(string);
@@ -329,7 +327,7 @@
return s;
}
- public int getInteger(String string) throws JMSException
+ public int getInteger(AMQShortString string) throws JMSException
{
Integer i = getJMSHeaders().getInteger(string);
@@ -341,7 +339,7 @@
return i;
}
- public long getLong(String string) throws JMSException
+ public long getLong(AMQShortString string) throws JMSException
{
Long l = getJMSHeaders().getLong(string);
@@ -353,7 +351,7 @@
return l;
}
- public float getFloat(String string) throws JMSException
+ public float getFloat(AMQShortString string) throws JMSException
{
Float f = getJMSHeaders().getFloat(string);
@@ -363,13 +361,13 @@
{
Object str = getJMSHeaders().getObject(string);
- if (str == null || !(str instanceof String))
+ if (str == null || !(str instanceof AMQShortString))
{
throw new MessageFormatException("getFloat can't use " + string + " item.");
}
else
{
- return Float.valueOf((String) str);
+ return Float.valueOf(((AMQShortString)str).asString());
}
}
else
@@ -382,7 +380,7 @@
return f;
}
- public double getDouble(String string) throws JMSException
+ public double getDouble(AMQShortString string) throws JMSException
{
Double d = getJMSHeaders().getDouble(string);
@@ -394,9 +392,9 @@
return d;
}
- public String getString(String string) throws JMSException
+ public AMQShortString getString(AMQShortString string) throws JMSException
{
- String s = getJMSHeaders().getString(string);
+ AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
if (s == null)
{
@@ -415,7 +413,7 @@
}
else
{
- s = String.valueOf(o);
+ s = (AMQShortString) o;
}
}
}
@@ -424,76 +422,76 @@
return s;
}
- public Object getObject(String string) throws JMSException
+ public Object getObject(AMQShortString string) throws JMSException
{
return getJMSHeaders().getObject(string);
}
- public void setBoolean(String string, boolean b) throws JMSException
+ public void setBoolean(AMQShortString string, boolean b) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setBoolean(string, b);
}
- public void setChar(String string, char c) throws JMSException
+ public void setChar(AMQShortString string, char c) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setChar(string, c);
}
- public Object setBytes(String string, byte[] bytes)
+ public Object setBytes(AMQShortString string, byte[] bytes)
{
return getJMSHeaders().setBytes(string, bytes);
}
- public Object setBytes(String string, byte[] bytes, int start, int length)
+ public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
{
return getJMSHeaders().setBytes(string, bytes, start, length);
}
- public void setByte(String string, byte b) throws JMSException
+ public void setByte(AMQShortString string, byte b) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setByte(string, b);
}
- public void setShort(String string, short i) throws JMSException
+ public void setShort(AMQShortString string, short i) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setShort(string, i);
}
- public void setInteger(String string, int i) throws JMSException
+ public void setInteger(AMQShortString string, int i) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setInteger(string, i);
}
- public void setLong(String string, long l) throws JMSException
+ public void setLong(AMQShortString string, long l) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setLong(string, l);
}
- public void setFloat(String string, float v) throws JMSException
+ public void setFloat(AMQShortString string, float v) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setFloat(string, v);
}
- public void setDouble(String string, double v) throws JMSException
+ public void setDouble(AMQShortString string, double v) throws JMSException
{
checkPropertyName(string);
getJMSHeaders().setDouble(string, v);
}
- public void setString(String string, String string1) throws JMSException
+ public void setString(AMQShortString string, AMQShortString string1) throws JMSException
{
checkPropertyName(string);
- getJMSHeaders().setString(string, string1);
+ getJMSHeaders().setString(string.asString(), string1.asString());
}
- public void setObject(String string, Object object) throws JMSException
+ public void setObject(AMQShortString string, Object object) throws JMSException
{
checkPropertyName(string);
try
@@ -506,7 +504,7 @@
}
}
- public boolean itemExists(String string) throws JMSException
+ public boolean itemExists(AMQShortString string) throws JMSException
{
return getJMSHeaders().containsKey(string);
}
@@ -521,7 +519,7 @@
getJMSHeaders().clear();
}
- public boolean propertyExists(String propertyName)
+ public boolean propertyExists(AMQShortString propertyName)
{
return getJMSHeaders().propertyExists(propertyName);
}
@@ -531,7 +529,7 @@
return getJMSHeaders().setObject(key.toString(), value);
}
- public Object remove(String propertyName)
+ public Object remove(AMQShortString propertyName)
{
return getJMSHeaders().remove(propertyName);
}
@@ -637,35 +635,43 @@
}
- public String getTransactionId() {
+ public AMQShortString getTransactionId()
+ {
return _transactionId;
}
- public void setTransactionId(String id) {
+ public void setTransactionId(AMQShortString id)
+ {
_transactionId = id;
}
- public String getDestination() {
+ public AMQShortString getDestination()
+ {
return _destination;
}
- public void setDestination(String destination) {
+ public void setDestination(AMQShortString destination)
+ {
this._destination = destination;
}
- public String getExchange() {
+ public AMQShortString getExchange()
+ {
return _exchange;
}
- public void setExchange(String exchange) {
+ public void setExchange(AMQShortString exchange)
+ {
this._exchange = exchange;
}
- public String getRoutingKey() {
+ public AMQShortString getRoutingKey()
+ {
return _routingKey;
}
- public void setRoutingKey(String routingKey) {
+ public void setRoutingKey(AMQShortString routingKey)
+ {
this._routingKey = routingKey;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Wed Feb 14 12:02:03 2007
@@ -24,28 +24,69 @@
import java.util.List;
/**
- * This class contains everything needed to process a JMS message. It assembles the
- * deliver body, the content header and the content body/ies.
+ * This class contains everything needed to process a JMS message.
*
* Note that the actual work of creating a JMS message for the client code's use is done
* outside of the MINA dispatcher thread in order to minimise the amount of work done in
* the MINA dispatcher thread.
*
*/
-public class UnprocessedMessage {
- public int bytesReceived = 0;
+public class UnprocessedMessage
+{
+ private int bytesReceived = 0;
+ private int channelId;
+ private List<byte[]> contents = new LinkedList();
+ private long deliveryTag;
+ private MessageHeaders messageHeaders;
+
+ public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ }
+
+ public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ addContent(content);
+ }
- public List contents = new LinkedList();
-
- public int channelId;
-
- public long deliveryTag;
-
- public MessageHeaders contentHeader;
-
- public void addContent(byte[] content) {
+ public void addContent(byte[] content)
+ {
contents.add(content);
- bytesReceived = bytesReceived + content.length;
+ bytesReceived += content.length;
}
+ public int getBytesReceived()
+ {
+ return bytesReceived;
+ }
+
+ public int getChannelId()
+ {
+ return channelId;
+ }
+
+ public List<byte[]> getContents()
+ {
+ return contents;
+ }
+
+ public long getDeliveryTag()
+ {
+ return deliveryTag;
+ }
+
+ public MessageHeaders getMessageHeaders()
+ {
+ return messageHeaders;
+ }
+
+ public String toString()
+ {
+ return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + new String(contents.get(0));
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Feb 14 12:02:03 2007
@@ -24,11 +24,15 @@
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverHandler;
@@ -37,11 +41,13 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.HeartbeatBody;
@@ -54,6 +60,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+
public class AMQProtocolHandler extends IoHandlerAdapter
{
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
@@ -95,24 +102,11 @@
private CountDownLatch _failoverLatch;
+ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
-
- // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
- // We substitute the state manager when performing failover
- _frameListeners.add(new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- return _stateManager.methodReceived(evt);
- }
-
- public void error(Exception e)
- {
- _stateManager.error(e);
- }
- });
}
public boolean isUseSSL()
@@ -149,13 +143,25 @@
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
+ try
+ {
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
public void sessionOpened(IoSession session) throws Exception
{
- System.setProperty("foo", "bar");
+ //System.setProperty("foo", "bar");
}
/**
@@ -286,11 +292,14 @@
public void propagateExceptionToWaiters(Exception e)
{
getStateManager().error(e);
- final Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener ml = (AMQMethodListener) it.next();
- ml.error(e);
+ final Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener ml = (AMQMethodListener) it.next();
+ ml.error(e);
+ }
}
}
@@ -298,59 +307,26 @@
public void messageReceived(IoSession session, Object message) throws Exception
{
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
- if (_messageReceivedCount++ % 1000 == 0)
+ if (debug && (msgNumber % 1000 == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
- Iterator it = _frameListeners.iterator();
- AMQFrame frame = (AMQFrame) message;
- HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+ AMQFrame frame = (AMQFrame) message;
+ final AMQBody bodyFrame = frame.getBodyFrame();
- if (frame.bodyFrame instanceof AMQRequestBody)
+ if (bodyFrame instanceof AMQRequestBody)
{
- _protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
+ _protocolSession.messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
}
- else if (frame.bodyFrame instanceof AMQResponseBody)
+ else if (bodyFrame instanceof AMQResponseBody)
{
- _protocolSession.messageResponseBodyReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
+ _protocolSession.messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
}
-// if (frame.bodyFrame instanceof AMQMethodBody)
-// {
-// if (_logger.isDebugEnabled())
-// {
-// _logger.debug("Method frame received: " + frame);
-// }
-//
-// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
-// try
-// {
-// boolean wasAnyoneInterested = false;
-// Q
-// }
-// catch (AMQException e)
-// {
-// it = _frameListeners.iterator();
-// while (it.hasNext())
-// {
-// final AMQMethodListener listener = (AMQMethodListener) it.next();
-// listener.error(e);
-// }
-// exceptionCaught(session, e);
-// }
-// }
-// else if (frame.bodyFrame instanceof ContentHeaderBody)
-// {
-// _protocolSession.messageContentHeaderReceived(frame.channel,
-// (ContentHeaderBody) frame.bodyFrame);
-// }
-// else if (frame.bodyFrame instanceof ContentBody)
-// {
-// _protocolSession.messageContentBodyReceived(frame.channel,
-// (ContentBody) frame.bodyFrame);
-// }
- else if (frame.bodyFrame instanceof HeartbeatBody)
+ else if (bodyFrame instanceof HeartbeatBody)
{
_logger.debug("Received heartbeat");
}
@@ -361,27 +337,32 @@
public void messageSent(IoSession session, Object message) throws Exception
{
- if (_messagesOut++ % 1000 == 0)
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && (sentMessages % 1000 == 0))
{
_logger.debug("Sent " + _messagesOut + " protocol messages");
}
_connection.bytesSent(session.getWrittenBytes());
- if (_logger.isDebugEnabled())
+ if (debug)
{
_logger.debug("Sent frame " + message);
}
}
- public void addFrameListener(AMQMethodListener listener)
- {
- _frameListeners.add(listener);
- }
-
- public void removeFrameListener(AMQMethodListener listener)
- {
- _frameListeners.remove(listener);
- }
-
+ /*
+ public void addFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.add(listener);
+ }
+
+ public void removeFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.remove(listener);
+ }
+ */
public void attainState(AMQState s) throws AMQException
{
getStateManager().attainState(s);
@@ -418,19 +399,36 @@
* a particular response. Equivalent to calling getProtocolSession().write() then
* waiting for the response.
*
+ * @param channelNum
* @param methodBody
- * @param listener the blocking listener. Note the calling thread will block.
+ * @param listener The blocking listener. Note the calling thread will block.
*/
private AMQMethodEvent writeCommandFrameAndWaitForReply(int channelNum, AMQMethodBody methodBody,
BlockingMethodFrameListener listener)
throws AMQException
{
+ return writeCommandFrameAndWaitForReply(channelNum, methodBody, listener, DEFAULT_SYNC_TIMEOUT);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session and waits for
+ * a particular response. Equivalent to calling getProtocolSession().write() then
+ * waiting for the response.
+ *
+ * @param channelNum
+ * @param methodBody
+ * @param listener The blocking listener. Note the calling thread will block.
+ */
+ private AMQMethodEvent writeCommandFrameAndWaitForReply(int channelNum, AMQMethodBody methodBody,
+ BlockingMethodFrameListener listener, long timeout)
+ throws AMQException
+ {
try
{
_frameListeners.add(listener);
_protocolSession.writeRequest(channelNum, methodBody, listener);
- AMQMethodEvent e = listener.blockForFrame();
+ AMQMethodEvent e = listener.blockForFrame(timeout);
return e;
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
@@ -440,7 +438,6 @@
// If we don't remove the listener then no-one will
_frameListeners.remove(listener);
}
-
}
/**
@@ -484,19 +481,26 @@
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody methodBody = ConnectionCloseBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
+ _protocolSession.getProtocolMajorVersion(), // AMQP major version
+ _protocolSession.getProtocolMinorVersion(), // AMQP minor version
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client is closing the connection."); // replyText
-
- syncWrite(0, methodBody, ConnectionCloseOkBody.class);
+ new AMQShortString("JMS client is closing the connection.")); // replyText
+
+ try
+ {
+ syncWrite(0, methodBody, ConnectionCloseOkBody.class);
+ _protocolSession.closeProtocolSession();
+ }
+ catch (AMQTimeoutException e)
+ {
+ _protocolSession.closeProtocolSession(false);
+ }
+
- _protocolSession.closeProtocolSession();
}
/**
@@ -531,7 +535,7 @@
}
}
- public String generateQueueName()
+ public AMQShortString generateQueueName()
{
return _protocolSession.generateQueueName();
}
@@ -567,7 +571,7 @@
return _protocolSession;
}
- FailoverState getFailoverState()
+ public FailoverState getFailoverState()
{
return _failoverState;
}
@@ -577,8 +581,18 @@
_failoverState = failoverState;
}
- public int getConnectionId()
+ public long getConnectionId()
{
return _connection.getConnectionId();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolSession.getProtocolMajorVersion();
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolSession.getProtocolMinorVersion();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Feb 14 12:02:03 2007
@@ -37,28 +37,32 @@
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.framing.ResponseManager;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
- *
+ * <p/>
* The underlying protocol session is still available but clients should not
* use it to obtain session attributes.
*/
-public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionList
+public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
{
protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -107,7 +111,12 @@
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
- protected int _ConnectionId;
+ protected long _ConnectionId;
+
+ private byte _protocolMinorVersion;
+ private byte _protocolMajorVersion;
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
+
/**
* No-arg constructor for use by test subclass - has to initialise final vars
@@ -130,6 +139,8 @@
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
+ _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ //fixme - real value needed
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
_stateManager = new AMQStateManager(this);
@@ -143,6 +154,7 @@
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
+ _minaProtocolSession.setAttachment(this);
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
@@ -221,8 +233,9 @@
/**
* Store the SASL client currently being used for the authentication handshake
+ *
* @param client if non-null, stores this in the session. if null clears any existing client
- * being stored
+ * being stored
*/
public void setSaslClient(SaslClient client)
{
@@ -253,6 +266,7 @@
/**
* This is involed from MessageTransferMethodHandler if type is CONTENT_TYPE_REFERENCE
* This is invoked on the MINA dispatcher thread.
+ *
* @param message
* @throws AMQException if this was not expected
*/
@@ -296,13 +310,14 @@
* This is involed from MessageTransferMethodHandler if type is CONTENT_TYPE_INLINE
* Deliver a message to the appropriate session, removing the unprocessed message
* from our map
+ *
* @param channelId the channel id the message should be delivered to
- * @param msg the message
+ * @param msg the message
*/
public void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
{
AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
- msg.contentHeader.setSize(msg.bytesReceived);
+ msg.getMessageHeaders().setSize(msg.getBytesReceived());
session.messageReceived(msg);
}
@@ -360,6 +375,7 @@
WriteFuture f = _minaProtocolSession.write(frame);
if (wait)
{
+ //fixme -- time out?
f.join();
}
else
@@ -404,6 +420,7 @@
/**
* Starts the process of closing a session
+ *
* @param session the AMQSession being closed
*/
public void closeSession(AMQSession session)
@@ -425,19 +442,27 @@
* This method decides whether this is a response or an initiation. The latter
* case causes the AMQSession to be closed and an exception to be thrown if
* appropriate.
+ *
* @param channelId the id of the channel (session)
* @return true if the client must respond to the server, i.e. if the server
- * initiated the channel close, false if the channel close is just the server
- * responding to the client's earlier request to close the channel.
+ * initiated the channel close, false if the channel close is just the server
+ * responding to the client's earlier request to close the channel.
*/
- public boolean channelClosed(int channelId, int code, String text)
+ public boolean channelClosed(int channelId, int code, String text) throws AMQException
{
final Integer chId = channelId;
// if this is not a response to an earlier request to close the channel
if (_closingChannels.remove(chId) == null)
{
final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
- session.closed(new AMQException(_logger, code, text));
+ try
+ {
+ session.closed(new AMQException(_logger, code, text));
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException("JMSException received while closing session", e);
+ }
return true;
}
else
@@ -453,15 +478,20 @@
public void closeProtocolSession()
{
+ closeProtocolSession(true);
+ }
+
+ public void closeProtocolSession(boolean waitLast)
+ {
_logger.debug("Waiting for last write to join.");
- if (_lastWriteFuture != null)
+ if (waitLast && _lastWriteFuture != null)
{
_lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
- future.join();
+ future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
public void failover(String host, int port)
@@ -469,20 +499,19 @@
_protocolHandler.failover(host, port);
}
- protected String generateQueueName()
+ protected AMQShortString generateQueueName()
{
int id;
- synchronized(_queueIdLock)
+ synchronized (_queueIdLock)
{
id = _queueId++;
}
//get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
- return "tmp_" + localAddress + "_" + id;
+ String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ return new AMQShortString("tmp_" + localAddress + "_" + id);
}
/**
- *
* @param delay delay in seconds (not ms)
*/
void initHeartbeats(int delay)
@@ -495,11 +524,39 @@
}
}
- public void confirmConsumerCancelled(int channelId, String consumerTag)
+ public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final Integer chId = channelId;
final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
session.confirmConsumerCancelled(consumerTag);
}
+
+ public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+ {
+ _protocolMajorVersion = versionMajor;
+ _protocolMinorVersion = versionMinor;
+ _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public boolean isProtocolVersionEqual(byte major, byte minor)
+ {
+ return _protocolMinorVersion == major && _protocolMajorVersion == minor;
+ }
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return _registry;
+ }
+
}