You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/10/21 15:48:19 UTC
[4/5] qpid-jms git commit: QPIDJMS-215 Perform Message encoding at
send time
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 89094a1..9f90d87 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -34,20 +34,23 @@ import javax.jms.MessageFormatException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.exceptions.IdConversionException;
+import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
-import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.messaging.Section;
+
+import io.netty.buffer.ByteBuf;
public class AmqpJmsMessageFacade implements JmsMessageFacade {
@@ -55,11 +58,15 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final long UINT_MAX = 0xFFFFFFFFL;
- protected final Message message;
- protected final AmqpConnection connection;
+ protected AmqpConnection connection;
- private Map<Symbol,Object> messageAnnotationsMap;
- private Map<String,Object> applicationPropertiesMap;
+ private Properties properties;
+ private Header header;
+ private Section body;
+ private Map<Symbol, Object> messageAnnotationsMap;
+ private Map<String, Object> applicationPropertiesMap;
+ private Map<Symbol, Object> deliveryAnnotationsMap;
+ private Map<Symbol, Object> footerMap;
private JmsDestination replyTo;
private JmsDestination destination;
@@ -74,42 +81,29 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
private Long userSpecifiedTTL = null;
/**
- * Create a new AMQP Message Facade with an empty message instance.
+ * Initialize the state of this message for send.
*
* @param connection
- * the AmqpConnection that under which this facade was created.
+ * The connection that this message is linked to.
*/
- public AmqpJmsMessageFacade(AmqpConnection connection) {
- this.message = Proton.message();
- this.message.setDurable(true);
-
+ public void initialize(AmqpConnection connection) {
this.connection = connection;
- setMessageAnnotation(JMS_MSG_TYPE, JMS_MESSAGE);
+
+ setMessageAnnotation(JMS_MSG_TYPE, getJmsMsgType());
+ setPersistent(true); // TODO - Remove to avoid default Header
+ initializeEmptyBody();
}
/**
- * Creates a new Facade around an incoming AMQP Message for dispatch to the
- * JMS Consumer instance.
+ * Initialize the state of this message for receive.
*
* @param consumer
- * the consumer that received this message.
- * @param message
- * the incoming Message instance that is being wrapped.
+ * The consumer that this message was read from.
*/
- @SuppressWarnings("unchecked")
- public AmqpJmsMessageFacade(AmqpConsumer consumer, Message message) {
- this.message = message;
+ public void initialize(AmqpConsumer consumer) {
this.connection = consumer.getConnection();
this.consumerDestination = consumer.getDestination();
- if (message.getMessageAnnotations() != null) {
- messageAnnotationsMap = message.getMessageAnnotations().getValue();
- }
-
- if (message.getApplicationProperties() != null) {
- applicationPropertiesMap = message.getApplicationProperties().getValue();
- }
-
Long ttl = getTtl();
Long absoluteExpiryTime = getAbsoluteExpiryTime();
if (absoluteExpiryTime == null && ttl != null) {
@@ -118,6 +112,13 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
/**
+ * Used to indicate that a Message object should empty the body element and make
+ * any other internal updates to reflect the message now has no body value.
+ */
+ protected void initializeEmptyBody() {
+ }
+
+ /**
* @return the appropriate byte value that indicates the type of message this is.
*/
public byte getJmsMsgType() {
@@ -132,11 +133,22 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
* @return a String value indicating the message content type.
*/
public String getContentType() {
- return message.getContentType();
+ if (properties != null && properties.getContentType() != null) {
+ return properties.getContentType().toString();
+ }
+
+ return null;
}
public void setContentType(String value) {
- message.setContentType(value);
+ if (properties == null) {
+ if (value == null) {
+ return;
+ }
+ lazyCreateProperties();
+ }
+
+ properties.setContentType(Symbol.valueOf(value));
}
@Override
@@ -212,11 +224,11 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
if (ttl > 0 && ttl < UINT_MAX) {
- message.setTtl(ttl);
+ lazyCreateHeader();
+ header.setTtl(UnsignedInteger.valueOf(ttl));
} else {
- Header hdr = message.getHeader();
- if (hdr != null) {
- hdr.setTtl(null);
+ if (header != null) {
+ header.setTtl(null);
}
}
@@ -229,7 +241,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void clearBody() {
- message.setBody(null);
+ setBody(null);
}
@Override
@@ -239,13 +251,14 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public AmqpJmsMessageFacade copy() throws JMSException {
- AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection);
+ AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade();
copyInto(copy);
return copy;
}
- @SuppressWarnings("unchecked")
protected void copyInto(AmqpJmsMessageFacade target) {
+ target.connection = connection;
+
if (consumerDestination != null) {
target.consumerDestination = consumerDestination;
}
@@ -266,48 +279,42 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
target.userSpecifiedTTL = userSpecifiedTTL;
}
- Message targetMsg = target.getAmqpMessage();
-
- if (message.getHeader() != null) {
+ if (header != null) {
Header headers = new Header();
- headers.setDurable(message.getHeader().getDurable());
- headers.setPriority(message.getHeader().getPriority());
- headers.setTtl(message.getHeader().getTtl());
- headers.setFirstAcquirer(message.getHeader().getFirstAcquirer());
- headers.setDeliveryCount(message.getHeader().getDeliveryCount());
- targetMsg.setHeader(headers);
- }
+ headers.setDurable(header.getDurable());
+ headers.setPriority(header.getPriority());
+ headers.setTtl(header.getTtl());
+ headers.setFirstAcquirer(header.getFirstAcquirer());
+ headers.setDeliveryCount(header.getDeliveryCount());
- if (message.getFooter() != null && message.getFooter().getValue() != null) {
- Map<Object, Object> newFooterMap = new HashMap<Object, Object>();
- newFooterMap.putAll(message.getFooter().getValue());
- targetMsg.setFooter(new Footer(newFooterMap));
+ target.setHeader(headers);
}
- if (message.getProperties() != null) {
+ if (properties != null) {
Properties properties = new Properties();
- properties.setMessageId(message.getProperties().getMessageId());
- properties.setUserId(message.getProperties().getUserId());
- properties.setTo(message.getProperties().getTo());
- properties.setSubject(message.getProperties().getSubject());
- properties.setReplyTo(message.getProperties().getReplyTo());
- properties.setCorrelationId(message.getProperties().getCorrelationId());
- properties.setContentType(message.getProperties().getContentType());
- properties.setContentEncoding(message.getProperties().getContentEncoding());
- properties.setAbsoluteExpiryTime(message.getProperties().getAbsoluteExpiryTime());
- properties.setCreationTime(message.getProperties().getCreationTime());
- properties.setGroupId(message.getProperties().getGroupId());
- properties.setGroupSequence(message.getProperties().getGroupSequence());
- properties.setReplyToGroupId(message.getProperties().getReplyToGroupId());
-
- targetMsg.setProperties(properties);
+ properties.setMessageId(getProperties().getMessageId());
+ properties.setUserId(getProperties().getUserId());
+ properties.setTo(getProperties().getTo());
+ properties.setSubject(getProperties().getSubject());
+ properties.setReplyTo(getProperties().getReplyTo());
+ properties.setCorrelationId(getProperties().getCorrelationId());
+ properties.setContentType(getProperties().getContentType());
+ properties.setContentEncoding(getProperties().getContentEncoding());
+ properties.setAbsoluteExpiryTime(getProperties().getAbsoluteExpiryTime());
+ properties.setCreationTime(getProperties().getCreationTime());
+ properties.setGroupId(getProperties().getGroupId());
+ properties.setGroupSequence(getProperties().getGroupSequence());
+ properties.setReplyToGroupId(getProperties().getReplyToGroupId());
+
+ target.setProperties(properties);
}
- if (message.getDeliveryAnnotations() != null && message.getDeliveryAnnotations().getValue() != null) {
- Map<Symbol, Object> newDeliveryAnnotations = new HashMap<Symbol, Object>();
- newDeliveryAnnotations.putAll(message.getDeliveryAnnotations().getValue());
- targetMsg.setFooter(new Footer(newDeliveryAnnotations));
+ target.setBody(body);
+
+ if (deliveryAnnotationsMap != null) {
+ target.lazyCreateDeliveryAnnotations();
+ target.deliveryAnnotationsMap.putAll(deliveryAnnotationsMap);
}
if (applicationPropertiesMap != null) {
@@ -319,33 +326,61 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
target.lazyCreateMessageAnnotations();
target.messageAnnotationsMap.putAll(messageAnnotationsMap);
}
+
+ if (footerMap != null) {
+ target.lazyCreateFooter();
+ target.footerMap.putAll(footerMap);
+ }
}
@Override
public String getMessageId() {
- Object underlying = message.getMessageId();
+ Object underlying = null;
+
+ if (properties != null) {
+ underlying = properties.getMessageId();
+ }
+
return AmqpMessageIdHelper.INSTANCE.toMessageIdString(underlying);
}
@Override
public Object getProviderMessageIdObject() {
- return message.getMessageId();
+ return properties == null ? null : properties.getMessageId();
}
@Override
public void setProviderMessageIdObject(Object messageId) {
- message.setMessageId(messageId);
+ if (properties == null) {
+ if (messageId == null) {
+ return;
+ }
+
+ lazyCreateProperties();
+ }
+
+ properties.setMessageId(messageId);
}
@Override
public void setMessageId(String messageId) throws IdConversionException {
- message.setMessageId(AmqpMessageIdHelper.INSTANCE.toIdObject(messageId));
+ Object value = AmqpMessageIdHelper.INSTANCE.toIdObject(messageId);
+
+ if (properties == null) {
+ if (value == null) {
+ return;
+ }
+
+ lazyCreateProperties();
+ }
+
+ properties.setMessageId(value);
}
@Override
public long getTimestamp() {
- if (message.getProperties() != null) {
- Date timestamp = message.getProperties().getCreationTime();
+ if (properties != null) {
+ Date timestamp = properties.getCreationTime();
if (timestamp != null) {
return timestamp.getTime();
}
@@ -356,39 +391,62 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void setTimestamp(long timestamp) {
- if (timestamp != 0) {
- message.setCreationTime(timestamp);
- } else {
- if (message.getProperties() != null) {
- message.getProperties().setCreationTime(null);
+ if (properties == null) {
+ if (timestamp == 0) {
+ return;
}
+
+ lazyCreateProperties();
+ }
+
+ if (timestamp == 0) {
+ properties.setCreationTime(null);
+ } else {
+ properties.setCreationTime(new Date(timestamp));
}
}
@Override
public String getCorrelationId() {
- return AmqpMessageIdHelper.INSTANCE.toCorrelationIdString(message.getCorrelationId());
+ if (properties == null) {
+ return null;
+ }
+
+ return AmqpMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
}
@Override
public void setCorrelationId(String correlationId) throws IdConversionException {
- if (correlationId == null) {
- message.setCorrelationId(null);
- } else {
+ Object idObject = null;
+
+ if (correlationId != null) {
if (AmqpMessageIdHelper.INSTANCE.hasMessageIdPrefix(correlationId)) {
// JMSMessageID value, process it for possible type conversion
- Object idObject = AmqpMessageIdHelper.INSTANCE.toIdObject(correlationId);
- message.setCorrelationId(idObject);
+ idObject = AmqpMessageIdHelper.INSTANCE.toIdObject(correlationId);
} else {
- // application-specific value, send as-is
- message.setCorrelationId(correlationId);
+ idObject = correlationId;
}
}
+
+ if (properties == null) {
+ if (idObject == null) {
+ return;
+ }
+
+ lazyCreateProperties();
+ }
+
+ properties.setCorrelationId(idObject);
}
@Override
public byte[] getCorrelationIdBytes() throws JMSException {
- Object correlationId = message.getCorrelationId();
+ Object correlationId = null;
+
+ if (properties != null) {
+ correlationId = properties.getCorrelationId();
+ }
+
if (correlationId == null) {
return null;
} else if (correlationId instanceof Binary) {
@@ -412,17 +470,37 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
binaryIdValue = new Binary(Arrays.copyOf(correlationId, correlationId.length));
}
- message.setCorrelationId(binaryIdValue);
+ if (properties == null) {
+ if (binaryIdValue == null) {
+ return;
+ }
+
+ lazyCreateProperties();
+ }
+
+ properties.setCorrelationId(binaryIdValue);
}
@Override
public boolean isPersistent() {
- return message.isDurable();
+ if (header != null && header.getDurable() != null) {
+ return header.getDurable();
+ }
+
+ return false;
}
@Override
public void setPersistent(boolean value) {
- this.message.setDurable(value);
+ if (header == null) {
+ if (value == false) {
+ return;
+ } else {
+ lazyCreateHeader();
+ }
+ }
+
+ header.setDurable(value);
}
@Override
@@ -437,8 +515,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public int getRedeliveryCount() {
- if (message.getHeader() != null) {
- UnsignedInteger count = message.getHeader().getDeliveryCount();
+ if (header != null) {
+ UnsignedInteger count = header.getDeliveryCount();
if (count != null) {
return count.intValue();
}
@@ -450,11 +528,12 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void setRedeliveryCount(int redeliveryCount) {
if (redeliveryCount == 0) {
- if (message.getHeader() != null) {
- message.getHeader().setDeliveryCount(null);
+ if (header != null) {
+ header.setDeliveryCount(null);
}
} else {
- message.setDeliveryCount(redeliveryCount);
+ lazyCreateHeader();
+ header.setDeliveryCount(UnsignedInteger.valueOf(redeliveryCount));
}
}
@@ -478,24 +557,29 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public String getType() {
- return message.getSubject();
+ if (properties != null) {
+ return properties.getSubject();
+ }
+
+ return null;
}
@Override
public void setType(String type) {
if (type != null) {
- message.setSubject(type);
+ lazyCreateProperties();
+ properties.setSubject(type);
} else {
- if (message.getProperties() != null) {
- message.getProperties().setSubject(null);
+ if (properties != null) {
+ properties.setSubject(null);
}
}
}
@Override
public int getPriority() {
- if (message.getHeader() != null) {
- UnsignedByte priority = message.getHeader().getPriority();
+ if (header != null) {
+ UnsignedByte priority = header.getPriority();
if (priority != null) {
int scaled = priority.intValue();
if (scaled > 9) {
@@ -512,10 +596,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void setPriority(int priority) {
if (priority == DEFAULT_PRIORITY) {
- if (message.getHeader() == null) {
- return;
- } else {
- message.getHeader().setPriority(null);
+ if (header != null) {
+ header.setPriority(null);
}
} else {
byte scaled = (byte) priority;
@@ -525,7 +607,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
scaled = 9;
}
- message.setPriority(scaled);
+ lazyCreateHeader();
+ header.setPriority(UnsignedByte.valueOf(scaled));
}
}
@@ -636,20 +719,33 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
public void setReplyToGroupId(String replyToGroupId) {
- message.setReplyToGroupId(replyToGroupId);
+ if (replyToGroupId != null) {
+ lazyCreateProperties();
+ properties.setReplyToGroupId(replyToGroupId);
+ } else {
+ if (properties != null) {
+ properties.setReplyToGroupId(null);
+ }
+ }
}
public String getReplyToGroupId() {
- return message.getReplyToGroupId();
+ if (properties != null) {
+ return properties.getReplyToGroupId();
+ }
+
+ return null;
}
@Override
public String getUserId() {
String userId = null;
- byte[] userIdBytes = message.getUserId();
- if (userIdBytes != null) {
- userId = new String(userIdBytes, UTF8);
+ if (properties != null && properties.getUserId() != null) {
+ Binary userIdBytes = properties.getUserId();
+ if (userIdBytes.getLength() != 0) {
+ userId = new String(userIdBytes.getArray(), userIdBytes.getArrayOffset(), userIdBytes.getLength(), UTF8);
+ }
}
return userId;
@@ -663,44 +759,66 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
if (bytes == null) {
- if (message.getProperties() != null) {
- message.getProperties().setUserId(null);
+ if (properties != null) {
+ properties.setUserId(null);
}
} else {
- message.setUserId(bytes);
+ lazyCreateProperties();
+ properties.setUserId(new Binary(bytes));
}
}
@Override
public byte[] getUserIdBytes() {
- return message.getUserId();
+ if(properties == null || properties.getUserId() == null) {
+ return null;
+ } else {
+ final Binary userId = properties.getUserId();
+ byte[] id = new byte[userId.getLength()];
+ System.arraycopy(userId.getArray(), userId.getArrayOffset(), id, 0, userId.getLength());
+ return id;
+ }
}
@Override
public void setUserIdBytes(byte[] userId) {
if (userId == null || userId.length == 0) {
- if (message.getProperties() != null) {
- message.getProperties().setUserId(null);
+ if (properties != null) {
+ properties.setUserId(null);
}
} else {
- message.setUserId(userId);
+ lazyCreateProperties();
+ byte[] id = new byte[userId.length];
+ System.arraycopy(userId, 0, id, 0, userId.length);
+ properties.setUserId(new Binary(id));
}
}
@Override
public String getGroupId() {
- return message.getGroupId();
+ if (properties != null) {
+ return properties.getGroupId();
+ }
+
+ return null;
}
@Override
public void setGroupId(String groupId) {
- message.setGroupId(groupId);
+ if (groupId != null) {
+ lazyCreateProperties();
+ properties.setGroupId(groupId);
+ } else {
+ if (properties != null) {
+ properties.setGroupId(null);
+ }
+ }
}
@Override
public int getGroupSequence() {
- if (message.getProperties() != null) {
- UnsignedInteger groupSeqUint = message.getProperties().getGroupSequence();
+ if (properties != null) {
+ UnsignedInteger groupSeqUint = properties.getGroupSequence();
if (groupSeqUint != null) {
// This wraps it into the negative int range if uint is over 2^31-1
return groupSeqUint.intValue();
@@ -713,32 +831,26 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void setGroupSequence(int groupSequence) {
// This wraps it into the upper uint range if a negative was provided
- if (groupSequence == 0) {
- if (message.getProperties() != null) {
- message.getProperties().setGroupSequence(null);
- }
+ if (groupSequence != 0) {
+ lazyCreateProperties();
+ properties.setGroupSequence(UnsignedInteger.valueOf(groupSequence));
} else {
- message.setGroupSequence(groupSequence);
+ if (properties != null) {
+ properties.setGroupSequence(null);
+ }
}
}
@Override
public boolean hasBody() {
- return message.getBody() == null;
- }
-
- /**
- * @return the true AMQP Message instance wrapped by this Facade.
- */
- public Message getAmqpMessage() {
- return this.message;
+ return body == null;
}
/**
* The AmqpConnection instance that is associated with this Message.
* @return the connection
*/
- public AmqpConnection getConnection() {
+ AmqpConnection getConnection() {
return connection;
}
@@ -813,7 +925,6 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
*/
void clearMessageAnnotations() {
messageAnnotationsMap = null;
- message.setMessageAnnotations(null);
}
/**
@@ -821,33 +932,151 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
*/
void clearAllApplicationProperties() {
applicationPropertiesMap = null;
- message.setApplicationProperties(null);
}
String getToAddress() {
- return message.getAddress();
+ if (properties != null) {
+ return properties.getTo();
+ }
+
+ return null;
}
void setToAddress(String address) {
- message.setAddress(address);
+ if (address != null) {
+ lazyCreateProperties();
+ properties.setTo(address);
+ } else {
+ if (properties != null) {
+ properties.setTo(null);
+ }
+ }
}
String getReplyToAddress() {
- return message.getReplyTo();
+ if (properties != null) {
+ return properties.getReplyTo();
+ }
+
+ return null;
}
void setReplyToAddress(String address) {
- this.message.setReplyTo(address);
+ if (address != null) {
+ lazyCreateProperties();
+ properties.setReplyTo(address);
+ } else {
+ if (properties != null) {
+ properties.setReplyTo(null);
+ }
+ }
}
JmsDestination getConsumerDestination() {
return this.consumerDestination;
}
+ public JmsMessage asJmsMessage() {
+ return new JmsMessage(this);
+ }
+
+ @Override
+ public ByteBuf encodeMessage() {
+ return AmqpCodec.encodeMessage(this);
+ }
+
+ //----- Access to AMQP Message Values ------------------------------------//
+
+ Header getHeader() {
+ return header;
+ }
+
+ void setHeader(Header header) {
+ this.header = header;
+ }
+
+ Properties getProperties() {
+ return properties;
+ }
+
+ void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ Section getBody() {
+ return body;
+ }
+
+ void setBody(Section body) {
+ this.body = body;
+ }
+
+ MessageAnnotations getMessageAnnotations() {
+ MessageAnnotations result = null;
+ if (messageAnnotationsMap != null && !messageAnnotationsMap.isEmpty()) {
+ result = new MessageAnnotations(messageAnnotationsMap);
+ }
+
+ return result;
+ }
+
+ void setMessageAnnotations(MessageAnnotations messageAnnotations) {
+ if (messageAnnotations != null) {
+ this.messageAnnotationsMap = messageAnnotations.getValue();
+ }
+ }
+
+ DeliveryAnnotations getDeliveryAnnotations() {
+ DeliveryAnnotations result = null;
+ if (deliveryAnnotationsMap != null && !deliveryAnnotationsMap.isEmpty()) {
+ result = new DeliveryAnnotations(deliveryAnnotationsMap);
+ }
+
+ return result;
+ }
+
+ void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations) {
+ if (deliveryAnnotations != null) {
+ this.deliveryAnnotationsMap = deliveryAnnotations.getValue();
+ }
+ }
+
+ ApplicationProperties getApplicationProperties() {
+ ApplicationProperties result = null;
+ if (applicationPropertiesMap != null && !applicationPropertiesMap.isEmpty()) {
+ result = new ApplicationProperties(applicationPropertiesMap);
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ void setApplicationProperties(ApplicationProperties applicationProperties) {
+ if (applicationProperties != null) {
+ this.applicationPropertiesMap = applicationProperties.getValue();
+ }
+ }
+
+ Footer getFooter() {
+ Footer result = null;
+ if (footerMap != null && !footerMap.isEmpty()) { // only build a Footer section when entries exist
+ result = new Footer(footerMap);
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ void setFooter(Footer footer) {
+ if (footer != null) {
+ this.footerMap = footer.getValue();
+ }
+ }
+
+ //----- Internal Message Utility Methods ---------------------------------//
+
private Long getAbsoluteExpiryTime() {
Long result = null;
- if (message.getProperties() != null) {
- Date date = message.getProperties().getAbsoluteExpiryTime();
+ if (properties != null) {
+ Date date = properties.getAbsoluteExpiryTime();
if (date != null) {
result = date.getTime();
}
@@ -858,8 +1087,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
private Long getTtl() {
Long result = null;
- if (message.getHeader() != null) {
- UnsignedInteger ttl = message.getHeader().getTtl();
+ if (header != null) {
+ UnsignedInteger ttl = header.getTtl();
if (ttl != null) {
result = ttl.longValue();
}
@@ -869,26 +1098,49 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
private void setAbsoluteExpiryTime(Long expiration) {
- if (expiration == null) {
- if (message.getProperties() != null) {
- message.getProperties().setAbsoluteExpiryTime(null);
+ if (expiration == null || expiration == 0l) {
+ if (properties != null) {
+ properties.setAbsoluteExpiryTime(null);
}
} else {
- message.setExpiryTime(expiration);
+ lazyCreateProperties();
+ properties.setAbsoluteExpiryTime(new Date(expiration));
+ }
+ }
+
+ private void lazyCreateProperties() {
+ if (properties == null) {
+ properties = new Properties();
+ }
+ }
+
+ private void lazyCreateHeader() {
+ if (header == null) {
+ header = new Header();
}
}
private void lazyCreateMessageAnnotations() {
if (messageAnnotationsMap == null) {
- messageAnnotationsMap = new HashMap<Symbol,Object>();
- message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+ messageAnnotationsMap = new HashMap<Symbol, Object>();
+ }
+ }
+
+ private void lazyCreateDeliveryAnnotations() {
+ if (deliveryAnnotationsMap == null) {
+ deliveryAnnotationsMap = new HashMap<Symbol, Object>();
}
}
private void lazyCreateApplicationProperties() {
if (applicationPropertiesMap == null) {
applicationPropertiesMap = new HashMap<String, Object>();
- message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
+ }
+ }
+
+ private void lazyCreateFooter() {
+ if (footerMap == null) {
+ footerMap = new HashMap<Symbol, Object>();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
index 5b78556..25e1489 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
@@ -29,8 +29,6 @@ import org.apache.qpid.jms.message.JmsMessageFactory;
import org.apache.qpid.jms.message.JmsObjectMessage;
import org.apache.qpid.jms.message.JmsStreamMessage;
import org.apache.qpid.jms.message.JmsTextMessage;
-import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
-import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
/**
@@ -53,7 +51,9 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
@Override
public JmsMessage createMessage() throws JMSException {
- return new JmsMessage(new AmqpJmsMessageFacade(connection));
+ AmqpJmsMessageFacade facade = new AmqpJmsMessageFacade();
+ facade.initialize(connection);
+ return facade.asJmsMessage();
}
@Override
@@ -63,29 +63,35 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
@Override
public JmsTextMessage createTextMessage(String payload) throws JMSException {
-
- JmsTextMessageFacade facade = new AmqpJmsTextMessageFacade(connection);
+ AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade();
+ facade.initialize(connection);
if (payload != null) {
facade.setText(payload);
}
- return new JmsTextMessage(facade);
+ return facade.asJmsMessage();
}
@Override
public JmsBytesMessage createBytesMessage() throws JMSException {
- return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection));
+ AmqpJmsBytesMessageFacade facade = new AmqpJmsBytesMessageFacade();
+ facade.initialize(connection);
+ return facade.asJmsMessage();
}
@Override
public JmsMapMessage createMapMessage() throws JMSException {
- return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection));
+ AmqpJmsMapMessageFacade facade = new AmqpJmsMapMessageFacade();
+ facade.initialize(connection);
+ return facade.asJmsMessage();
}
@Override
public JmsStreamMessage createStreamMessage() throws JMSException {
- return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection));
+ AmqpJmsStreamMessageFacade facade = new AmqpJmsStreamMessageFacade();
+ facade.initialize(connection);
+ return facade.asJmsMessage();
}
@Override
@@ -95,9 +101,9 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
@Override
public JmsObjectMessage createObjectMessage(Serializable payload) throws JMSException {
- JmsObjectMessageFacade facade = new AmqpJmsObjectMessageFacade(
- connection, connection.isObjectMessageUsesAmqpTypes());
+ AmqpJmsObjectMessageFacade facade = new AmqpJmsObjectMessageFacade();
+ facade.initialize(connection);
if (payload != null) {
try {
facade.setObject(payload);
@@ -106,6 +112,6 @@ public class AmqpJmsMessageFactory implements JmsMessageFactory {
}
}
- return new JmsObjectMessage(facade);
+ return facade.asJmsMessage();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index f4b541f..e0f026f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -16,7 +16,6 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE;
import java.io.IOException;
@@ -25,13 +24,11 @@ import java.io.Serializable;
import javax.jms.JMSException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsObjectMessage;
import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
-import org.apache.qpid.proton.message.Message;
-
-import io.netty.buffer.ByteBuf;
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
@@ -40,46 +37,20 @@ import io.netty.buffer.ByteBuf;
public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements JmsObjectMessageFacade {
private AmqpObjectTypeDelegate delegate;
+ private JmsDeserializationPolicy deserializationPolicy;
- private final JmsDeserializationPolicy deserializationPolicy;
-
- /**
- * Creates a new facade instance for outgoing message
- *
- * @param connection
- * the AmqpConnection that under which this facade was created.
- * @param isAmqpTypeEncoded
- * controls the type used to encode the body.
- */
- public AmqpJmsObjectMessageFacade(AmqpConnection connection, boolean isAmqpTypeEncoded) {
- this(connection, isAmqpTypeEncoded, null);
- }
-
- private AmqpJmsObjectMessageFacade(AmqpConnection connection, boolean isAmqpTypeEncoded, JmsDeserializationPolicy deserializationPolicy) {
- super(connection);
- this.deserializationPolicy = deserializationPolicy;
-
- setMessageAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE);
- initDelegate(isAmqpTypeEncoded, null);
+ @Override
+ public void initialize(AmqpConnection connection) {
+ super.initialize(connection);
+ initDelegate(connection.isObjectMessageUsesAmqpTypes());
}
- /**
- * Creates a new Facade around an incoming AMQP Message for dispatch to the
- * JMS Consumer instance.
- *
- * @param consumer
- * the consumer that received this message.
- * @param message
- * the incoming Message instance that is being wrapped.
- * @param messageBytes
- * a copy of the raw bytes of the incoming message.
- */
- public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message, ByteBuf messageBytes) {
- super(consumer, message);
+ @Override
+ public void initialize(AmqpConsumer consumer) {
+ super.initialize(consumer);
deserializationPolicy = consumer.getResourceInfo().getDeserializationPolicy();
-
- boolean javaSerialized = AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(message.getContentType());
- initDelegate(!javaSerialized, messageBytes);
+ boolean javaSerialized = AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(getContentType());
+ initDelegate(!javaSerialized);
}
/**
@@ -96,7 +67,9 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
@Override
public AmqpJmsObjectMessageFacade copy() throws JMSException {
- AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade(connection, isAmqpTypedEncoding(), deserializationPolicy);
+ AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade();
+ copy.deserializationPolicy = deserializationPolicy;
+ copy.initDelegate(isAmqpTypedEncoding());
copyInto(copy);
try {
delegate.copyInto(copy.delegate);
@@ -135,6 +108,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
delegate.onSend();
}
+ @Override
+ public JmsObjectMessage asJmsMessage() {
+ return new JmsObjectMessage(this);
+ }
+
void setUseAmqpTypedEncoding(boolean useAmqpTypedEncoding) throws JMSException {
if (useAmqpTypedEncoding != delegate.isAmqpTypeEncoded()) {
try {
@@ -142,9 +120,9 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
AmqpObjectTypeDelegate newDelegate = null;
if (useAmqpTypedEncoding) {
- newDelegate = new AmqpTypedObjectDelegate(this, null);
+ newDelegate = new AmqpTypedObjectDelegate(this);
} else {
- newDelegate = new AmqpSerializedObjectDelegate(this, null, deserializationPolicy);
+ newDelegate = new AmqpSerializedObjectDelegate(this, deserializationPolicy);
}
newDelegate.setObject(existingObject);
@@ -156,11 +134,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
}
}
- private void initDelegate(boolean useAmqpTypes, ByteBuf messageBytes) {
+ private void initDelegate(boolean useAmqpTypes) {
if (!useAmqpTypes) {
- delegate = new AmqpSerializedObjectDelegate(this, messageBytes, deserializationPolicy);
+ delegate = new AmqpSerializedObjectDelegate(this, deserializationPolicy);
} else {
- delegate = new AmqpTypedObjectDelegate(this, messageBytes);
+ delegate = new AmqpTypedObjectDelegate(this);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
index 64f1fbd..4843065 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -16,7 +16,6 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE;
import java.util.ArrayList;
@@ -25,14 +24,12 @@ import java.util.List;
import javax.jms.MessageEOFException;
+import org.apache.qpid.jms.message.JmsStreamMessage;
import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade;
-import org.apache.qpid.jms.provider.amqp.AmqpConnection;
-import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS StreamMessage
@@ -43,61 +40,11 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
private List<Object> list;
private int position = 0;
- /**
- * Create a new facade ready for sending.
- *
- * @param connection
- * the AmqpConnection that under which this facade was created.
- */
- public AmqpJmsStreamMessageFacade(AmqpConnection connection) {
- super(connection);
- list = initializeEmptyBodyList(true);
- setMessageAnnotation(JMS_MSG_TYPE, JMS_STREAM_MESSAGE);
- }
-
- /**
- * Creates a new Facade around an incoming AMQP Message for dispatch to the
- * JMS Consumer instance.
- *
- * @param consumer
- * the consumer that received this message.
- * @param message
- * the incoming Message instance that is being wrapped.
- */
- @SuppressWarnings("unchecked")
- public AmqpJmsStreamMessageFacade(AmqpConsumer consumer, Message message) {
- super(consumer, message);
-
- Section body = getAmqpMessage().getBody();
- if (body == null) {
- list = initializeEmptyBodyList(true);
- } else if (body instanceof AmqpValue) {
- Object value = ((AmqpValue) body).getValue();
-
- if (value == null) {
- list = initializeEmptyBodyList(false);
- } else if (value instanceof List) {
- list = (List<Object>) value;
- } else {
- throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
- }
- } else if (body instanceof AmqpSequence) {
- List<?> value = ((AmqpSequence) body).getValue();
-
- if (value == null) {
- list = initializeEmptyBodyList(true);
- } else {
- list = (List<Object>) value;
- }
- } else {
- throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
- }
- }
-
@Override
public AmqpJmsStreamMessageFacade copy() {
- AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade(connection);
+ AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade();
copyInto(copy);
+ copy.initializeEmptyBodyList(getBody() instanceof AmqpSequence);
copy.list.addAll(list);
return copy;
}
@@ -166,13 +113,53 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
return !list.isEmpty();
}
+ @Override
+ public JmsStreamMessage asJmsMessage() {
+ return new JmsStreamMessage(this);
+ }
+
+ @Override
+ protected void initializeEmptyBody() {
+ list = initializeEmptyBodyList(true);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ void setBody(Section body) {
+ if (body == null) {
+ list = initializeEmptyBodyList(true);
+ } else if (body instanceof AmqpValue) {
+ Object value = ((AmqpValue) body).getValue();
+
+ if (value == null) {
+ list = initializeEmptyBodyList(false);
+ } else if (value instanceof List) {
+ list = (List<Object>) value;
+ super.setBody(body);
+ } else {
+ throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+ }
+ } else if (body instanceof AmqpSequence) {
+ List<?> value = ((AmqpSequence) body).getValue();
+
+ if (value == null) {
+ list = initializeEmptyBodyList(true);
+ } else {
+ list = (List<Object>) value;
+ super.setBody(body);
+ }
+ } else {
+ throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+ }
+ }
+
private List<Object> initializeEmptyBodyList(boolean useSequenceBody) {
List<Object> emptyList = new ArrayList<Object>();
if (useSequenceBody) {
- message.setBody(new AmqpSequence(emptyList));
+ setBody(new AmqpSequence(emptyList));
} else {
- message.setBody(new AmqpValue(emptyList));
+ setBody(new AmqpValue(emptyList));
}
return emptyList;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
index 44ed9e6..5595362 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -16,7 +16,6 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
import java.nio.ByteBuffer;
@@ -28,14 +27,12 @@ import java.nio.charset.StandardCharsets;
import javax.jms.JMSException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsTextMessage;
import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
-import org.apache.qpid.jms.provider.amqp.AmqpConnection;
-import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS TextMessage
@@ -45,32 +42,11 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
private final Charset charset;
- /**
- * Create a new AMQP Message facade ready for sending.
- *
- * @param connection
- * the AmqpConnection that under which this facade was created.
- */
- public AmqpJmsTextMessageFacade(AmqpConnection connection) {
- super(connection);
- setMessageAnnotation(JMS_MSG_TYPE, JMS_TEXT_MESSAGE);
- setText(null);
- charset = StandardCharsets.UTF_8;
+ public AmqpJmsTextMessageFacade() {
+ this(StandardCharsets.UTF_8);
}
- /**
- * Creates a new Facade around an incoming AMQP Message for dispatch to the
- * JMS Consumer instance.
- *
- * @param consumer
- * the consumer that received this message.
- * @param message
- * the incoming Message instance that is being wrapped.
- * @param charset
- * the character set to use when decoding the text when the body is a Data section
- */
- public AmqpJmsTextMessageFacade(AmqpConsumer consumer, Message message, Charset charset) {
- super(consumer, message);
+ AmqpJmsTextMessageFacade(Charset charset) {
this.charset = charset;
}
@@ -84,7 +60,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
@Override
public AmqpJmsTextMessageFacade copy() throws JMSException {
- AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade(connection);
+ AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade();
copyInto(copy);
copy.setText(getText());
return copy;
@@ -92,7 +68,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
@Override
public String getText() throws JMSException {
- Section body = getAmqpMessage().getBody();
+ Section body = getBody();
if (body == null) {
return null;
@@ -126,13 +102,12 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
@Override
public void setText(String value) {
- AmqpValue body = new AmqpValue(value);
- getAmqpMessage().setBody(body);
+ setBody(new AmqpValue(value));
}
@Override
public void clearBody() {
- setText(null);
+ setBody(new AmqpValue(null));
}
@Override
@@ -144,7 +119,17 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
}
}
+ @Override
+ public JmsTextMessage asJmsMessage() {
+ return new JmsTextMessage(this);
+ }
+
Charset getCharset() {
return charset;
}
+
+ @Override
+ protected void initializeEmptyBody() {
+ setBody(new AmqpValue(null));
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 40987e1..702870b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.provider.amqp.message;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import io.netty.buffer.ByteBuf;
@@ -117,14 +118,14 @@ public final class AmqpMessageSupport {
*
* @param key
* the String key to use to lookup an annotation.
- * @param message
- * the AMQP message object that is being examined.
+ * @param messageAnnotations
+ * the AMQP message annotations object that is being examined.
*
* @return the given annotation value or null if not present in the message.
*/
- public static Object getMessageAnnotation(String key, Message message) {
- if (message != null && message.getMessageAnnotations() != null) {
- Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+ public static Object getMessageAnnotation(String key, MessageAnnotations messageAnnotations) {
+ if (messageAnnotations != null && messageAnnotations.getValue() != null) {
+ Map<Symbol, Object> annotations = messageAnnotations.getValue();
return annotations.get(AmqpMessageSupport.getSymbol(key));
}
@@ -138,16 +139,18 @@ public final class AmqpMessageSupport {
*
* @param contentType
* content type string to compare against, or null if none
- * @param message
- * the AMQP message object that is being examined.
+ * @param messageContentType
+ * the content type value read from an AMQP message object.
*
* @return true if content type matches
*/
- public static boolean isContentType(String contentType, Message message) {
+ public static boolean isContentType(String contentType, Symbol messageContentType) {
if (contentType == null) {
- return message.getContentType() == null;
+ return messageContentType == null;
+ } else if (messageContentType == null) {
+ return false;
} else {
- return contentType.equals(message.getContentType());
+ return contentType.equals(messageContentType.toString());
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
index ec73ba9..b8cdbb4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -17,14 +17,12 @@
package org.apache.qpid.jms.provider.amqp.message;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream;
@@ -32,9 +30,6 @@ import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream.TrustedClassF
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
-
-import io.netty.buffer.ByteBuf;
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
@@ -56,10 +51,7 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
}
private final AmqpJmsMessageFacade parent;
- private final Message message;
- private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>();
private final JmsDeserializationPolicy deserializationPolicy;
- private ByteBuf messageBytes;
private boolean localContent;
/**
@@ -67,23 +59,14 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
*
* @param parent
* the AMQP message facade instance where the object is to be stored / read.
- * @param messageBytes
- * the raw bytes that comprise the message when it was received.
* @param deserializationPolicy
* the JmsDeserializationPolicy that is used to validate the security of message
* content, may be null (e.g on new outgoing messages).
*/
- public AmqpSerializedObjectDelegate(AmqpJmsMessageFacade parent, ByteBuf messageBytes, JmsDeserializationPolicy deserializationPolicy) {
+ public AmqpSerializedObjectDelegate(AmqpJmsMessageFacade parent, JmsDeserializationPolicy deserializationPolicy) {
this.parent = parent;
- this.message = parent.getAmqpMessage();
- this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- this.messageBytes = messageBytes;
+ this.parent.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
this.deserializationPolicy = deserializationPolicy;
-
- // Cache the body so the first access can grab it without extra work.
- if (messageBytes != null) {
- cachedReceivedBody.set(message.getBody());
- }
}
private static byte[] getSerializedBytes(Serializable value) throws IOException {
@@ -100,31 +83,24 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
@Override
public Serializable getObject() throws IOException, ClassNotFoundException {
- Binary bin = null;
-
- Section body = cachedReceivedBody.getAndSet(null);
- if (body == null) {
- if (messageBytes != null) {
- body = decodeMessage(messageBytes).getBody();
- } else {
- body = message.getBody();
- }
- }
+ Binary binary = null;
+
+ Section body = parent.getBody();
if (body == null || body == NULL_OBJECT_BODY) {
return null;
} else if (body instanceof Data) {
- bin = ((Data) body).getValue();
+ binary = ((Data) body).getValue();
} else {
throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName());
}
- if (bin == null) {
+ if (binary == null) {
return null;
} else {
Serializable serialized = null;
- try (ByteArrayInputStream bais = new ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(binary.getArray(), binary.getArrayOffset(), binary.getLength());
ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(bais, this)) {
serialized = (Serializable) objIn.readObject();
@@ -136,24 +112,21 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
@Override
public void setObject(Serializable value) throws IOException {
- cachedReceivedBody.set(null);
-
if (value == null) {
- message.setBody(NULL_OBJECT_BODY);
+ parent.setBody(NULL_OBJECT_BODY);
} else {
byte[] bytes = getSerializedBytes(value);
- message.setBody(new Data(new Binary(bytes)));
+ parent.setBody(new Data(new Binary(bytes)));
}
- messageBytes = null;
localContent = true;
}
@Override
public void onSend() {
- message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- if (message.getBody() == null) {
- message.setBody(NULL_OBJECT_BODY);
+ parent.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ if (parent.getBody() == null) {
+ parent.setBody(NULL_OBJECT_BODY);
}
}
@@ -164,20 +137,11 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
} else {
AmqpSerializedObjectDelegate target = (AmqpSerializedObjectDelegate) copy;
- // Swap our cached value to the copy, we will just decode it if we need it.
- target.cachedReceivedBody.set(cachedReceivedBody.getAndSet(null));
-
- // If we have the original bytes just copy those and let the next get
- // decode them into the payload, otherwise we need to do a deep copy.
- if (messageBytes != null) {
- target.messageBytes = messageBytes.copy();
- }
-
target.localContent = localContent;
// Copy the already encoded message body if it exists, subsequent gets
// will deserialize the data so no mutations can occur.
- target.message.setBody(message.getBody());
+ target.parent.setBody(parent.getBody());
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
index 1296eaa..88b7b9d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
@@ -16,20 +16,15 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
-import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
import io.netty.buffer.ByteBuf;
@@ -41,39 +36,31 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
static final AmqpValue NULL_OBJECT_BODY = new AmqpValue(null);
- private final Message message;
- private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>();
- private ByteBuf messageBytes;
+ private ByteBuf encodedBody;
+ private final AmqpJmsMessageFacade parent;
/**
* Create a new delegate that uses Java serialization to store the message content.
*
* @param parent
* the AMQP message facade instance where the object is to be stored / read.
- * @param messageBytes
- * the raw bytes that comprise the AMQP message that was received.
*/
- public AmqpTypedObjectDelegate(AmqpJmsMessageFacade parent, ByteBuf messageBytes) {
- this.message = parent.getAmqpMessage();
- this.message.setContentType(null);
- this.messageBytes = messageBytes;
-
- // Cache the body so the first access can grab it without extra work.
- if (messageBytes != null) {
- cachedReceivedBody.set(message.getBody());
+ public AmqpTypedObjectDelegate(AmqpJmsMessageFacade parent) {
+ this.parent = parent;
+ this.parent.setContentType(null);
+
+ // Create a duplicate of the message body for decode on read attempts
+ if (parent.getBody() != null) {
+ encodedBody = AmqpCodec.encode(parent.getBody());
}
}
@Override
public Serializable getObject() throws IOException, ClassNotFoundException {
- Section body = cachedReceivedBody.getAndSet(null);
+ Section body = null;
- if (body == null) {
- if (messageBytes != null) {
- body = decodeMessage(messageBytes).getBody();
- } else {
- body = message.getBody();
- }
+ if (encodedBody != null) {
+ body = AmqpCodec.decode(encodedBody);
}
if (body == null) {
@@ -98,20 +85,15 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
@Override
public void setObject(Serializable value) throws IOException {
- cachedReceivedBody.set(null);
-
if (value == null) {
- message.setBody(NULL_OBJECT_BODY);
- messageBytes = null;
+ parent.setBody(NULL_OBJECT_BODY);
+ encodedBody = null;
} else if (isSupportedAmqpValueObjectType(value)) {
- Message transfer = Message.Factory.create();
-
// Exchange the incoming body value for one that is created from encoding
// and decoding the value. Save the bytes for subsequent getObject and
// copyInto calls to use.
- transfer.setBody(new AmqpValue(value));
- messageBytes = encodeMessage(transfer);
- transfer = decodeMessage(messageBytes);
+ encodedBody = AmqpCodec.encode(new AmqpValue(value));
+ Section decodedBody = AmqpCodec.decode(encodedBody);
// This step requires a heavy-weight operation of both encoding and decoding the
// incoming body value in order to create a copy such that changes to the original
@@ -120,7 +102,7 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
// proton such that we can encode the body and use those bytes directly on the
// message as it is being sent.
- message.setBody(transfer.getBody());
+ parent.setBody(decodedBody);
} else {
// TODO: Data and AmqpSequence?
throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName());
@@ -129,9 +111,9 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
@Override
public void onSend() {
- message.setContentType(null);
- if (message.getBody() == null) {
- message.setBody(NULL_OBJECT_BODY);
+ parent.setContentType(null);
+ if (parent.getBody() == null) {
+ parent.setBody(NULL_OBJECT_BODY);
}
}
@@ -142,23 +124,17 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
} else {
AmqpTypedObjectDelegate target = (AmqpTypedObjectDelegate) copy;
- // Swap our cached value (if any) to the copy, we will just decode it if we need it later.
- target.cachedReceivedBody.set(cachedReceivedBody.getAndSet(null));
-
- if (messageBytes != null) {
- // If we have the original bytes just copy those and let the next get
- // decode them into the payload (or for the copy, use the cached
- // body if it was swapped above).
- target.messageBytes = messageBytes.copy();
+ // If there ever was a body then we will have a snapshot of it and we can
+ // be sure that our state is correct.
+ if (encodedBody != null) {
+ // If we have any body bytes just duplicate those and let the next get
+ // decode them into the returned object payload value.
+ target.encodedBody = encodedBody.duplicate();
// Internal message body copy to satisfy sends. This is safe since the body was set
// from a copy (decoded from the bytes) to ensure it is a snapshot. Also safe for
// gets as they will use the message bytes (or cached body if set) to return the object.
- target.message.setBody(message.getBody());
- } else {
- // We have to deep get/set copy here, otherwise a get might return
- // the object value carried by the original version.
- copy.setObject(getObject());
+ target.parent.setBody(parent.getBody());
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
new file mode 100644
index 0000000..7088b5f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Writable Buffer implementation based on a Netty ByteBuf
+ */
+public class AmqpWritableBuffer implements WritableBuffer {
+
+ public ByteBuf nettyBuffer;
+
+ public AmqpWritableBuffer() {
+ nettyBuffer = Unpooled.buffer(1024);
+ }
+
+ public AmqpWritableBuffer(ByteBuf buffer) {
+ nettyBuffer = buffer;
+ }
+
+ public ByteBuf getBuffer() {
+ return nettyBuffer;
+ }
+
+ @Override
+ public void put(byte b) {
+ nettyBuffer.writeByte(b);
+ }
+
+ @Override
+ public void putFloat(float f) {
+ nettyBuffer.writeFloat(f);
+ }
+
+ @Override
+ public void putDouble(double d) {
+ nettyBuffer.writeDouble(d);
+ }
+
+ @Override
+ public void put(byte[] src, int offset, int length) {
+ nettyBuffer.writeBytes(src, offset, length);
+ }
+
+ @Override
+ public void put(ByteBuffer payload) {
+ nettyBuffer.writeBytes(payload);
+ }
+
+ public void put(ByteBuf payload) {
+ nettyBuffer.writeBytes(payload);
+ }
+
+ @Override
+ public void putShort(short s) {
+ nettyBuffer.writeShort(s);
+ }
+
+ @Override
+ public void putInt(int i) {
+ nettyBuffer.writeInt(i);
+ }
+
+ @Override
+ public void putLong(long l) {
+ nettyBuffer.writeLong(l);
+ }
+
+ @Override
+ public boolean hasRemaining() {
+ return nettyBuffer.writerIndex() < nettyBuffer.capacity();
+ }
+
+ @Override
+ public int remaining() {
+ return nettyBuffer.capacity() - nettyBuffer.writerIndex();
+ }
+
+ @Override
+ public int position() {
+ return nettyBuffer.writerIndex();
+ }
+
+ @Override
+ public void position(int position) {
+ nettyBuffer.writerIndex(position);
+ }
+
+ @Override
+ public int limit() {
+ return nettyBuffer.capacity();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index f9188c5..3254c87 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -29,14 +30,17 @@ import java.io.IOException;
import java.util.Arrays;
import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageNotWriteableException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -383,4 +387,105 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ /**
+  * Verifies that a BytesMessage passed to an asynchronous send is read-only while the send
+  * is in flight: every header setter, the property setter and the body write attempted
+  * after the send must throw {@link MessageNotWriteableException}.
+  */
+ @Test(timeout = 20000)
+ public void testAsyncSendMarksBytesMessageReadOnly() throws Exception {
+     try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+         JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+         connection.setSendTimeout(15000);
+
+         testPeer.expectBegin();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         String queueName = "myQueue";
+         Queue queue = session.createQueue(queueName);
+
+         BytesMessage message = session.createBytesMessage();
+         TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+         // Expect the producer to attach and be granted credit. It should then send a
+         // transfer to which the peer deliberately does not respond, so the message stays
+         // in flight while we check that it is read-only.
+         testPeer.expectSenderAttach();
+         testPeer.expectTransferButDoNotRespond(messageMatcher);
+         testPeer.expectClose();
+
+         MessageProducer producer = session.createProducer(queue);
+         TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+         try {
+             producer.send(message, listener);
+         } catch (Throwable error) {
+             // Same failure type and message as fail(), but keeps the root cause visible.
+             throw new AssertionError("Send should not fail for async.", error);
+         }
+
+         // Each mutator below must be rejected; the empty catch blocks are the expected path.
+         try {
+             message.setJMSCorrelationID("test");
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSCorrelationIDAsBytes(new byte[]{});
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSDeliveryMode(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSDestination(queue);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSExpiration(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSMessageID(queueName);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSPriority(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSRedelivered(false);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSReplyTo(queue);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSTimestamp(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSType(queueName);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setStringProperty("test", "test");
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.writeBoolean(true);
+             fail("Message should not be writable after a send.");
+         } catch (MessageNotWriteableException expected) {}
+
+         connection.close();
+
+         testPeer.waitForAllHandlersToComplete(1000);
+     }
+ }
+
+ /**
+  * Completion listener that ignores both callbacks; it exists only so the test can use the
+  * send variant that takes a {@link CompletionListener}. Declared static because it needs
+  * no state from the enclosing test instance.
+  */
+ private static class TestJmsCompletionListener implements CompletionListener {
+
+     @Override
+     public void onCompletion(Message message) {
+         // Intentionally empty: the test does not inspect successful completion.
+     }
+
+     @Override
+     public void onException(Message message, Exception exception) {
+         // Intentionally empty: the test does not inspect send failures reported here.
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/657747b7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 10bcffd..02ca520 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -29,15 +29,18 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
+import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -263,4 +266,159 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ /**
+  * Verifies that a MapMessage can still be modified after a send has returned, and that
+  * sending it again transfers the updated body.
+  */
+ @Test(timeout = 20000)
+ public void testSendMapMessageIsWritable() throws Exception {
+     try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+         Connection connection = testFixture.establishConnecton(testPeer);
+         testPeer.expectBegin();
+         testPeer.expectSenderAttach();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue("myQueue");
+         MessageProducer producer = session.createProducer(queue);
+
+         String intKey = "myInt";
+         int intValue = Integer.MAX_VALUE;
+         String stringKey = "myString";
+         String stringValue = stringKey;
+
+         // Build the MapMessage with its initial single entry
+         MapMessage mapMessage = session.createMapMessage();
+
+         mapMessage.setString(stringKey, stringValue);
+
+         // Body the test peer should see on the first transfer
+         Map<String, Object> expectedBody = new LinkedHashMap<>();
+         expectedBody.put(stringKey, stringValue);
+
+         MessageHeaderSectionMatcher headerMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+         MessageAnnotationsSectionMatcher annotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+         annotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_MSG_TYPE), equalTo(AmqpMessageSupport.JMS_MAP_MESSAGE));
+         MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+         TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
+         payloadMatcher.setHeadersMatcher(headerMatcher);
+         payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
+         payloadMatcher.setPropertiesMatcher(propsMatcher);
+         payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(expectedBody));
+
+         testPeer.expectTransfer(payloadMatcher);
+
+         // First send
+         producer.send(mapMessage);
+
+         // Add an entry to both the message and the expected body, then send again
+         mapMessage.setInt(intKey, intValue);
+         expectedBody.put(intKey, intValue);
+         testPeer.expectTransfer(payloadMatcher);
+         testPeer.expectClose();
+
+         producer.send(mapMessage);
+
+         connection.close();
+
+         testPeer.waitForAllHandlersToComplete(3000);
+     }
+ }
+
+ /**
+  * Verifies that a MapMessage passed to an asynchronous send is read-only while the send
+  * is in flight: every header setter, the property setter and the body write attempted
+  * after the send must throw {@link MessageNotWriteableException}.
+  */
+ @Test(timeout = 20000)
+ public void testAsyncSendMarksMapMessageReadOnly() throws Exception {
+     try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+         JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+         connection.setSendTimeout(15000);
+
+         testPeer.expectBegin();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         String queueName = "myQueue";
+         Queue queue = session.createQueue(queueName);
+
+         MapMessage message = session.createMapMessage();
+         TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+         // Expect the producer to attach and be granted credit. It should then send a
+         // transfer to which the peer deliberately does not respond, so the message stays
+         // in flight while we check that it is read-only.
+         testPeer.expectSenderAttach();
+         testPeer.expectTransferButDoNotRespond(messageMatcher);
+         testPeer.expectClose();
+
+         MessageProducer producer = session.createProducer(queue);
+         TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+         try {
+             producer.send(message, listener);
+         } catch (Throwable error) {
+             // Same failure type and message as fail(), but keeps the root cause visible.
+             throw new AssertionError("Send should not fail for async.", error);
+         }
+
+         // Each mutator below must be rejected; the empty catch blocks are the expected path.
+         try {
+             message.setJMSCorrelationID("test");
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSCorrelationIDAsBytes(new byte[]{});
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSDeliveryMode(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSDestination(queue);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSExpiration(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSMessageID(queueName);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSPriority(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSRedelivered(false);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSReplyTo(queue);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSTimestamp(0);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setJMSType(queueName);
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setStringProperty("test", "test");
+             fail("Should not be able to set properties on inflight message");
+         } catch (MessageNotWriteableException expected) {}
+         try {
+             message.setString("test", "test");
+             fail("Message should not be writable after a send.");
+         } catch (MessageNotWriteableException expected) {}
+
+         connection.close();
+
+         testPeer.waitForAllHandlersToComplete(1000);
+     }
+ }
+
+ /**
+  * Completion listener that ignores both callbacks; it exists only so the test can use the
+  * send variant that takes a {@link CompletionListener}. Declared static because it needs
+  * no state from the enclosing test instance.
+  */
+ private static class TestJmsCompletionListener implements CompletionListener {
+
+     @Override
+     public void onCompletion(Message message) {
+         // Intentionally empty: the test does not inspect successful completion.
+     }
+
+     @Override
+     public void onException(Message message, Exception exception) {
+         // Intentionally empty: the test does not inspect send failures reported here.
+     }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org