You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/02 13:14:08 UTC

qpid-broker-j git commit: QPID-7434: Add unit tests for property conversion of AMQP 0-10 messages to internal messages

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 8ab67e0c1 -> fdff8c5f2


QPID-7434: Add unit tests for property conversion of AMQP 0-10 messages to internal messages


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/fdff8c5f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/fdff8c5f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/fdff8c5f

Branch: refs/heads/master
Commit: fdff8c5f2dd2285eb1e73f53c3bd4b6886d6aa89
Parents: 8ab67e0
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed Aug 2 14:11:16 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Aug 2 14:11:16 2017 +0100

----------------------------------------------------------------------
 .../message/internal/InternalMessage.java       |  13 +-
 .../message/internal/InternalMessageHeader.java |  57 +--
 .../security/TrustStoreMessageSource.java       |   2 +-
 .../virtualhost/VirtualHostPropertiesNode.java  |   2 +-
 .../MessageConverter_v0_10_to_Internal.java     |   2 +-
 .../protocol/v0_10/MessageMetaData_0_10.java    |  10 +-
 .../protocol/v0_10/MessageTransferHeader.java   |  26 +-
 .../v0_10/transport/DeliveryProperties.java     |  67 ----
 .../PropertyConverter_0_10_to_InternalTest.java | 388 +++++++++++++++++++
 .../v0_8/MessageConverter_v0_8_to_Internal.java |   5 +-
 .../v1_0/MessageConverter_v1_0_to_Internal.java |   2 +-
 .../qpid/systest/rest/MessagesRestTest.java     |  10 +-
 12 files changed, 469 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index a65b58c..21c55bf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -33,9 +33,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
@@ -209,13 +209,14 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
         return createMessage(store, header, messageBody, persist, null);
     }
 
-    public static InternalMessage convert(long messageNumber,
-                                          boolean persistent,
+    public static InternalMessage convert(final ServerMessage serverMessage,
                                           AMQMessageHeader header,
-                                          Object messageBody,
-                                          final String destinationName)
+                                          Object messageBody)
     {
-        InternalMessageHeader convertedHeader = new InternalMessageHeader(header);
+        long messageNumber = serverMessage.getMessageNumber();
+        boolean persistent = serverMessage.isPersistent();
+        String destinationName = serverMessage.getTo();
+        InternalMessageHeader convertedHeader = new InternalMessageHeader(header, serverMessage.getArrivalTime());
         StoredMessage<InternalMessageMetaData> handle = createReadOnlyHandle(messageNumber, persistent, convertedHeader, messageBody);
         return new InternalMessage(handle, convertedHeader, messageBody, destinationName);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
index 734a71b..374e3df 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
@@ -20,14 +20,16 @@
  */
 package org.apache.qpid.server.message.internal;
 
-import org.apache.qpid.server.message.AMQMessageHeader;
-
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
 
 public final class InternalMessageHeader implements AMQMessageHeader, Serializable
 {
@@ -60,10 +62,10 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
                                  final long timestamp,
                                  final long notValidBefore,
                                  final String type,
-                                 final String replyTo)
+                                 final String replyTo,
+                                 final long arrivalTime)
     {
-        _headers = headers == null ? new LinkedHashMap<String, Object>()
-                : new LinkedHashMap<String, Object>(headers);
+        _headers = headers == null ? new LinkedHashMap<>() : new LinkedHashMap<>(headers);
 
         _correlationId = correlationId;
         _expiration = expiration;
@@ -73,33 +75,36 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
         _mimeType = mimeType;
         _encoding = encoding;
         _priority = priority;
-        _timestamp = timestamp;
+        _timestamp = timestamp > 0 ? timestamp : arrivalTime;
         _notValidBefore = notValidBefore;
         _type = type;
         _replyTo = replyTo;
-        _arrivalTime = System.currentTimeMillis();
+        _arrivalTime = arrivalTime;
     }
 
     public InternalMessageHeader(final AMQMessageHeader header)
     {
-        _correlationId = header.getCorrelationId();
-        _expiration = header.getExpiration();
-        _userId = header.getUserId();
-        _appId = header.getAppId();
-        _messageId = header.getMessageId();
-        _mimeType = header.getMimeType();
-        _encoding = header.getEncoding();
-        _priority = header.getPriority();
-        _timestamp = header.getTimestamp();
-        _notValidBefore = header.getNotValidBefore();
-        _type = header.getType();
-        _replyTo = header.getReplyTo();
-        _headers = new LinkedHashMap<String, Object>();
-        for(String headerName : header.getHeaderNames())
-        {
-            _headers.put(headerName, header.getHeader(headerName));
-        }
-        _arrivalTime = System.currentTimeMillis();
+        this(header, System.currentTimeMillis());
+    }
+
+    public InternalMessageHeader(final AMQMessageHeader header, long arrivalTime)
+    {
+        this(header.getHeaderNames()
+                   .stream()
+                   .collect(Collectors.toMap(Function.identity(), header::getHeader)),
+             header.getCorrelationId(),
+             header.getExpiration(),
+             header.getUserId(),
+             header.getAppId(),
+             header.getMessageId(),
+             header.getMimeType(),
+             header.getEncoding(),
+             header.getPriority(),
+             header.getTimestamp(),
+             header.getNotValidBefore(),
+             header.getType(),
+             header.getReplyTo(),
+             arrivalTime);
     }
 
     @Override
