You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/07/28 14:42:49 UTC
qpid-broker-j git commit: QPID-7434: Add missing properties
conversion for message conversion layer from 1.0 to 0-8 and add unit tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master a9bd93b47 -> 5489d616f
QPID-7434: Add missing properties conversion for message conversion layer from 1.0 to 0-8 and add unit tests
* also some fixes for 0-10 to 0-8 conversion
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/5489d616
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/5489d616
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/5489d616
Branch: refs/heads/master
Commit: 5489d616fc41d2aced7ad9db4338595fbb974ac3
Parents: a9bd93b
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Jul 28 15:40:15 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Jul 28 15:40:15 2017 +0100
----------------------------------------------------------------------
.../converter/MessageConversionException.java | 5 +
.../codec/DeliveryAnnotationsWriter.java | 2 +-
.../MessageConverter_0_10_to_0_8.java | 29 +-
.../PropertyConverter_0_10_to_0_8Test.java | 82 +-
.../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 346 +++++++-
.../PropertyConverter_1_0_to_0_8Test.java | 781 +++++++++++++++++++
6 files changed, 1210 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5489d616/broker-core/src/main/java/org/apache/qpid/server/protocol/converter/MessageConversionException.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/converter/MessageConversionException.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/converter/MessageConversionException.java
index 55eed43..4ca8415 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/converter/MessageConversionException.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/converter/MessageConversionException.java
@@ -22,6 +22,11 @@ package org.apache.qpid.server.protocol.converter;
public class MessageConversionException extends RuntimeException
{
+ public MessageConversionException(final String msg)
+ {
+ super(msg);
+ }
+
public MessageConversionException(final String msg, final Throwable cause)
{
super(msg, cause);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5489d616/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsWriter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
index 2a71c08..deefa35 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/DeliveryAnnotationsWriter.java
@@ -38,7 +38,7 @@ public class DeliveryAnnotationsWriter extends AbstractDescribedTypeWriter<Deliv
public DeliveryAnnotationsWriter(final Registry registry,
final DeliveryAnnotations object)
{
- super(DESCRIPTOR_WRITER, registry.getValueWriter(object));
+ super(DESCRIPTOR_WRITER, registry.getValueWriter(object.getValue()));
}
private static Factory<DeliveryAnnotations> FACTORY = new Factory<DeliveryAnnotations>()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5489d616/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 4ba599b..1d298d8 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -113,7 +113,14 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
if(messageProps.hasCorrelationId())
{
- props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+ try
+ {
+ props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException("Could not convert message from 0-10 to 0-8 because conversion of 'correlationId' failed.", e);
+ }
}
if(messageProps.hasContentEncoding())
{
@@ -143,12 +150,28 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
? ExchangeDefaults.DIRECT_EXCHANGE_CLASS
: exchange.getType();
String routingKeyOption = routingKey == null ? "" : "?routingkey='" + routingKey + "'";
- props.setReplyTo(String.format("%s://%s//%s", exchangeClass, exchangeName, routingKeyOption));
+ final String replyToBindingUrl =
+ String.format("%s://%s//%s", exchangeClass, exchangeName, routingKeyOption);
+ try
+ {
+ props.setReplyTo(replyToBindingUrl);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException("Could not convert message from 0-10 to 0-8 because conversion of 'reply-to' failed.", e);
+ }
}
}
if(messageProps.hasUserId())
{
- props.setUserId(new AMQShortString(messageProps.getUserId()));
+ try
+ {
+ props.setUserId(new AMQShortString(messageProps.getUserId()));
+ }
+ catch (IllegalArgumentException e)
+ {
+ // ignore
+ }
}
if(messageProps.hasApplicationHeaders())
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5489d616/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
index a432737..1c2f2a1 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
@@ -24,6 +24,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -189,6 +190,37 @@ public class PropertyConverter_0_10_to_0_8Test extends QpidTestCase
assertEquals("Unexpected correlationId", correlationId, properties.getCorrelationId().toString());
}
+ public void testCorrelationIdConversionWhenLengthExceeds255()
+ {
+ final String correlationId = generateLongString();
+ final MessageProperties messageProperties = new MessageProperties();
+ messageProperties.setCorrelationId(correlationId.getBytes());
+ MessageTransferMessage message = createTestMessage(messageProperties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("expected exception not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testCorrelationIdConversionWhenNotString()
+ {
+ final byte[] correlationId = new byte[] {(byte) 0xc3, 0x28};
+ final MessageProperties messageProperties = new MessageProperties();
+ messageProperties.setCorrelationId(correlationId);
+ MessageTransferMessage message = createTestMessage(messageProperties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties properties = convertedMessage.getContentHeaderBody().getProperties();
+ assertTrue("Unexpected correlationId", Arrays.equals(correlationId, properties.getCorrelationId().getBytes()));
+ }
+
public void testReplyToConversionWhenExchangeAndRoutingKeySpecified()
{
final String exchangeName = "amq.direct";
@@ -259,6 +291,23 @@ public class PropertyConverter_0_10_to_0_8Test extends QpidTestCase
assertNull("Unexpected reply-to", properties.getReplyTo());
}
+ public void testReplyToConversionWhenResultExceeds255()
+ {
+ final MessageProperties messageProperties = new MessageProperties();
+ messageProperties.setReplyTo(new ReplyTo(generateLongString(255), generateLongString(255)));
+ MessageTransferMessage message = createTestMessage(messageProperties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("expected exception not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
public void testExpirationConversion()
{
long timestamp = System.currentTimeMillis();
@@ -374,6 +423,32 @@ public class PropertyConverter_0_10_to_0_8Test extends QpidTestCase
assertEquals("Unexpected user-id", userId, properties.getUserId().toString());
}
+ public void testUserIdConversionExceeds255()
+ {
+ final String userId = generateLongString();
+ final MessageProperties messageProperties = new MessageProperties();
+ messageProperties.setUserId(userId.getBytes());
+ MessageTransferMessage message = createTestMessage(messageProperties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+ BasicContentHeaderProperties properties = convertedMessage.getContentHeaderBody().getProperties();
+
+ assertNull("Unexpected user-id", properties.getUserId());
+ }
+
+ public void testUserIdConversionWhenNotUtf8()
+ {
+ final byte[] userId = new byte[] {(byte) 0xc3, 0x28};
+ final MessageProperties messageProperties = new MessageProperties();
+ messageProperties.setUserId(userId);
+ MessageTransferMessage message = createTestMessage(messageProperties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+ BasicContentHeaderProperties properties = convertedMessage.getContentHeaderBody().getProperties();
+
+ assertTrue("Unexpected user-id", Arrays.equals(userId, properties.getUserId().getBytes()));
+ }
+
public void testExchangeConversion()
{
final String testExchange = "testExchange";
@@ -482,8 +557,13 @@ public class PropertyConverter_0_10_to_0_8Test extends QpidTestCase
private String generateLongString()
{
+ return generateLongString(AMQShortString.MAX_LENGTH + 1);
+ }
+
+ private String generateLongString(int stringLength)
+ {
StringBuilder buffer = new StringBuilder();
- for(int i = 0; i < AMQShortString.MAX_LENGTH + 1 ; i++)
+ for(int i = 0; i < stringLength ; i++)
{
buffer.append('x');
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5489d616/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index c1e5c0a..2fb9243 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -22,25 +22,35 @@ package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
-import org.apache.qpid.server.protocol.v0_8.AMQShortString;
-import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
-import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
-import org.apache.qpid.server.protocol.v0_8.FieldTable;
-import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
import org.apache.qpid.server.store.StoredMessage;
@PluggableService
@@ -64,7 +74,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
@Override
public AMQMessage convert(Message_1_0 serverMsg, NamedAddressSpace addressSpace)
{
- return new AMQMessage(convertToStoredMessage(serverMsg), null);
+ return new AMQMessage(convertToStoredMessage(serverMsg, addressSpace), null);
}
@Override
@@ -73,7 +83,8 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
- private StoredMessage<MessageMetaData> convertToStoredMessage(final Message_1_0 serverMsg)
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final Message_1_0 serverMsg,
+ final NamedAddressSpace addressSpace)
{
Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg);
@@ -83,7 +94,8 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
final String mimeType = converter == null ? null : converter.getMimeType();
final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
mimeType,
- messageContent.length);
+ messageContent.length,
+ addressSpace);
final int metadataSize = messageMetaData_0_8.getStorableSize();
return new StoredMessage<MessageMetaData>()
@@ -144,57 +156,331 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
};
}
- private MessageMetaData convertMetaData(final Message_1_0 serverMsg, final String bodyMimeType, final int size)
+ private MessageMetaData convertMetaData(final Message_1_0 serverMsg,
+ final String bodyMimeType,
+ final int size,
+ final NamedAddressSpace addressSpace)
{
- final MessageMetaData_1_0.MessageHeader_1_0 header = serverMsg.getMessageHeader();
- String key = header.getTo();
- if(key == null)
- {
- key = header.getSubject();
- }
-
- MessagePublishInfo publishInfo = new MessagePublishInfo(null, false, false, AMQShortString.valueOf(key));
-
+ final MessageMetaData_1_0.MessageHeader_1_0 header = serverMsg.getMessageHeader();
final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setAppId(serverMsg.getMessageHeader().getAppId());
props.setContentType(bodyMimeType);
- props.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId());
+ props.setEncoding(convertToShortStringForProperty("content-encoding",
+ serverMsg.getMessageHeader().getEncoding()));
+ props.setCorrelationId(getCorrelationId(serverMsg));
props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT);
- props.setExpiration(serverMsg.getExpiration());
- props.setMessageId(serverMsg.getMessageHeader().getMessageId());
+
+
+ final Date absoluteExpiryTime = getAbsoluteExpiryTime(serverMsg);
+ if (absoluteExpiryTime != null)
+ {
+ props.setExpiration(absoluteExpiryTime.getTime());
+ }
+ else
+ {
+ Long ttl = getTtl(serverMsg);
+ if (ttl != null)
+ {
+ props.setExpiration(ttl + serverMsg.getArrivalTime());
+ }
+ }
+
+ props.setMessageId(getMessageId(serverMsg));
props.setPriority(serverMsg.getMessageHeader().getPriority());
- props.setReplyTo(serverMsg.getMessageHeader().getReplyTo());
- props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
- props.setUserId(serverMsg.getMessageHeader().getUserId());
+ props.setReplyTo(getReplyTo(serverMsg, addressSpace));
+ final long timestamp = serverMsg.getMessageHeader().getTimestamp();
+ if (timestamp > 0)
+ {
+ props.setTimestamp(timestamp);
+ }
+ else
+ {
+ props.setTimestamp(serverMsg.getArrivalTime());
+ }
+ props.setUserId(getUserId(serverMsg));
Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
if(header.getSubject() != null)
{
headerProps.put("qpid.subject", header.getSubject());
+ props.setType(convertToShortStringForProperty("subject", header.getSubject()));
}
- for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
+ String groupId = getGroupId(serverMsg);
+ if (groupId != null)
{
- headerProps.put(headerName, MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader().getHeader(headerName)));
+ headerProps.put("JMSXGroupID", groupId);
}
- props.setHeaders(FieldTable.convertToFieldTable(headerProps));
+ UnsignedInteger groupSequence = getGroupSequence(serverMsg);
+ if (groupSequence != null)
+ {
+ headerProps.put("JMSXGroupSeq", groupSequence.intValue());
+ }
+
+ for (String headerName : serverMsg.getMessageHeader().getHeaderNames())
+ {
+ headerProps.put(headerName,
+ MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader().getHeader(headerName)));
+ }
+
+ final FieldTable headers;
+ try
+ {
+ headers = FieldTable.convertToFieldTable(headerProps);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException(
+ "Could not convert message from 1.0 to 0-8 because conversion of 'application-properties' failed.",
+ e);
+ }
+ props.setHeaders(headers);
final ContentHeaderBody chb = new ContentHeaderBody(props);
chb.setBodySize(size);
+ MessagePublishInfo publishInfo = createMessagePublishInfo(header, addressSpace);
return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
}
+ private MessagePublishInfo createMessagePublishInfo(final MessageMetaData_1_0.MessageHeader_1_0 header,
+ final NamedAddressSpace addressSpace)
+ {
+ final String to = header.getTo();
+ final String subject = header.getSubject() == null ? "" : header.getSubject();
+
+ final String exchangeName;
+ final String routingKey;
+
+ if (to != null)
+ {
+ if (to.startsWith("/"))
+ {
+ //TODO: get local address from global
+ throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'to' failed. Global addresses cannot be converted.");
+ }
+
+ int separatorPosition = to.indexOf('/');
+ if (separatorPosition != -1)
+ {
+ exchangeName = to.substring(0, separatorPosition);
+ routingKey = to.substring(separatorPosition + 1);
+ }
+ else
+ {
+ MessageDestination destination = addressSpace.getAttainedMessageDestination(to);
+ if (destination instanceof Queue)
+ {
+ exchangeName = "";
+ routingKey = to;
+ }
+ else
+ {
+ exchangeName = to;
+ routingKey = subject;
+ }
+ }
+ }
+ else
+ {
+ exchangeName = "";
+ routingKey = subject;
+ }
+
+ return new MessagePublishInfo(convertToShortStringForProperty("to", exchangeName),
+ false,
+ false,
+ convertToShortStringForProperty("to' or 'subject",
+ routingKey));
+ }
+
+ private UnsignedInteger getGroupSequence(final Message_1_0 serverMsg)
+ {
+ final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ return properties.getGroupSequence();
+ }
+ }
+ return null;
+ }
+
+ private String getGroupId(final Message_1_0 serverMsg)
+ {
+ final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ return properties.getGroupId();
+ }
+ }
+ return null;
+ }
+
+ private AMQShortString getUserId(final Message_1_0 serverMsg)
+ {
+ final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ Binary userId = properties.getUserId();
+ if (userId != null)
+ {
+ try
+ {
+ return new AMQShortString(userId.getArray());
+ }
+ catch (IllegalArgumentException e)
+ {
+ return null;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private AMQShortString getMessageId(final Message_1_0 serverMsg)
+ {
+ final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ Object messageId = properties.getMessageId();
+ try
+ {
+ if (messageId instanceof Binary)
+ {
+ return new AMQShortString(((Binary) messageId).getArray());
+ }
+ else if (messageId instanceof byte[])
+ {
+ return new AMQShortString(((byte[]) messageId));
+ }
+ else
+ {
+ return AMQShortString.valueOf(messageId);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+ }
+ }
+ return null;
+
+ }
+
+ private Date getAbsoluteExpiryTime(final Message_1_0 serverMsg)
+ {
+ final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ return properties.getAbsoluteExpiryTime();
+ }
+ }
+ return null;
+ }
+
+ private Long getTtl(final Message_1_0 serverMsg)
+ {
+ HeaderSection headerSection = serverMsg.getHeaderSection();
+ if (headerSection != null)
+ {
+ Header header = headerSection.getValue();
+ if (header != null)
+ {
+ UnsignedInteger ttl = header.getTtl();
+ if (ttl != null)
+ {
+ return ttl.longValue();
+ }
+ }
+ }
+ return null;
+ }
+
+ private AMQShortString getReplyTo(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace)
+ {
+ // TODO : QPID-7602 - we probably need to look up the replyTo object and construct the correct BURL based on that
+ final String replyTo = serverMsg.getMessageHeader().getReplyTo();
+ try
+ {
+ return AMQShortString.valueOf(replyTo);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'reply-to' failed.", e);
+ }
+ }
+
+ private AMQShortString getCorrelationId(final Message_1_0 serverMsg)
+ {
+ AMQShortString correlationId = null;
+ final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+ if (propertiesSection != null)
+ {
+ final Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ final Object correlationIdObject = properties.getCorrelationId();
+ try
+ {
+ if (correlationIdObject instanceof Binary)
+ {
+ correlationId = new AMQShortString(((Binary) correlationIdObject).getArray());
+ }
+ else if (correlationIdObject instanceof byte[])
+ {
+ correlationId = new AMQShortString(((byte[]) correlationIdObject));
+ }
+ else
+ {
+ correlationId = AMQShortString.valueOf(correlationIdObject);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'correlation-id' failed.", e);
+ }
+ }
+ }
+ return correlationId;
+ }
+
+ private AMQShortString convertToShortStringForProperty(String propertyName, String s)
+ {
+ try
+ {
+ return AMQShortString.valueOf(s);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException(String.format(
+ "Could not convert message from 1.0 to 0-8 because conversion of '%s' failed.", propertyName), e);
+ }
+ }
@Override
public String getType()
{
return "v1-0 to v0-8";
}
-
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5489d616/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
new file mode 100644
index 0000000..186888c
--- /dev/null
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
@@ -0,0 +1,781 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase
+{
+ private NamedAddressSpace _namedAddressSpace;
+ private MessageConverter_1_0_to_v0_8 _messageConverter;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _namedAddressSpace = mock(NamedAddressSpace.class);
+ _messageConverter = new MessageConverter_1_0_to_v0_8();
+ }
+
+ public void testContentEncodingConversion()
+ {
+
+ String contentEncoding = "my-test-encoding";
+ final Properties properties = new Properties();
+ properties.setContentEncoding(Symbol.valueOf(contentEncoding));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected content encoding", contentEncoding, convertedProperties.getEncoding().toString());
+ }
+
+ public void testContentEncodingConversionWhenLengthExceeds255()
+ {
+
+ String contentEncoding = generateLongString();
+ final Properties properties = new Properties();
+ properties.setContentEncoding(Symbol.valueOf(contentEncoding));
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("expected exception not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testApplicationPropertiesConversion()
+ {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("testProperty1", "testProperty1Value");
+ properties.put("intProperty", 1);
+ ApplicationProperties applicationProperties = new ApplicationProperties(properties);
+ Message_1_0 message = createTestMessage(applicationProperties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ final Map<String, Object> headers = FieldTable.convertToMap(convertedProperties.getHeaders());
+ assertEquals("Unexpected headers", properties, new HashMap<>(headers));
+ }
+
+ public void testSubjectConversion()
+ {
+ final String subject = "testSubject";
+ Properties properties = new Properties();
+ properties.setSubject(subject);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ final Map<String, Object> headers = FieldTable.convertToMap(convertedProperties.getHeaders());
+ assertEquals("Unexpected qpid.subject is missing from headers", subject, headers.get("qpid.subject"));
+ assertEquals("Unexpected type", subject, convertedProperties.getType().toString());
+ final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo();
+ assertEquals("Unexpected routing-key", subject, messagePublishInfo.getRoutingKey().toString());
+ }
+
+ public void testSubjectConversionWhenSubjectExceeds255()
+ {
+ final String subject = generateLongString();
+ Properties properties = new Properties();
+ properties.setSubject(subject);
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("Expected conversion exception");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testDurableConversion()
+ {
+ final Header header = new Header();
+ header.setDurable(true);
+ Message_1_0 message = createTestMessage(header);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected deliveryMode",
+ BasicContentHeaderProperties.PERSISTENT,
+ convertedProperties.getDeliveryMode());
+ }
+
+ public void testNonDurableConversion()
+ {
+ final Header header = new Header();
+ header.setDurable(false);
+ Message_1_0 message = createTestMessage(header);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected deliveryMode",
+ BasicContentHeaderProperties.NON_PERSISTENT,
+ convertedProperties.getDeliveryMode());
+ }
+
+ public void testPriorityConversion()
+ {
+ final Header header = new Header();
+ final byte priority = (byte) 7;
+ header.setPriority(UnsignedByte.valueOf(priority));
+ Message_1_0 message = createTestMessage(header);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected priority", priority, convertedProperties.getPriority());
+ }
+
+ public void testCorrelationIdStringConversion()
+ {
+ final String correlationId = "testCorrelationId";
+ Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected correlationId", correlationId, convertedProperties.getCorrelationId().toString());
+ }
+
+ public void testCorrelationIdLongStringConversion()
+ {
+ final String correlationId = generateLongString();
+ Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("Expected exception not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testCorrelationIdULongConversion()
+ {
+ final UnsignedLong correlationId = UnsignedLong.valueOf(-1);
+ Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected correlationId",
+ correlationId.toString(),
+ convertedProperties.getCorrelationId().toString());
+ }
+
+ public void testCorrelationIdUUIDConversion()
+ {
+ final UUID correlationId = UUID.randomUUID();
+ Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected correlationId",
+ correlationId.toString(),
+ convertedProperties.getCorrelationId().toString());
+ }
+
+ public void testCorrelationIdBinaryConversion()
+ {
+ final String testCorrelationId = "testCorrelationId";
+ final Binary correlationId = new Binary(testCorrelationId.getBytes(UTF_8));
+ Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected correlationId", testCorrelationId, convertedProperties.getCorrelationId().toString());
+ }
+
+ public void testCorrelationIdBinaryConversionWhenNotUtf8()
+ {
+ final byte[] testCorrelationId = new byte[]{(byte) 0xc3, 0x28};
+ final Binary correlationId = new Binary(testCorrelationId);
+ Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertTrue("Unexpected correlationId",
+ Arrays.equals(testCorrelationId, convertedProperties.getCorrelationId().getBytes()));
+ }
+
+ public void testReplyToConversionWhenResultExceeds255()
+ {
+ final String replyTo = generateLongString() + "/" + generateLongString();
+ Properties properties = new Properties();
+ properties.setReplyTo(replyTo);
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("expected exception not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testTTLConversion()
+ {
+ long ttl = 10000;
+ long arrivalTime = System.currentTimeMillis();
+ long expectedExpiration = arrivalTime + ttl;
+ Header header = new Header();
+ header.setTtl(UnsignedInteger.valueOf(ttl));
+ Message_1_0 message = createTestMessage(header, arrivalTime);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected expiration", expectedExpiration, convertedProperties.getExpiration());
+ }
+
+ public void testAbsoluteExpiryTimeConversion()
+ {
+ long ttl = 10000;
+ long arrivalTime = System.currentTimeMillis();
+ long expiryTime = arrivalTime + ttl;
+ Properties properties = new Properties();
+ properties.setAbsoluteExpiryTime(new Date(expiryTime));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected expiration", expiryTime, convertedProperties.getExpiration());
+ }
+
+ public void testConversionOfAbsoluteExpiryTimeTakesPrecedenceOverTTL()
+ {
+ long ttl = 10000;
+ final long time = System.currentTimeMillis();
+ long absoluteExpiryTime = time + ttl;
+ long arrivalTime = time + 1;
+
+ Header header = new Header();
+ header.setTtl(UnsignedInteger.valueOf(ttl));
+
+ Properties properties = new Properties();
+ properties.setAbsoluteExpiryTime(new Date(absoluteExpiryTime));
+
+ Message_1_0 message = createTestMessage(header,
+ new DeliveryAnnotations(Collections.emptyMap()),
+ new MessageAnnotations(Collections.emptyMap()),
+ properties,
+ new ApplicationProperties(Collections.emptyMap()),
+ arrivalTime
+ );
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected expiration", absoluteExpiryTime, convertedProperties.getExpiration());
+ }
+
+ public void testMessageIdStringConversion()
+ {
+ final String messageId = "testMessageId";
+ Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected messageId", messageId, convertedProperties.getMessageId().toString());
+ }
+
+ public void testMessageIdLongStringConversion()
+ {
+ final String messageId = generateLongString();
+ Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertNull("Unexpected messageId", convertedProperties.getMessageId());
+ }
+
+ public void testMessageIdUUIDConversion()
+ {
+ final UUID messageId = UUID.randomUUID();
+ Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected messageId", messageId.toString(), convertedProperties.getMessageId().toString());
+ }
+
+ public void testMessageIdUnsignedLongConversion()
+ {
+ final UnsignedLong messageId = UnsignedLong.valueOf(-1);
+ Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected messageId", messageId.toString(), convertedProperties.getMessageId().toString());
+ }
+
+ public void testMessageIdBinaryConversion()
+ {
+ final String messageId = "testMessageId";
+ Properties properties = new Properties();
+ properties.setMessageId(new Binary(messageId.getBytes()));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected messageId", messageId, convertedProperties.getMessageId().toString());
+ }
+
+ public void testMessageIdByteArrayConversion()
+ {
+ final byte[] messageId = "testMessageId".getBytes();
+ Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertTrue("Unexpected messageId", Arrays.equals(messageId, convertedProperties.getMessageId().getBytes()));
+ }
+
+ public void testMessageIdBinaryConversionWhenNonUtf8()
+ {
+ final byte[] messageId = new byte[]{(byte) 0xc3, 0x28};
+ Properties properties = new Properties();
+ properties.setMessageId(new Binary(messageId));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertTrue("Unexpected messageId", Arrays.equals(messageId, convertedProperties.getMessageId().getBytes()));
+ }
+
+ public void testCreationTimeConversion()
+ {
+ final long timestamp = System.currentTimeMillis() - 10000;
+ final long arrivalTime = timestamp + 1;
+ Properties properties = new Properties();
+ properties.setCreationTime(new Date(timestamp));
+ Message_1_0 message = createTestMessage(new Header(),
+ new DeliveryAnnotations(Collections.emptyMap()),
+ new MessageAnnotations(Collections.emptyMap()),
+ properties,
+ new ApplicationProperties(Collections.emptyMap()),
+ arrivalTime
+ );
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected timestamp", timestamp, convertedProperties.getTimestamp());
+ }
+
+ public void testArrivalTimeConversion()
+ {
+ final long arrivalTime = System.currentTimeMillis() - 10000;
+ Message_1_0 message = createTestMessage(new Header(), arrivalTime);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected timestamp", arrivalTime, convertedProperties.getTimestamp());
+ }
+
+ public void testUserIdyConversion()
+ {
+ final String userId = "test-userId";
+ Properties properties = new Properties();
+ properties.setUserId(new Binary(userId.getBytes()));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertEquals("Unexpected user-id", userId, convertedProperties.getUserIdAsString());
+ }
+
+ public void testUserIdyConversionWhenLengthExceeds255()
+ {
+ final String userId = generateLongString();
+ Properties properties = new Properties();
+ properties.setUserId(new Binary(userId.getBytes()));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertNull("Unexpected user-id", convertedProperties.getUserId());
+ }
+
+ public void testUserIdyConversionWhenNonUtf8()
+ {
+ final byte[] userId = new byte[]{(byte) 0xc3, 0x28};
+ Properties properties = new Properties();
+ properties.setUserId(new Binary(userId));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ assertTrue("Unexpected user-id", Arrays.equals(userId, convertedProperties.getUserId().getBytes()));
+ }
+
+ public void testGroupIdConversion()
+ {
+ String testGroupId = generateLongString();
+ Properties properties = new Properties();
+ properties.setGroupId(testGroupId);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ Map<String, Object> headers = FieldTable.convertToMap(convertedProperties.getHeaders());
+ assertEquals("Unexpected group-id", testGroupId, headers.get("JMSXGroupID"));
+ }
+
+ public void testGroupSequenceConversion()
+ {
+ int testGroupSequence = 1;
+ Properties properties = new Properties();
+ properties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence));
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ BasicContentHeaderProperties convertedProperties = convertedMessage.getContentHeaderBody().getProperties();
+ Map<String, Object> headers = FieldTable.convertToMap(convertedProperties.getHeaders());
+ assertEquals("Unexpected group-id", testGroupSequence, headers.get("JMSXGroupSeq"));
+ }
+
+
+ public void testApplicationPropertiesConversionWhenKeyLengthExceeds255()
+ {
+ Map<String, Object> properties = Collections.singletonMap("testProperty-" + generateLongString(), "testValue");
+ ApplicationProperties applicationProperties = new ApplicationProperties(properties);
+ Message_1_0 message = createTestMessage(applicationProperties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("Exception is expected");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testToConversionWhenExchangeAndRoutingKeyIsSpecified()
+ {
+ final String testExchange = "testExchange";
+ final String testRoutingKey = "testRoutingKey";
+
+ String to = testExchange + "/" + testRoutingKey;
+ Properties properties = new Properties();
+ properties.setTo(to);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo();
+
+ assertEquals("Unexpected exchange", testExchange, messagePublishInfo.getExchange().toString());
+ assertEquals("Unexpected routing key", testRoutingKey, messagePublishInfo.getRoutingKey().toString());
+ }
+
+ public void testToConversionWhenExchangeIsSpecified()
+ {
+ final String testExchange = "testExchange";
+ Properties properties = new Properties();
+ properties.setTo(testExchange);
+ Message_1_0 message = createTestMessage(properties);
+
+ when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class));
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo();
+
+ assertEquals("Unexpected exchange", testExchange, messagePublishInfo.getExchange().toString());
+ assertEquals("Unexpected routing key", null, messagePublishInfo.getRoutingKey());
+ }
+
+ public void testToConversionWhenExchangeIsSpecifiedAndSubjectIsSet()
+ {
+ final String testExchange = "testExchange";
+ final String testRoutingKey = "testRoutingKey";
+ Properties properties = new Properties();
+ properties.setTo(testExchange);
+ properties.setSubject(testRoutingKey);
+ Message_1_0 message = createTestMessage(properties);
+
+ when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class));
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo();
+
+ assertEquals("Unexpected exchange", testExchange, messagePublishInfo.getExchange().toString());
+ assertEquals("Unexpected routing key", testRoutingKey, messagePublishInfo.getRoutingKey().toString());
+ }
+
+ public void testToConversionWhenQueueIsSpecified()
+ {
+ final String testQueue = "testQueue";
+ Properties properties = new Properties();
+ properties.setTo(testQueue);
+ Message_1_0 message = createTestMessage(properties);
+
+ when(_namedAddressSpace.getAttainedMessageDestination(testQueue)).thenReturn(mock(Queue.class));
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo();
+
+ assertEquals("Unexpected exchange", "", messagePublishInfo.getExchange().toString());
+ assertEquals("Unexpected routing key", testQueue, messagePublishInfo.getRoutingKey().toString());
+ }
+
+ public void testToConversionWhenGlobalAddress()
+ {
+ final String globalAddress = "/testQueue";
+ Properties properties = new Properties();
+ properties.setTo(globalAddress);
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("Exception is not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testToConversionWhenExchangeLengthExceeds255()
+ {
+ final String testExchange = generateLongString();
+ final String testRoutingKey = "testRoutingKey";
+
+ String to = testExchange + "/" + testRoutingKey;
+ Properties properties = new Properties();
+ properties.setTo(to);
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("Exception is not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testToConversionWhenRoutingKeyLengthExceeds255()
+ {
+ final String testExchange = "testExchange";
+ final String testRoutingKey = generateLongString();
+
+ String to = testExchange + "/" + testRoutingKey;
+ Properties properties = new Properties();
+ properties.setTo(to);
+ Message_1_0 message = createTestMessage(properties);
+
+ try
+ {
+ _messageConverter.convert(message, _namedAddressSpace);
+ fail("Exception is not thrown");
+ }
+ catch (MessageConversionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testToConversionWhenDestinationIsSpecifiedButDoesNotExists()
+ {
+ final String testDestination = "testDestination";
+ Properties properties = new Properties();
+ properties.setTo(testDestination);
+ Message_1_0 message = createTestMessage(properties);
+
+ final AMQMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+ final MessagePublishInfo messagePublishInfo = convertedMessage.getMessagePublishInfo();
+
+ assertEquals("Unexpected exchange", testDestination, messagePublishInfo.getExchange().toString());
+ assertEquals("Unexpected routing key", "", messagePublishInfo.getRoutingKey().toString());
+ }
+
+ private Message_1_0 createTestMessage(final Header header)
+ {
+ return createTestMessage(header, 0);
+ }
+
+ private Message_1_0 createTestMessage(final Header header, long arrivalTime)
+ {
+ return createTestMessage(header,
+ new DeliveryAnnotations(Collections.emptyMap()),
+ new MessageAnnotations(Collections.emptyMap()),
+ new Properties(),
+ new ApplicationProperties(Collections.emptyMap()),
+ arrivalTime
+ );
+ }
+
+ private Message_1_0 createTestMessage(final Properties properties)
+ {
+ return createTestMessage(new Header(),
+ new DeliveryAnnotations(Collections.emptyMap()),
+ new MessageAnnotations(Collections.emptyMap()),
+ properties,
+ new ApplicationProperties(Collections.emptyMap()),
+ 0
+ );
+ }
+
+ private Message_1_0 createTestMessage(final ApplicationProperties applicationProperties)
+ {
+ return createTestMessage(new Header(),
+ new DeliveryAnnotations(Collections.emptyMap()),
+ new MessageAnnotations(Collections.emptyMap()),
+ new Properties(),
+ applicationProperties,
+ 0
+ );
+ }
+
+ private Message_1_0 createTestMessage(final Header header,
+ final DeliveryAnnotations deliveryAnnotations,
+ final MessageAnnotations messageAnnotations,
+ final Properties properties,
+ final ApplicationProperties applicationProperties,
+ final long arrivalTime)
+ {
+ final StoredMessage<MessageMetaData_1_0> storedMessage = mock(StoredMessage.class);
+ MessageMetaData_1_0 metaData = new MessageMetaData_1_0(header.createEncodingRetainingSection(),
+ deliveryAnnotations.createEncodingRetainingSection(),
+ messageAnnotations.createEncodingRetainingSection(),
+ properties.createEncodingRetainingSection(),
+ applicationProperties.createEncodingRetainingSection(),
+ new Footer(Collections.emptyMap()).createEncodingRetainingSection(),
+ arrivalTime,
+ 0);
+ when(storedMessage.getMetaData()).thenReturn(metaData);
+ return new Message_1_0(storedMessage);
+ }
+
+ private String generateLongString()
+ {
+ StringBuilder buffer = new StringBuilder();
+ for (int i = 0; i < AMQShortString.MAX_LENGTH + 1; i++)
+ {
+ buffer.append('x');
+ }
+
+ return buffer.toString();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org