You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/02 13:14:08 UTC
qpid-broker-j git commit: QPID-7434: Add unit tests for property
conversion of AMQP 0-10 messages to internal messages
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 8ab67e0c1 -> fdff8c5f2
QPID-7434: Add unit tests for property conversion of AMQP 0-10 messages to internal messages
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/fdff8c5f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/fdff8c5f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/fdff8c5f
Branch: refs/heads/master
Commit: fdff8c5f2dd2285eb1e73f53c3bd4b6886d6aa89
Parents: 8ab67e0
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed Aug 2 14:11:16 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Aug 2 14:11:16 2017 +0100
----------------------------------------------------------------------
.../message/internal/InternalMessage.java | 13 +-
.../message/internal/InternalMessageHeader.java | 57 +--
.../security/TrustStoreMessageSource.java | 2 +-
.../virtualhost/VirtualHostPropertiesNode.java | 2 +-
.../MessageConverter_v0_10_to_Internal.java | 2 +-
.../protocol/v0_10/MessageMetaData_0_10.java | 10 +-
.../protocol/v0_10/MessageTransferHeader.java | 26 +-
.../v0_10/transport/DeliveryProperties.java | 67 ----
.../PropertyConverter_0_10_to_InternalTest.java | 388 +++++++++++++++++++
.../v0_8/MessageConverter_v0_8_to_Internal.java | 5 +-
.../v1_0/MessageConverter_v1_0_to_Internal.java | 2 +-
.../qpid/systest/rest/MessagesRestTest.java | 10 +-
12 files changed, 469 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index a65b58c..21c55bf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -33,9 +33,9 @@ import java.util.List;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
@@ -209,13 +209,14 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
return createMessage(store, header, messageBody, persist, null);
}
- public static InternalMessage convert(long messageNumber,
- boolean persistent,
+ public static InternalMessage convert(final ServerMessage serverMessage,
AMQMessageHeader header,
- Object messageBody,
- final String destinationName)
+ Object messageBody)
{
- InternalMessageHeader convertedHeader = new InternalMessageHeader(header);
+ long messageNumber = serverMessage.getMessageNumber();
+ boolean persistent = serverMessage.isPersistent();
+ String destinationName = serverMessage.getTo();
+ InternalMessageHeader convertedHeader = new InternalMessageHeader(header, serverMessage.getArrivalTime());
StoredMessage<InternalMessageMetaData> handle = createReadOnlyHandle(messageNumber, persistent, convertedHeader, messageBody);
return new InternalMessage(handle, convertedHeader, messageBody, destinationName);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
index 734a71b..374e3df 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
@@ -20,14 +20,16 @@
*/
package org.apache.qpid.server.message.internal;
-import org.apache.qpid.server.message.AMQMessageHeader;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
public final class InternalMessageHeader implements AMQMessageHeader, Serializable
{
@@ -60,10 +62,10 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
final long timestamp,
final long notValidBefore,
final String type,
- final String replyTo)
+ final String replyTo,
+ final long arrivalTime)
{
- _headers = headers == null ? new LinkedHashMap<String, Object>()
- : new LinkedHashMap<String, Object>(headers);
+ _headers = headers == null ? new LinkedHashMap<>() : new LinkedHashMap<>(headers);
_correlationId = correlationId;
_expiration = expiration;
@@ -73,33 +75,36 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
_mimeType = mimeType;
_encoding = encoding;
_priority = priority;
- _timestamp = timestamp;
+ _timestamp = timestamp > 0 ? timestamp : arrivalTime;
_notValidBefore = notValidBefore;
_type = type;
_replyTo = replyTo;
- _arrivalTime = System.currentTimeMillis();
+ _arrivalTime = arrivalTime;
}
public InternalMessageHeader(final AMQMessageHeader header)
{
- _correlationId = header.getCorrelationId();
- _expiration = header.getExpiration();
- _userId = header.getUserId();
- _appId = header.getAppId();
- _messageId = header.getMessageId();
- _mimeType = header.getMimeType();
- _encoding = header.getEncoding();
- _priority = header.getPriority();
- _timestamp = header.getTimestamp();
- _notValidBefore = header.getNotValidBefore();
- _type = header.getType();
- _replyTo = header.getReplyTo();
- _headers = new LinkedHashMap<String, Object>();
- for(String headerName : header.getHeaderNames())
- {
- _headers.put(headerName, header.getHeader(headerName));
- }
- _arrivalTime = System.currentTimeMillis();
+ this(header, System.currentTimeMillis());
+ }
+
+    /**
+     * Creates an internal header by copying every property and application header of the given header.
+     *
+     * @param header the header to copy; all of its header names are read via {@code getHeaderNames()}
+     * @param arrivalTime the arrival time to record; the delegated-to constructor also uses it as
+     *                    the timestamp when the copied timestamp is not positive
+     */
+    public InternalMessageHeader(final AMQMessageHeader header, long arrivalTime)
+    {
+        // Collect straight into a LinkedHashMap: Collectors.toMap() throws a NullPointerException
+        // for null header values and gathers into an unordered HashMap, whereas the copy loop this
+        // constructor replaces tolerated null values and preserved the order of the header names.
+        this(header.getHeaderNames()
+                   .stream()
+                   .collect(LinkedHashMap<String, Object>::new,
+                            (copy, name) -> copy.put(name, header.getHeader(name)),
+                            Map::putAll),
+             header.getCorrelationId(),
+             header.getExpiration(),
+             header.getUserId(),
+             header.getAppId(),
+             header.getMessageId(),
+             header.getMimeType(),
+             header.getEncoding(),
+             header.getPriority(),
+             header.getTimestamp(),
+             header.getNotValidBefore(),
+             header.getType(),
+             header.getReplyTo(),
+             arrivalTime);
     }