@@ -205,6 +210,6 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
 
     public Map<String,Object> getHeaderMap()
     {
-        return Collections.unmodifiableMap(new LinkedHashMap<String, Object>(_headers));
+        return Collections.unmodifiableMap(new LinkedHashMap<>(_headers));
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index 7af55ac..6e7cb94 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -142,7 +142,7 @@ public class TrustStoreMessageSource extends AbstractSystemMessageSource impleme
         InternalMessageHeader header = new InternalMessageHeader(Collections.<String,Object>emptyMap(),
                                                                  null, 0l, null, null, UUID.randomUUID().toString(),
                                                                  null, null, (byte)4, System.currentTimeMillis(),
-                                                                 0L, null, null);
+                                                                 0L, null, null, System.currentTimeMillis());
         return InternalMessage.createListMessage(_virtualHost.getMessageStore(), header, messageList);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
index b40c5af..190f020 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
@@ -84,7 +84,7 @@ public class VirtualHostPropertiesNode extends AbstractSystemMessageSource
         InternalMessageHeader header = new InternalMessageHeader(headers,
                                                                  null, 0l, null, null, UUID.randomUUID().toString(),
                                                                  null, null, (byte) 4, System.currentTimeMillis(),
-                                                                 0L, null, null);
+                                                                 0L, null, null, System.currentTimeMillis());
         final InternalMessage message =
                 InternalMessage.createBytesMessage(_addressSpace.getMessageStore(), header, new byte[0]);
         message.setInitialRoutingAddress(getName());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index 8d0d9e6..e507ce2 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -77,7 +77,7 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
         Object body = convertMessageBody(mimeType, data);
         MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
         AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding);
-        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body, serverMessage.getTo());
+        return InternalMessage.convert(serverMessage, fixedHeader, body);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index e1eec89..2861cc2 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -41,7 +41,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
     private DeliveryProperties _deliveryProps;
     private MessageProperties _messageProps;
     private MessageTransferHeader _messageHeader;
-    private long _arrivalTime;
     private int _bodySize;
 
     private static final int ENCODER_SIZE = 1 << 10;
@@ -70,8 +69,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
             _deliveryProps = null;
             _messageProps = null;
         }
-        _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
-        _arrivalTime = arrivalTime;
+        _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps, arrivalTime);
         _bodySize = bodySize;
 
     }
@@ -119,7 +117,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
     {
         ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE, false);
 
-        encoder.writeInt64(_arrivalTime);
+        encoder.writeInt64(_messageHeader.getArrivalTime());
         encoder.writeInt32(_bodySize);
         int headersLength = 0;
         if (_header != null)
@@ -239,12 +237,12 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
 
     public long getExpiration()
     {
-        return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+        return _messageHeader.getExpiration();
     }
 
     public long getArrivalTime()
     {
-        return _arrivalTime;
+        return _messageHeader.getArrivalTime();
     }
 
     public Header getHeader()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
index 303950c..5855290 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
@@ -34,12 +34,16 @@ class MessageTransferHeader implements AMQMessageHeader
 
     private final DeliveryProperties _deliveryProps;
     private final MessageProperties _messageProps;
+    private final long _arrivalTime;
     private long _notValidBefore;
 
-    public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps)
+    public MessageTransferHeader(DeliveryProperties deliveryProps,
+                                 MessageProperties messageProps,
+                                 final long arrivalTime)
     {
         _deliveryProps = deliveryProps;
         _messageProps = messageProps;
+        _arrivalTime = arrivalTime;
     }
 
     @Override
@@ -58,7 +62,20 @@ class MessageTransferHeader implements AMQMessageHeader
     @Override
     public long getExpiration()
     {
-        return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
+        long expiration = 0L;
+        if (_deliveryProps != null)
+        {
+            // The AMQP 0-x client wrongly sets ttl to 0 when it means "no ttl".
+            if (_deliveryProps.hasTtl() && _deliveryProps.getTtl() != 0L)
+            {
+                expiration = _arrivalTime + _deliveryProps.getTtl();
+            }
+            else if (_deliveryProps.hasExpiration())
+            {
+                expiration = _deliveryProps.getExpiration();
+            }
+        }
+        return expiration;
     }
 
     @Override
@@ -191,4 +208,9 @@ class MessageTransferHeader implements AMQMessageHeader
         Map<String, Object> appHeaders = _messageProps == null ? null : _messageProps.getApplicationHeaders();
         return appHeaders != null && appHeaders.containsKey(name);
     }
+
+    long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
index 166d58d..18e9ee6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/DeliveryProperties.java
@@ -22,9 +22,7 @@
 package org.apache.qpid.server.protocol.v0_10.transport;
 
 
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 
 