@Override
@@ -205,6 +210,6 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
public Map<String,Object> getHeaderMap()
{
- return Collections.unmodifiableMap(new LinkedHashMap<String, Object>(_headers));
+ return Collections.unmodifiableMap(new LinkedHashMap<>(_headers));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index 7af55ac..6e7cb94 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -142,7 +142,7 @@ public class TrustStoreMessageSource extends AbstractSystemMessageSource impleme
InternalMessageHeader header = new InternalMessageHeader(Collections.<String,Object>emptyMap(),
null, 0l, null, null, UUID.randomUUID().toString(),
null, null, (byte)4, System.currentTimeMillis(),
- 0L, null, null);
+ 0L, null, null, System.currentTimeMillis());
return InternalMessage.createListMessage(_virtualHost.getMessageStore(), header, messageList);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
index b40c5af..190f020 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
@@ -84,7 +84,7 @@ public class VirtualHostPropertiesNode extends AbstractSystemMessageSource
InternalMessageHeader header = new InternalMessageHeader(headers,
null, 0l, null, null, UUID.randomUUID().toString(),
null, null, (byte) 4, System.currentTimeMillis(),
- 0L, null, null);
+ 0L, null, null, System.currentTimeMillis());
final InternalMessage message =
InternalMessage.createBytesMessage(_addressSpace.getMessageStore(), header, new byte[0]);
message.setInitialRoutingAddress(getName());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index 8d0d9e6..e507ce2 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -77,7 +77,7 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
Object body = convertMessageBody(mimeType, data);
MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding);
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body, serverMessage.getTo());
+ return InternalMessage.convert(serverMessage, fixedHeader, body);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index e1eec89..2861cc2 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -41,7 +41,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
private DeliveryProperties _deliveryProps;
private MessageProperties _messageProps;
private MessageTransferHeader _messageHeader;
- private long _arrivalTime;
private int _bodySize;
private static final int ENCODER_SIZE = 1 << 10;
@@ -70,8 +69,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
_deliveryProps = null;
_messageProps = null;
}
- _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
- _arrivalTime = arrivalTime;
+ _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps, arrivalTime);
_bodySize = bodySize;
}
@@ -119,7 +117,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
{
ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE, false);
- encoder.writeInt64(_arrivalTime);
+ encoder.writeInt64(_messageHeader.getArrivalTime());
encoder.writeInt32(_bodySize);
int headersLength = 0;
if (_header != null)
@@ -239,12 +237,12 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
public long getExpiration()
{
- return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+ return _messageHeader.getExpiration();
}
public long getArrivalTime()
{
- return _arrivalTime;
+ return _messageHeader.getArrivalTime();
}
public Header getHeader()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
index 303950c..5855290 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
@@ -34,12 +34,16 @@ class MessageTransferHeader implements AMQMessageHeader
private final DeliveryProperties _deliveryProps;
private final MessageProperties _messageProps;
+ private final long _arrivalTime;
private long _notValidBefore;
- public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps)
+ public MessageTransferHeader(DeliveryProperties deliveryProps,
+ MessageProperties messageProps,
+ final long arrivalTime)
{
_deliveryProps = deliveryProps;
_messageProps = messageProps;
+ _arrivalTime = arrivalTime;
}
@Override
@@ -58,7 +62,20 @@ class MessageTransferHeader implements AMQMessageHeader
@Override
public long getExpiration()
{
- return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+ long expiration = 0L;
+ if (_deliveryProps != null)
+ {
+ // The AMQP 0-x client wrongly sets ttl to 0 when it means "no ttl".
+ if (_deliveryProps.hasTtl() && _deliveryProps.getTtl() != 0L)
+ {
+ expiration = _arrivalTime + _deliveryProps.getTtl();
+ }
+ else if (_deliveryProps.hasExpiration())
+ {
+ expiration = _deliveryProps.getExpiration();
+ }
+ }
+ return expiration;
}
@Override
@@ -191,4 +208,9 @@ class MessageTransferHeader implements AMQMessageHeader
Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders();
return appHeaders != null && appHeaders.containsKey(name);
}
+
+    /**
+     * Returns the arrival time that was passed to the constructor. {@link #getExpiration()} adds
+     * the ttl to this value when a non-zero ttl is set.
+     */
+    long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
index 166d58d..18e9ee6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
@@ -22,9 +22,7 @@
package org.apache.qpid.server.protocol.v0_10.transport;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
@@ -74,71 +72,6 @@ public final class DeliveryProperties extends Struct {
public DeliveryProperties() {}
-
- public DeliveryProperties(final DeliveryProperties deliveryProp)
- {
- this(deliveryProp.getPriority(), deliveryProp.getDeliveryMode(),
- deliveryProp.getTtl(), deliveryProp.getTimestamp(),
- deliveryProp.getExpiration(), deliveryProp.getExchange(),
- deliveryProp.getRoutingKey(), deliveryProp.getResumeId(),
- deliveryProp.getResumeTtl(), getOptions(deliveryProp));
- }
-
- private static Option[] getOptions(final DeliveryProperties deliveryProp)
- {
- List<Option> optionList = new ArrayList<>();
- if(deliveryProp.getDiscardUnroutable())
- {
- optionList.add(Option.DISCARD_UNROUTABLE);
- }
- if(deliveryProp.getImmediate())
- {
- optionList.add(Option.DISCARD_UNROUTABLE);
- }
- if(deliveryProp.getRedelivered())
- {
- optionList.add(Option.REDELIVERED);
- }
- return optionList.toArray(new Option[optionList.size()]);
- }
-
-
- public DeliveryProperties(MessageDeliveryPriority priority, MessageDeliveryMode deliveryMode, long ttl, long timestamp, long expiration, String exchange, String routingKey, String resumeId, long resumeTtl, Option ... _options) {
- if(priority != null) {
- setPriority(priority);
- }
- if(deliveryMode != null) {
- setDeliveryMode(deliveryMode);
- }
- setTtl(ttl);
- setTimestamp(timestamp);
- setExpiration(expiration);
- if(exchange != null) {
- setExchange(exchange);
- }
- if(routingKey != null) {
- setRoutingKey(routingKey);
- }
- if(resumeId != null) {
- setResumeId(resumeId);
- }
- setResumeTtl(resumeTtl);
-
- for (int i=0; i < _options.length; i++) {
- switch (_options[i]) {
- case DISCARD_UNROUTABLE: packing_flags |= 256; break;
- case IMMEDIATE: packing_flags |= 512; break;
- case REDELIVERED: packing_flags |= 1024; break;
- case NONE: break;
- default: throw new IllegalArgumentException("invalid option: " + _options[i]);
- }
- }
-
- }
-
-
-
-
public final boolean hasDiscardUnroutable() {
return (packing_flags & 256) != 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java
new file mode 100644
index 0000000..91d6e38
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PropertyConverter_0_10_to_InternalTest extends QpidTestCase
+{
+ private NamedAddressSpace _namedAddressSpace;
+ private MessageConverter_v0_10_to_Internal _messageConverter;
+
+    /**
+     * Prepares a fresh converter under test and a mocked address space for every test method.
+     */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _messageConverter = new MessageConverter_v0_10_to_Internal();
+        _namedAddressSpace = mock(NamedAddressSpace.class);
+    }
+
+    /**
+     * The content encoding set on the 0-10 message properties must be reported as the encoding
+     * of the converted message header.
+     */
+    public void testContentEncodingConversion()
+    {
+        final String expectedEncoding = "my-test-encoding";
+        final MessageProperties props = new MessageProperties();
+        props.setContentEncoding(expectedEncoding);
+        final byte[] content = {(byte) 1};
+        final MessageTransferMessage source = createTestMessage(new DeliveryProperties(), props, content, 0);
+
+        final InternalMessage converted = _messageConverter.convert(source, _namedAddressSpace);
+
+        final String actualEncoding = converted.getMessageHeader().getEncoding();
+        assertEquals("Unexpected content encoding", expectedEncoding, actualEncoding);
+    }
+
+    /**
+     * The application headers of the 0-10 message must appear unchanged in the header map of
+     * the converted message.
+     */
+    public void testApplicationHeadersConversion()
+    {
+        final Map<String, Object> applicationHeaders = new HashMap<>();
+        applicationHeaders.put("testProperty1", "testProperty1Value");
+        applicationHeaders.put("intProperty", 1);
+        final MessageProperties props = new MessageProperties();
+        props.setApplicationHeaders(applicationHeaders);
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        final Map<String, Object> convertedHeaders = converted.getMessageHeader().getHeaderMap();
+        assertEquals("Unexpected headers", applicationHeaders, new HashMap<>(convertedHeaders));
+    }
+
+    /**
+     * A PERSISTENT delivery mode must yield a converted message that is persistent, both on the
+     * message itself and on its stored meta data.
+     */
+    public void testPersistentDeliveryModeConversion()
+    {
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertTrue("Unexpected persistence", converted.isPersistent());
+        final boolean metaDataPersistent = converted.getStoredMessage().getMetaData().isPersistent();
+        assertTrue("Unexpected persistence of meta data", metaDataPersistent);
+    }
+
+    /**
+     * A NON_PERSISTENT delivery mode must yield a converted message that is not persistent,
+     * both on the message itself and on its stored meta data.
+     */
+    public void testNonPersistentDeliveryModeConversion()
+    {
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertFalse("Unexpected persistence", converted.isPersistent());
+        final boolean metaDataPersistent = converted.getStoredMessage().getMetaData().isPersistent();
+        assertFalse("Unexpected persistence of meta data", metaDataPersistent);
+    }
+
+    /**
+     * The delivery priority of the 0-10 message must be carried over to the converted header.
+     */
+    public void testPriorityConversion()
+    {
+        final byte expectedPriority = 7;
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setPriority(MessageDeliveryPriority.get(expectedPriority));
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected priority", expectedPriority, converted.getMessageHeader().getPriority());
+    }
+
+    /**
+     * A correlation id holding the UTF-8 bytes of a string must be exposed by the converted
+     * header as a string whose UTF-8 encoding equals the original bytes.
+     */
+    public void testCorrelationIdConversion()
+    {
+        // Encode explicitly as UTF-8: the assertion below decodes with UTF-8, so relying on the
+        // platform default charset would make the test environment dependent.
+        final byte[] correlationId = "testCorrelationId".getBytes(UTF_8);
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setCorrelationId(correlationId);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertTrue("Unexpected correlationId",
+                   Arrays.equals(correlationId,
+                                 convertedMessage.getMessageHeader().getCorrelationId().getBytes(UTF_8)));
+    }
+
+    /**
+     * A correlation id that is not a valid UTF-8 byte sequence must be exposed as the string
+     * obtained by decoding those bytes as UTF-8.
+     */
+    public void testCorrelationIdConversionWhenNotString()
+    {
+        final byte[] rawCorrelationId = {(byte) 0xc3, 0x28};
+        final MessageProperties props = new MessageProperties();
+        props.setCorrelationId(rawCorrelationId);
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        final String expectedCorrelationId = new String(rawCorrelationId, UTF_8);
+        assertEquals("Unexpected correlationId",
+                     expectedCorrelationId,
+                     converted.getMessageHeader().getCorrelationId());
+    }
+
+    /**
+     * A reply-to carrying both an exchange and a routing key must be converted to the string
+     * "exchange/routingKey".
+     */
+    public void testReplyToConversionWhenExchangeAndRoutingKeySpecified()
+    {
+        final String exchange = "amq.direct";
+        final String key = "test_routing_key";
+        final String expectedReplyTo = String.format("%s/%s", exchange, key);
+        final MessageProperties props = new MessageProperties();
+        props.setReplyTo(new ReplyTo(exchange, key));
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", expectedReplyTo, converted.getMessageHeader().getReplyTo());
+    }
+
+    /**
+     * A reply-to carrying only an exchange must be converted to the exchange name.
+     */
+    public void testReplyToConversionWhenExchangeSpecified()
+    {
+        final String exchange = "amq.direct";
+        final MessageProperties props = new MessageProperties();
+        props.setReplyTo(new ReplyTo(exchange, null));
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", exchange, converted.getMessageHeader().getReplyTo());
+    }
+
+    /**
+     * A reply-to carrying only a routing key must be converted to the routing key.
+     */
+    public void testReplyToConversionWhenRoutingKeySpecified()
+    {
+        final String key = "test_routing_key";
+        final MessageProperties props = new MessageProperties();
+        props.setReplyTo(new ReplyTo(null, key));
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", key, converted.getMessageHeader().getReplyTo());
+    }
+
+    /**
+     * A reply-to with an empty exchange name and a routing key must be converted to the
+     * routing key alone.
+     */
+    public void testReplyToConversionWhenExchangeIsEmptyStringAndRoutingKeySpecified()
+    {
+        final String key = "test_routing_key";
+        final MessageProperties props = new MessageProperties();
+        props.setReplyTo(new ReplyTo("", key));
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", key, converted.getMessageHeader().getReplyTo());
+    }
+
+    /**
+     * A reply-to with neither exchange nor routing key must be converted to a null reply-to.
+     */
+    public void testReplyToConversionWhenExchangeAndRoutingKeyAreNull()
+    {
+        final MessageProperties props = new MessageProperties();
+        props.setReplyTo(new ReplyTo(null, null));
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertNull("Unexpected reply-to", converted.getMessageHeader().getReplyTo());
+    }
+
+    /**
+     * When only an expiration is set on the delivery properties, the converted header must
+     * report exactly that expiration.
+     */
+    public void testExpirationConversion()
+    {
+        final long arrivalTime = System.currentTimeMillis();
+        final int timeToLive = 100000;
+        final long expectedExpiration = arrivalTime + timeToLive;
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setExpiration(expectedExpiration);
+        final MessageTransferMessage source =
+                createTestMessage(props, new MessageProperties(), null, arrivalTime);
+
+        final InternalMessage converted = _messageConverter.convert(source, _namedAddressSpace);
+
+        assertEquals("Unexpected expiration", expectedExpiration, converted.getMessageHeader().getExpiration());
+    }
+
+    /**
+     * When only a ttl is set on the delivery properties, the converted header must report an
+     * expiration equal to the arrival time plus the ttl.
+     */
+    public void testTtlConversion()
+    {
+        final long arrivalTime = System.currentTimeMillis();
+        final int timeToLive = 100000;
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setTtl(timeToLive);
+        final MessageTransferMessage source =
+                createTestMessage(props, new MessageProperties(), null, arrivalTime);
+
+        final InternalMessage converted = _messageConverter.convert(source, _namedAddressSpace);
+
+        assertEquals("Unexpected expiration", arrivalTime + timeToLive, converted.getMessageHeader().getExpiration());
+    }
+
+    /**
+     * When both a ttl and a (later) expiration are set, the expiration of the converted header
+     * must be derived from the ttl, i.e. arrival time plus ttl.
+     */
+    public void testTtlTakesPrecedenceOverExpiration()
+    {
+        final long arrivalTime = System.currentTimeMillis();
+        final int timeToLive = 100000;
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setTtl(timeToLive);
+        props.setExpiration(arrivalTime + timeToLive + 10000);
+        final MessageTransferMessage source =
+                createTestMessage(props, new MessageProperties(), null, arrivalTime);
+
+        final InternalMessage converted = _messageConverter.convert(source, _namedAddressSpace);
+
+        assertEquals("Unexpected expiration", arrivalTime + timeToLive, converted.getMessageHeader().getExpiration());
+    }
+
+    /**
+     * The UUID message id of the 0-10 message must be exposed by the converted header as the
+     * string "ID:" followed by the UUID.
+     */
+    public void testMessageIdConversion()
+    {
+        final UUID id = UUID.randomUUID();
+        final MessageProperties props = new MessageProperties();
+        props.setMessageId(id);
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected messageId", "ID:" + id, converted.getMessageHeader().getMessageId());
+    }
+
+    /**
+     * A timestamp set on the delivery properties must be reported as the timestamp of the
+     * converted header.
+     */
+    public void testTimestampConversion()
+    {
+        final long creationTime = System.currentTimeMillis() - 1000;
+        final DeliveryProperties props = new DeliveryProperties();
+        props.setTimestamp(creationTime);
+
+        final InternalMessage converted =
+                _messageConverter.convert(createTestMessage(props), _namedAddressSpace);
+
+        assertEquals("Unexpected creation timestamp", creationTime, converted.getMessageHeader().getTimestamp());
+    }
+
+    /**
+     * When the delivery properties carry no timestamp, the converted header must report the
+     * arrival time of the message as its timestamp.
+     */
+    public void testArrivalTimeConversion()
+    {
+        final long arrivalTime = System.currentTimeMillis() - 1000;
+        final MessageTransferMessage source =
+                createTestMessage(new DeliveryProperties(), new MessageProperties(), null, arrivalTime);
+
+        final InternalMessage converted = _messageConverter.convert(source, _namedAddressSpace);
+
+        assertEquals("Unexpected creation timestamp", arrivalTime, converted.getMessageHeader().getTimestamp());
+    }
+
+    /**
+     * The "x-jms-type" application header of the 0-10 message must be reported as the type of
+     * the converted header.
+     */
+    public void testJmsTypeConversion()
+    {
+        final String type = "test-type";
+        final Map<String, Object> headers = Collections.singletonMap("x-jms-type", type);
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setApplicationHeaders(headers);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        // The assertion checks getType(), so the failure message names the type, not a subject.
+        assertEquals("Unexpected type", type, convertedMessage.getMessageHeader().getType());
+    }
+
+ /**
+  * Checks that a user id carried as bytes in the 0-10 message properties is
+  * exposed as the corresponding string on the converted message header.
+  */
+ public void testUserIdConversion()
+ {
+     final String userId = "test-userId";
+     final MessageProperties messageProperties = new MessageProperties();
+     // Encode with an explicit charset so the test does not depend on the platform default.
+     messageProperties.setUserId(userId.getBytes(UTF_8));
+     MessageTransferMessage message = createTestMessage(messageProperties);
+
+     final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+     assertEquals("Unexpected user-id", userId, convertedMessage.getMessageHeader().getUserId());
+ }
+
+ /**
+  * Uses a user id whose bytes (0xC3 0x28) are not a valid UTF-8 sequence and checks
+  * that the converted header reports the same string that decoding those bytes
+  * as UTF-8 produces.
+  */
+ public void testUserIdConversionWhenNotUtf8()
+ {
+     final byte[] invalidUtf8 = new byte[]{(byte) 0xc3, 0x28};
+     final MessageProperties properties = new MessageProperties();
+     properties.setUserId(invalidUtf8);
+
+     final InternalMessage converted =
+             _messageConverter.convert(createTestMessage(properties), _namedAddressSpace);
+
+     final String expectedUserId = new String(invalidUtf8, UTF_8);
+     assertEquals("Unexpected user-id", expectedUserId, converted.getMessageHeader().getUserId());
+ }
+
+ /**
+  * Checks that the exchange named in the 0-10 delivery properties is reported
+  * by {@code getTo()} on the converted message.
+  */
+ public void testExchangeConversion()
+ {
+     final String exchangeName = "testExchange";
+     final DeliveryProperties properties = new DeliveryProperties();
+     properties.setExchange(exchangeName);
+
+     final InternalMessage converted =
+             _messageConverter.convert(createTestMessage(properties), _namedAddressSpace);
+
+     assertEquals("Unexpected exchange", exchangeName, converted.getTo());
+ }
+
+ /**
+  * Checks that an application id supplied as UTF-8 bytes in the 0-10 message
+  * properties is exposed as the same string on the converted message header.
+  */
+ public void testApplicationIdConversion()
+ {
+     final String expectedAppId = "testAppId";
+     final MessageProperties properties = new MessageProperties();
+     properties.setAppId(expectedAppId.getBytes(UTF_8));
+
+     final InternalMessage converted =
+             _messageConverter.convert(createTestMessage(properties), _namedAddressSpace);
+
+     assertEquals("Unexpected app-id", expectedAppId, converted.getMessageHeader().getAppId());
+ }
+
+ private MessageTransferMessage createTestMessage(final DeliveryProperties deliveryProperties)
+ {
+ return createTestMessage(deliveryProperties, new MessageProperties(), null, 0);
+ }
+
+ private MessageTransferMessage createTestMessage(final MessageProperties messageProperties)
+ {
+ return createTestMessage(new DeliveryProperties(), messageProperties, null, 0);
+ }
+
+ /**
+  * Creates a {@link MessageTransferMessage} backed by a mocked {@link StoredMessage}.
+  *
+  * @param deliveryProperties delivery properties placed in the message header
+  * @param messageProperties  message properties placed in the message header
+  * @param content            message body, or {@code null} for a message without content
+  * @param arrivalTime        arrival time recorded in the message metadata
+  * @return the message wrapping the mocked stored message
+  */
+ private MessageTransferMessage createTestMessage(final DeliveryProperties deliveryProperties,
+                                                  final MessageProperties messageProperties,
+                                                  final byte[] content,
+                                                  final long arrivalTime)
+ {
+     final boolean hasContent = content != null;
+     final int bodySize = hasContent ? content.length : 0;
+     final MessageMetaData_0_10 metaData = new MessageMetaData_0_10(
+             new org.apache.qpid.server.protocol.v0_10.transport.Header(deliveryProperties, messageProperties),
+             bodySize,
+             arrivalTime);
+
+     final StoredMessage<MessageMetaData_0_10> storedMessage = mock(StoredMessage.class);
+     when(storedMessage.getMetaData()).thenReturn(metaData);
+
+     // Content accessors are stubbed only when a body was supplied.
+     if (hasContent)
+     {
+         when(storedMessage.getContentSize()).thenReturn(bodySize);
+         when(storedMessage.getContent(0, bodySize))
+                 .thenReturn(Collections.singleton(QpidByteBuffer.wrap(content)));
+     }
+     return new MessageTransferMessage(storedMessage, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 19a937d..fb43b6a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -76,8 +76,9 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
Object body = convertMessageBody(mimeType, data);
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(),
- new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), body, serverMessage.getTo());
+ return InternalMessage.convert(serverMessage,
+ new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding),
+ body);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index 6cecd57..35fc420 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -56,7 +56,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
{
Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMessage);
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject, serverMessage.getTo());
+ return InternalMessage.convert(serverMessage, serverMessage.getMessageHeader(), bodyObject);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
index 9305ea8..1db8e30 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
@@ -594,8 +594,14 @@ public class MessagesRestTest extends QpidRestTestCase
}
else
{
- assertEquals("Unexpected message attribute expirationTime", ((Number) message.get("timestamp")).longValue()
- + _ttl, message.get("expirationTime"));
+ final long expirationTime = ((Number) message.get("expirationTime")).longValue();
+ final long currentTime = System.currentTimeMillis();
+ assertTrue(String.format("Unexpected message attribute expirationTime. got %d which is not >= %d",
+ expirationTime,
+ _startTime + _ttl), expirationTime >= _startTime + _ttl);
+ assertTrue(String.format("Unexpected message attribute expirationTime. got %d which is not <= %d",
+ expirationTime,
+ currentTime + _ttl), expirationTime <= currentTime + _ttl);
assertEquals("Unexpected message attribute priority", 5, message.get("priority"));
assertEquals("Unexpected message attribute persistent", Boolean.FALSE, message.get("persistent"));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org