@@ -74,71 +72,6 @@ public final class DeliveryProperties extends Struct {
 
     public DeliveryProperties() {}
 
-
-    public DeliveryProperties(final DeliveryProperties deliveryProp)
-    {
-        this(deliveryProp.getPriority(), deliveryProp.getDeliveryMode(),
-             deliveryProp.getTtl(), deliveryProp.getTimestamp(),
-             deliveryProp.getExpiration(), deliveryProp.getExchange(),
-             deliveryProp.getRoutingKey(), deliveryProp.getResumeId(),
-             deliveryProp.getResumeTtl(), getOptions(deliveryProp));
-    }
-
-    private static Option[] getOptions(final DeliveryProperties deliveryProp)
-    {
-        List<Option> optionList = new ArrayList<>();
-        if(deliveryProp.getDiscardUnroutable())
-        {
-            optionList.add(Option.DISCARD_UNROUTABLE);
-        }
-        if(deliveryProp.getImmediate())
-        {
-            optionList.add(Option.DISCARD_UNROUTABLE);
-        }
-        if(deliveryProp.getRedelivered())
-        {
-            optionList.add(Option.REDELIVERED);
-        }
-        return optionList.toArray(new Option[optionList.size()]);
-    }
-
-
-    public DeliveryProperties(MessageDeliveryPriority priority, MessageDeliveryMode deliveryMode, long ttl, long timestamp, long expiration, String exchange, String routingKey, String resumeId, long resumeTtl, Option ... _options) {
-        if(priority != null) {
-            setPriority(priority);
-        }
-        if(deliveryMode != null) {
-            setDeliveryMode(deliveryMode);
-        }
-        setTtl(ttl);
-        setTimestamp(timestamp);
-        setExpiration(expiration);
-        if(exchange != null) {
-            setExchange(exchange);
-        }
-        if(routingKey != null) {
-            setRoutingKey(routingKey);
-        }
-        if(resumeId != null) {
-            setResumeId(resumeId);
-        }
-        setResumeTtl(resumeTtl);
-
-        for (int i=0; i < _options.length; i++) {
-            switch (_options[i]) {
-            case DISCARD_UNROUTABLE: packing_flags |= 256; break;
-            case IMMEDIATE: packing_flags |= 512; break;
-            case REDELIVERED: packing_flags |= 1024; break;
-            case NONE: break;
-            default: throw new IllegalArgumentException("invalid option: " + _options[i]);
-            }
-        }
-
-    }
-
-
-
-
     public final boolean hasDiscardUnroutable() {
         return (packing_flags & 256) != 0;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java
new file mode 100644
index 0000000..91d6e38
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/PropertyConverter_0_10_to_InternalTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PropertyConverter_0_10_to_InternalTest extends QpidTestCase
+{
+    private NamedAddressSpace _namedAddressSpace;
+    private MessageConverter_v0_10_to_Internal _messageConverter;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _namedAddressSpace = mock(NamedAddressSpace.class);
+        _messageConverter = new MessageConverter_v0_10_to_Internal();
+    }
+
+    public void testContentEncodingConversion()
+    {
+        String contentEncoding = "my-test-encoding";
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setContentEncoding(contentEncoding);
+        MessageTransferMessage message =
+                createTestMessage(new DeliveryProperties(), messageProperties, new byte[]{(byte) 1}, 0);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected content encoding", contentEncoding, convertedMessage.getMessageHeader().getEncoding());
+    }
+
+    public void testApplicationHeadersConversion()
+    {
+        Map<String, Object> headers = new HashMap<>();
+        headers.put("testProperty1", "testProperty1Value");
+        headers.put("intProperty", 1);
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setApplicationHeaders(headers);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        Map<String, Object> header = convertedMessage.getMessageHeader().getHeaderMap();
+        assertEquals("Unexpected headers", headers, new HashMap<>(header));
+    }
+
+    public void testPersistentDeliveryModeConversion()
+    {
+        MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setDeliveryMode(deliveryMode);
+        MessageTransferMessage message = createTestMessage(deliveryProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertTrue("Unexpected persistence", convertedMessage.isPersistent());
+        assertTrue("Unexpected persistence of meta data",
+                   convertedMessage.getStoredMessage().getMetaData().isPersistent());
+    }
+
+    public void testNonPersistentDeliveryModeConversion()
+    {
+        MessageDeliveryMode deliveryMode = MessageDeliveryMode.NON_PERSISTENT;
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setDeliveryMode(deliveryMode);
+        MessageTransferMessage message = createTestMessage(deliveryProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertFalse("Unexpected persistence", convertedMessage.isPersistent());
+        assertFalse("Unexpected persistence of meta data",
+                    convertedMessage.getStoredMessage().getMetaData().isPersistent());
+    }
+
+    public void testPriorityConversion()
+    {
+        final byte priority = 7;
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setPriority(MessageDeliveryPriority.get(priority));
+        MessageTransferMessage message = createTestMessage(deliveryProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected priority", priority, convertedMessage.getMessageHeader().getPriority());
+    }
+
+    public void testCorrelationIdConversion()
+    {
+        final byte[] correlationId = "testCorrelationId".getBytes();
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setCorrelationId(correlationId);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertTrue("Unexpected correlationId",
+                   Arrays.equals(correlationId,
+                                 convertedMessage.getMessageHeader().getCorrelationId().getBytes(UTF_8)));
+    }
+
+    public void testCorrelationIdConversionWhenNotString()
+    {
+        final byte[] correlationId = new byte[]{(byte) 0xc3, 0x28};
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setCorrelationId(correlationId);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected correlationId",
+                     new String(correlationId, UTF_8),
+                     convertedMessage.getMessageHeader().getCorrelationId());
+    }
+
+    public void testReplyToConversionWhenExchangeAndRoutingKeySpecified()
+    {
+        final String exchangeName = "amq.direct";
+        final String routingKey = "test_routing_key";
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setReplyTo(new ReplyTo(exchangeName, routingKey));
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        String expectedReplyTo = String.format("%s/%s", exchangeName, routingKey);
+        assertEquals("Unexpected reply-to", expectedReplyTo, convertedMessage.getMessageHeader().getReplyTo());
+    }
+
+    public void testReplyToConversionWhenExchangeSpecified()
+    {
+        final String exchangeName = "amq.direct";
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setReplyTo(new ReplyTo(exchangeName, null));
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", exchangeName, convertedMessage.getMessageHeader().getReplyTo());
+    }
+
+    public void testReplyToConversionWhenRoutingKeySpecified()
+    {
+        final String routingKey = "test_routing_key";
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setReplyTo(new ReplyTo(null, routingKey));
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", routingKey, convertedMessage.getMessageHeader().getReplyTo());
+    }
+
+    public void testReplyToConversionWhenExchangeIsEmptyStringAndRoutingKeySpecified()
+    {
+        final String routingKey = "test_routing_key";
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setReplyTo(new ReplyTo("", routingKey));
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected reply-to", routingKey, convertedMessage.getMessageHeader().getReplyTo());
+    }
+
+    public void testReplyToConversionWhenExchangeAndRoutingKeyAreNull()
+    {
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setReplyTo(new ReplyTo(null, null));
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertNull("Unexpected reply-to", convertedMessage.getMessageHeader().getReplyTo());
+    }
+
+    public void testExpirationConversion()
+    {
+        long timestamp = System.currentTimeMillis();
+        int ttl = 100000;
+        final long expiration = timestamp + ttl;
+
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setExpiration(expiration);
+        MessageTransferMessage message =
+                createTestMessage(deliveryProperties, new MessageProperties(), null, timestamp);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected expiration", expiration, convertedMessage.getMessageHeader().getExpiration());
+    }
+
+    public void testTtlConversion()
+    {
+        long timestamp = System.currentTimeMillis();
+        int ttl = 100000;
+
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setTtl(ttl);
+        MessageTransferMessage message =
+                createTestMessage(deliveryProperties, new MessageProperties(), null, timestamp);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected expiration", timestamp + ttl, convertedMessage.getMessageHeader().getExpiration());
+    }
+
+    public void testTtlTakesPrecedenceOverExpiration()
+    {
+        long timestamp = System.currentTimeMillis();
+        int ttl = 100000;
+
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setTtl(ttl);
+        deliveryProperties.setExpiration(timestamp + ttl + 10000);
+        MessageTransferMessage message =
+                createTestMessage(deliveryProperties, new MessageProperties(), null, timestamp);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected expiration", timestamp + ttl, convertedMessage.getMessageHeader().getExpiration());
+    }
+
+    public void testMessageIdConversion()
+    {
+        UUID messageId = UUID.randomUUID();
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setMessageId(messageId);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected messageId", "ID:" + messageId, convertedMessage.getMessageHeader().getMessageId());
+    }
+
+    public void testTimestampConversion()
+    {
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        final long timestamp = System.currentTimeMillis() - 1000;
+        deliveryProperties.setTimestamp(timestamp);
+        MessageTransferMessage message = createTestMessage(deliveryProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected creation timestamp", timestamp, convertedMessage.getMessageHeader().getTimestamp());
+    }
+
+    public void testArrivalTimeConversion()
+    {
+        final long timestamp = System.currentTimeMillis() - 1000;
+        MessageTransferMessage message =
+                createTestMessage(new DeliveryProperties(), new MessageProperties(), null, timestamp);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected creation timestamp", timestamp, convertedMessage.getMessageHeader().getTimestamp());
+    }
+
+    public void testJmsTypeConversion()
+    {
+        final String type = "test-type";
+        final Map<String, Object> headers = Collections.singletonMap("x-jms-type", type);
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setApplicationHeaders(headers);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected subject", type, convertedMessage.getMessageHeader().getType());
+    }
+
+    public void testUserIdConversion()
+    {
+        final String userId = "test-userId";
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setUserId(userId.getBytes());
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected user-id", userId, convertedMessage.getMessageHeader().getUserId());
+    }
+
+    public void testUserIdConversionWhenNotUtf8()
+    {
+        final byte[] userId = new byte[]{(byte) 0xc3, 0x28};
+        final MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setUserId(userId);
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected user-id", new String(userId, UTF_8), convertedMessage.getMessageHeader().getUserId());
+    }
+
+    public void testExchangeConversion()
+    {
+        final String testExchange = "testExchange";
+        final DeliveryProperties deliveryProperties = new DeliveryProperties();
+        deliveryProperties.setExchange(testExchange);
+        MessageTransferMessage message = createTestMessage(deliveryProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected exchange", testExchange, convertedMessage.getTo());
+    }
+
+    public void testApplicationIdConversion()
+    {
+        String applicationId = "testAppId";
+        MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setAppId(applicationId.getBytes(UTF_8));
+        MessageTransferMessage message = createTestMessage(messageProperties);
+
+        final InternalMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace);
+
+        assertEquals("Unexpected app-id", applicationId, convertedMessage.getMessageHeader().getAppId());
+    }
+
+    private MessageTransferMessage createTestMessage(final DeliveryProperties deliveryProperties)
+    {
+        return createTestMessage(deliveryProperties, new MessageProperties(), null, 0);
+    }
+
+    private MessageTransferMessage createTestMessage(final MessageProperties messageProperties)
+    {
+        return createTestMessage(new DeliveryProperties(), messageProperties, null, 0);
+    }
+
+    private MessageTransferMessage createTestMessage(final DeliveryProperties deliveryProperties,
+                                                     final MessageProperties messageProperties,
+                                                     final byte[] content,
+                                                     final long arrivalTime)
+    {
+        int bodySize = content == null ? 0 : content.length;
+        final org.apache.qpid.server.protocol.v0_10.transport.Header header =
+                new org.apache.qpid.server.protocol.v0_10.transport.Header(deliveryProperties, messageProperties);
+        final MessageMetaData_0_10 metaData = new MessageMetaData_0_10(header, bodySize, arrivalTime);
+
+        final StoredMessage<MessageMetaData_0_10> storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getMetaData()).thenReturn(metaData);
+
+        if (content != null)
+        {
+            when(storedMessage.getContentSize()).thenReturn(content.length);
+            when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
+                    content)));
+        }
+        return new MessageTransferMessage(storedMessage, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 19a937d..fb43b6a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -76,8 +76,9 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
 
         Object body = convertMessageBody(mimeType, data);
 
-        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(),
-                new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding), body, serverMessage.getTo());
+        return InternalMessage.convert(serverMessage,
+                                       new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding),
+                                       body);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index 6cecd57..35fc420 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -56,7 +56,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
     {
         Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMessage);
 
-        return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject, serverMessage.getTo());
+        return InternalMessage.convert(serverMessage, serverMessage.getMessageHeader(), bodyObject);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fdff8c5f/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
index 9305ea8..1db8e30 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/MessagesRestTest.java
@@ -594,8 +594,14 @@ public class MessagesRestTest extends QpidRestTestCase
         }
         else
         {
-            assertEquals("Unexpected message attribute expirationTime", ((Number) message.get("timestamp")).longValue()
-                                                                        + _ttl, message.get("expirationTime"));
+            final long expirationTime = ((Number) message.get("expirationTime")).longValue();
+            final long currentTime = System.currentTimeMillis();
+            assertTrue(String.format("Unexpected message attribute expirationTime. got %d which is not >= %d",
+                                     expirationTime,
+                                     _startTime + _ttl), expirationTime >= _startTime + _ttl);
+            assertTrue(String.format("Unexpected message attribute expirationTime. got %d which is not <= %d",
+                                     expirationTime,
+                                     currentTime + _ttl), expirationTime <= currentTime + _ttl);
             assertEquals("Unexpected message attribute priority", 5, message.get("priority"));
             assertEquals("Unexpected message attribute persistent", Boolean.FALSE, message.get("persistent"));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org