You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/08/07 15:40:00 UTC

[1/2] qpid-broker-j git commit: QPID-7434: [Java Broker] improve AMQP 1.0 to 0-8 content conversion and add unit tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master d55b08e89 -> a4a175173


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
index d97950a..d272c87 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
@@ -20,167 +20,894 @@
  */
 package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.AmqpListToListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.AmqpMapToMapConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
 import org.apache.qpid.server.protocol.v1_0.Message_1_0;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.JmsMapMessageToMap;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.JmsStreamMessageToList;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
 import org.apache.qpid.server.util.ByteBufferUtils;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 {
-    private final MessageConverter_1_0_to_v0_8 _converter = new MessageConverter_1_0_to_v0_8();
-    private final StoredMessage<MessageMetaData_1_0> _handle = mock(StoredMessage.class);
-
-    private final MessageMetaData_1_0 _metaData = mock(MessageMetaData_1_0.class);
-    private final MessageMetaData_1_0.MessageHeader_1_0 _header = mock(MessageMetaData_1_0.MessageHeader_1_0.class);
+    private static final MessageAnnotations MESSAGE_MESSAGE_ANNOTATION =
+            new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), (byte) 0));
+    private static final MessageAnnotations OBJECT_MESSAGE_MESSAGE_ANNOTATION =
+            new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), (byte) 1));
+    private static final MessageAnnotations MAP_MESSAGE_MESSAGE_ANNOTATION =
+            new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), (byte) 2));
+    private static final MessageAnnotations BYTE_MESSAGE_MESSAGE_ANNOTATION =
+            new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), (byte) 3));
+    private static final MessageAnnotations STREAM_MESSAGE_MESSAGE_ANNOTATION =
+            new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), (byte) 4));
+    private static final MessageAnnotations TEXT_MESSAGE_MESSAGE_ANNOTATION =
+            new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), (byte) 5));
+    private MessageConverter_1_0_to_v0_8 _converter;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
-        when(_handle.getMetaData()).thenReturn(_metaData);
-        when(_metaData.getMessageHeader()).thenReturn(_header);
+        _converter = new MessageConverter_1_0_to_v0_8();
+    }
+
+    public void testAmqpValueWithNull() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception
+    {
+        final Object expected = null;
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage =
+                createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testAmqpValueWithString() throws Exception
+    {
+        final String expected = "testContent";
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
     }
 
-    public void testConvertStringMessageBody() throws Exception
+    public void testAmqpValueWithStringWithTextMessageAnnotation() throws Exception
     {
-        final String expected = "helloworld";
+        final String expected = "testContent";
+        final AmqpValue amqpValue = new AmqpValue(expected);
+        Message_1_0 sourceMessage =
+                createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection());
 
-        final AmqpValue value = new AmqpValue(expected);
-        configureMessageContent(value.createEncodingRetainingSection());
-        final Message_1_0 sourceMessage = new Message_1_0(_handle);
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
+    }
+
+    public void testAmqpValueWithMap() throws Exception
+    {
+        final Map<String, Object> originalMap = new LinkedHashMap<>();
+        originalMap.put("binaryEntry", new Binary(new byte[]{0x00, (byte) 0xFF}));
+        originalMap.put("intEntry", 42);
+        originalMap.put("nullEntry", null);
+        final AmqpValue amqpValue = new AmqpValue(originalMap);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
-        assertEquals("text/plain", convertedMessage.getMessageHeader().getMimeType());
 
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
         final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
-        assertArrayEquals(expected.getBytes(), getBytes(content));
+
+        Map<String, Object> convertedMap = new JmsMapMessageToMap().toObject(getBytes(content));
+        assertEquals("Unexpected size", originalMap.size(), convertedMap.size());
+        assertArrayEquals("Unexpected binary entry", ((Binary) originalMap.get("binaryEntry")).getArray(),
+                          (byte[]) convertedMap.get("binaryEntry"));
+        assertEquals("Unexpected int entry", originalMap.get("intEntry"), convertedMap.get("intEntry"));
+        assertEquals("Unexpected null entry", originalMap.get("nullEntry"), convertedMap.get("nullEntry"));
     }
 
-    public void testConvertListMessageBody() throws Exception
+    public void testAmqpValueWithMapContainingMap() throws Exception
     {
-        final List<Object> expected = Lists.<Object>newArrayList("helloworld", 43, 1L);
+        final Map<String, Object> originalMap =
+                Collections.singletonMap("testMap", Collections.singletonMap("innerKey", "testValue"));
 
-        final AmqpValue value = new AmqpValue(expected);
-        configureMessageContent(value.createEncodingRetainingSection());
-        final Message_1_0 sourceMessage = new Message_1_0(_handle);
+        final AmqpValue amqpValue = new AmqpValue(originalMap);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
-        assertEquals("jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
 
+        assertEquals("Unexpected mime type", "amqp/map", convertedMessage.getMessageHeader().getMimeType());
         final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
-        TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(getBytes(content)));
-        assertEquals(expected.get(0), reader.readObject());
-        assertEquals(expected.get(1), reader.readObject());
-        assertEquals(expected.get(2), reader.readObject());
+        Map<String, Object> convertedMap = new AmqpMapToMapConverter().toObject(getBytes(content));
+        assertEquals("Unexpected size", originalMap.size(), convertedMap.size());
+        assertEquals("Unexpected binary entry", new HashMap((Map<String, Object>) originalMap.get("testMap")),
+                     new HashMap((Map<String, Object>) convertedMap.get("testMap")));
+    }
+
+    public void testAmqpValueWithMapContainingNonFieldTableCompliantEntries() throws Exception
+    {
+        final AmqpValue amqpValue = new AmqpValue(Collections.<Object, Object>singletonMap(13, 42));
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
 
         try
         {
-            reader.readObject();
-            fail("Exception not thrown");
+            _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+            fail("expected exception not thrown.");
         }
-        catch (EOFException e)
+        catch (MessageConversionException e)
         {
-            //  PASS
+            // pass
         }
     }
 
-    public void testConvertMapMessageBody() throws Exception
+    public void testAmqpValueWithList() throws Exception
     {
-        final Map<String, String> expected = Collections.singletonMap("key", "value");
+        final List<Object> originalList = new ArrayList<>();
+        originalList.add(new Binary(new byte[]{0x00, (byte) 0xFF}));
+        originalList.add(42);
+        originalList.add(null);
+        final AmqpValue amqpValue = new AmqpValue(originalList);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
 
-        final AmqpValue value = new AmqpValue(expected);
-        configureMessageContent(value.createEncodingRetainingSection());
-        final Message_1_0 sourceMessage = new Message_1_0(_handle);
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        List<Object> convertedList = new JmsStreamMessageToList().toObject(getBytes(content));
+        assertEquals("Unexpected size", originalList.size(), convertedList.size());
+        assertArrayEquals("Unexpected binary item", ((Binary) originalList.get(0)).getArray(),
+                          (byte[]) convertedList.get(0));
+        assertEquals("Unexpected int item", originalList.get(1), convertedList.get(1));
+        assertEquals("Unexpected null item", originalList.get(2), convertedList.get(2));
+    }
+
+    public void testAmqpValueWithListContainingMap() throws Exception
+    {
+        final List<Object> originalList = Collections.singletonList(Collections.singletonMap("testKey", "testValue"));
+        final AmqpValue amqpValue = new AmqpValue(originalList);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
-        assertEquals("jms/map-message", convertedMessage.getMessageHeader().getMimeType());
 
+        assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
         final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
-        TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(getBytes(content)));
-        assertEquals(expected.size(), reader.readIntImpl());
-        assertEquals("key", reader.readStringImpl());
-        assertEquals(expected.get("key"), reader.readObject());
+        List<Object> convertedList = new AmqpListToListConverter().toObject(getBytes(content));
+        assertEquals("Unexpected size", originalList.size(), convertedList.size());
+        assertEquals("Unexpected map item", new HashMap<String, Object>((Map) originalList.get(0)),
+                     new HashMap<String, Object>((Map) convertedList.get(0)));
+    }
+
+    public void testAmqpValueWithListContainingUnsupportedType() throws Exception
+    {
+        final List<Object> originalList = Collections.singletonList(new Source());
+        final AmqpValue amqpValue = new AmqpValue(originalList);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+
+        try
+        {
+            _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+            fail("expected exception not thrown.");
+        }
+        catch (MessageConversionException e)
+        {
+            // pass
+        }
+    }
+
+    public void testAmqpValueWithUnsupportedType() throws Exception
+    {
+        final Integer originalValue = 42;
+        final AmqpValue amqpValue = new AmqpValue(originalValue);
+        Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection());
+
         try
         {
-            reader.readString();
-            fail("Exception not thrown");
+            _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+            fail("expected exception not thrown.");
         }
-        catch (EOFException e)
+        catch (MessageConversionException e)
         {
-            //  PASS
+            // pass
         }
     }
 
-    public void testConvertBytesMessageBody() throws Exception
+    public void testAmqpSequenceWithSimpleTypes() throws Exception
     {
-        final String expected = "helloworld";
+        final List<Integer> expected = new ArrayList<>();
+        expected.add(37);
+        expected.add(42);
+        final AmqpSequence amqpSequence = new AmqpSequence(expected);
+        Message_1_0 sourceMessage = createTestMessage(amqpSequence.createEncodingRetainingSection());
 
-        final Data value = new Data(new Binary(expected.getBytes()));
-        configureMessageContent(value.createEncodingRetainingSection());
-        final Message_1_0 sourceMessage = new Message_1_0(_handle);
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertEquals("Unexpected content", expected, new JmsStreamMessageToList().toObject(getBytes(content)));
+    }
+
+    public void testAmqpSequenceWithMap() throws Exception
+    {
+        final List<Object> originalList = Collections.singletonList(Collections.singletonMap("testKey", "testValue"));
+        final AmqpSequence amqpSequence = new AmqpSequence(originalList);
+        Message_1_0 sourceMessage = createTestMessage(amqpSequence.createEncodingRetainingSection());
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
-        assertEquals("application/octet-stream", convertedMessage.getMessageHeader().getMimeType());
 
+        assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
         final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
-        assertArrayEquals(expected.getBytes(), getBytes(content));
+
+        List<Object> convertedList = new AmqpListToListConverter().toObject(getBytes(content));
+        assertEquals("Unexpected size", originalList.size(), convertedList.size());
+        assertEquals("Unexpected map item", new HashMap<String, Object>((Map) originalList.get(0)),
+                     new HashMap<String, Object>((Map) convertedList.get(0)));
     }
 
-    private void configureMessageContent(final EncodingRetainingSection section)
+    public void testAmqpSequenceWithUnsupportedType() throws Exception
     {
-        final QpidByteBuffer combined = QpidByteBuffer.wrap(ByteBufferUtils.combine(section.getEncodedForm()));
-        when(_handle.getContentSize()).thenReturn((int) section.getEncodedSize());
-        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
-        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+        final List<Object> originalList = Collections.singletonList(new Source());
+        final AmqpSequence amqpSequence = new AmqpSequence(originalList);
+        Message_1_0 sourceMessage = createTestMessage(amqpSequence.createEncodingRetainingSection());
 
-        when(_handle.getContent(offsetCaptor.capture(), sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        try
+        {
+            _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+            fail("expected exception not thrown.");
+        }
+        catch (MessageConversionException e)
         {
-            @Override
-            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
-            {
-                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
-                return Collections.singleton(view);
-            }
-        });
+            // pass
+        }
+    }
+
+    public void testDataWithMessageAnnotation() throws Exception
+    {
+        doTestDataWithAnnotation("helloworld".getBytes(UTF_8), MESSAGE_MESSAGE_ANNOTATION, "application/octet-stream");
+    }
+
+    public void testDataWithObjectMessageAnnotation() throws Exception
+    {
+        byte[] bytes = "helloworld".getBytes(UTF_8);
+        final byte[] expected = getObjectBytes(bytes);
+        doTestDataWithAnnotation(expected, OBJECT_MESSAGE_MESSAGE_ANNOTATION, "application/java-object-stream");
+    }
+
+    public void testDataWithMapMessageAnnotation() throws Exception
+    {
+        doTestDataWithAnnotation("helloworld".getBytes(UTF_8),
+                                 MAP_MESSAGE_MESSAGE_ANNOTATION,
+                                 "application/octet-stream");
+    }
+
+    public void testDataWithMapMessageAnnotationAndContentTypeJmsMapMessage() throws Exception
+    {
+        Map<String, Object> originalMap = Collections.singletonMap("testKey", "testValue");
+        byte[] data = new MapToJmsMapMessage().toMimeContent(originalMap);
+        String expectedMimeType = "jms/map-message";
+        final Data value = new Data(new Binary(data));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf(expectedMimeType));
+        Message_1_0 sourceMessage = createTestMessage(properties,
+                                                      MAP_MESSAGE_MESSAGE_ANNOTATION,
+                                                      value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", data, getBytes(content));
+    }
+
+    public void testDataWithMapMessageAnnotationAndContentTypeAmqpMap() throws Exception
+    {
+        Map<String, Object> originalMap = Collections.singletonMap("testKey", "testValue");
+        byte[] data = new MapToAmqpMapConverter().toMimeContent(originalMap);
+        String expectedMimeType = "amqp/map";
+        final Data value = new Data(new Binary(data));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf(expectedMimeType));
+        Message_1_0 sourceMessage = createTestMessage(properties,
+                                                      MAP_MESSAGE_MESSAGE_ANNOTATION,
+                                                      value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", data, getBytes(content));
+    }
+
+    public void testDataWithBytesMessageAnnotation() throws Exception
+    {
+        doTestDataWithAnnotation("helloworld".getBytes(UTF_8),
+                                 BYTE_MESSAGE_MESSAGE_ANNOTATION,
+                                 "application/octet-stream");
+    }
+
+    public void testDataWithStreamMessageAnnotation() throws Exception
+    {
+        doTestDataWithAnnotation("helloworld".getBytes(UTF_8), STREAM_MESSAGE_MESSAGE_ANNOTATION,
+                                 "application/octet-stream");
+    }
+
+    public void testDataWithStreamMessageAnnotationAndContentTypeJmsStreamMessage() throws Exception
+    {
+        List<Object> originalList = Collections.singletonList("testValue");
+        byte[] data = new ListToJmsStreamMessage().toMimeContent(originalList);
+        String expectedMimeType = "jms/stream-message";
+        final Data value = new Data(new Binary(data));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf(expectedMimeType));
+        Message_1_0 sourceMessage = createTestMessage(properties,
+                                                      STREAM_MESSAGE_MESSAGE_ANNOTATION,
+                                                      value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", data, getBytes(content));
+    }
+
+    public void testDataWithStreamMessageAnnotationAndContentTypeAmqpList() throws Exception
+    {
+        List<Object> originalList = Collections.singletonList("testValue");
+        byte[] data = new ListToAmqpListConverter().toMimeContent(originalList);
+        String expectedMimeType = "amqp/list";
+        final Data value = new Data(new Binary(data));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf(expectedMimeType));
+        Message_1_0 sourceMessage = createTestMessage(properties,
+                                                      STREAM_MESSAGE_MESSAGE_ANNOTATION,
+                                                      value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", data, getBytes(content));
+    }
+
+    public void testDataWithTextMessageAnnotation() throws Exception
+    {
+        doTestDataWithAnnotation("helloworld".getBytes(UTF_8), TEXT_MESSAGE_MESSAGE_ANNOTATION, "text/plain");
+    }
+
+    public void testDataWithUnsupportedMessageAnnotation() throws Exception
+    {
+        doTestDataWithAnnotation("helloworld".getBytes(UTF_8),
+                                 new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"),
+                                                                                 (byte) 11)),
+                                 "application/octet-stream");
+    }
+
+    public void testDataWithContentTypeText() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("text/foobar");
+    }
+
+    public void testDataWithContentTypeApplicationXml() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/xml");
+    }
+
+    public void testDataWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/xml-dtd");
+    }
+
+    public void testDataWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/foo+xml");
+    }
+
+    public void testDataWithContentTypeApplicationJson() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/json");
+    }
+
+    public void testDataWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/foo+json");
+    }
+
+    public void testDataWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/javascript");
+    }
+
+    public void testDataWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestConvertOfDataSectionForTextualType("application/ecmascript");
+    }
+
+    public void testDataWithContentTypeAmqpMap() throws Exception
+    {
+        Map<String, Object> originalMap = Collections.singletonMap("testKey", "testValue");
+        byte[] bytes = new MapToAmqpMapConverter().toMimeContent(originalMap);
+
+        final Data value = new Data(new Binary(bytes));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("amqp/map"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "amqp/map", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", bytes, getBytes(content));
+    }
+
+    public void testDataWithContentTypeJmsMapMessage() throws Exception
+    {
+        Map<String, Object> originalMap = Collections.singletonMap("testKey", "testValue");
+        byte[] bytes = new MapToJmsMapMessage().toMimeContent(originalMap);
+
+        final Data value = new Data(new Binary(bytes));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("jms/map-message"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", bytes, getBytes(content));
+    }
+
+    public void testDataWithContentTypeAmqpList() throws Exception
+    {
+        List<Object> originalMap = Collections.singletonList("testValue");
+        byte[] bytes = new ListToAmqpListConverter().toMimeContent(originalMap);
+
+        final Data value = new Data(new Binary(bytes));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("amqp/list"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", bytes, getBytes(content));
+    }
+
+    public void testDataWithContentTypeJmsStreamMessage() throws Exception
+    {
+        List<Object> originalMap = Collections.singletonList("testValue");
+        byte[] bytes = new ListToJmsStreamMessage().toMimeContent(originalMap);
+
+        final Data value = new Data(new Binary(bytes));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("jms/stream-message"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", bytes, getBytes(content));
+    }
+
+    public void testDataWithContentTypeJavaSerializedObject() throws Exception
+    {
+        final byte[] expected = getObjectBytes("helloworld".getBytes(UTF_8));
+        final Data value = new Data(new Binary(expected));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", expected, getBytes(content));
+    }
+
+
+    public void testDataWithContentTypeJavaObjectStream() throws Exception
+    {
+        final byte[] expected = getObjectBytes("helloworld".getBytes(UTF_8));
+        final Data value = new Data(new Binary(expected));
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/java-object-stream"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", expected, getBytes(content));
+    }
+
+    public void testDataWithContentTypeOther() throws Exception
+    {
+        final byte[] expected = "helloworld".getBytes(UTF_8);
+        final Data value = new Data(new Binary(expected));
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/bin"));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/octet-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", expected, getBytes(content));
+    }
+
+    public void testData() throws Exception
+    {
+        final byte[] expected = getObjectBytes("helloworld".getBytes(UTF_8));
+        final Data value = new Data(new Binary(expected));
+        final Message_1_0 sourceMessage = createTestMessage(value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/octet-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", expected, getBytes(content));
+    }
+
+    public void testNoBodyWithMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage = createTestMessage(MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithObjectMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage = createTestMessage(OBJECT_MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertArrayEquals("Unexpected content size", getObjectBytes(null), getBytes(content));
+    }
+
+    public void testNoBodyWithMapMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage = createTestMessage(MAP_MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+        assertArrayEquals("Unexpected content size",
+                          new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()),
+                          getBytes(content));
+    }
+
+    public void testNoBodyWithBytesMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage = createTestMessage(BYTE_MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/octet-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithStreamMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage = createTestMessage(STREAM_MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithTextMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage = createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithUnknownMessageAnnotation() throws Exception
+    {
+        Message_1_0 sourceMessage =
+                createTestMessage(new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"),
+                                                                                  (byte) 11)), null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBody() throws Exception
+    {
+        final Message_1_0 sourceMessage = createTestMessage(null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithContentTypeApplicationOctetStream() throws Exception
+    {
+        Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/octet-stream"));
+        final Message_1_0 sourceMessage = createTestMessage(properties, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithObjectMessageContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+        final Message_1_0 sourceMessage = createTestMessage(properties, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+
+        assertEquals("Unexpected content size",
+                     getObjectBytes(null).length,
+                     convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    public void testNoBodyWithJmsMapContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("jms/map-message"));
+        final Message_1_0 sourceMessage = createTestMessage(properties, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
+
+        assertArrayEquals("Unexpected content size",
+                          new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()),
+                          getBytes(content));
+    }
+
+    public void testMessageAnnotationTakesPrecedenceOverContentType() throws Exception
+    {
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf("application/octet-stream"));
+        final Message_1_0 sourceMessage = createTestMessage(OBJECT_MESSAGE_MESSAGE_ANNOTATION, null);
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     "application/java-object-stream",
+                     convertedMessage.getMessageHeader().getMimeType());
+        assertEquals("Unexpected content size",
+                     getObjectBytes(null).length,
+                     convertedMessage.getMessageMetaData().getContentSize());
+    }
+
+    private void doTestConvertOfDataSectionForTextualType(final String contentType) throws Exception
+    {
+        final String expected = "testContent";
+        final Data value = new Data(new Binary(expected.getBytes(UTF_8)));
+        final Properties properties = new Properties();
+        properties.setContentType(Symbol.valueOf(contentType));
+        Message_1_0 sourceMessage = createTestMessage(properties, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
+
+        assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
     }
 
     private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        for (QpidByteBuffer buf: content)
+        for (QpidByteBuffer buf : content)
         {
             ByteStreams.copy(buf.asInputStream(), bos);
             buf.dispose();
         }
         return bos.toByteArray();
     }
+
+    private Message_1_0 createTestMessage(final EncodingRetainingSection encodingRetainingSection)
+    {
+        return createTestMessage(new Properties(), encodingRetainingSection);
+    }
+
+    private Message_1_0 createTestMessage(final Properties properties, final EncodingRetainingSection section)
+    {
+        return createTestMessage(new Header(),
+                                 new DeliveryAnnotations(Collections.emptyMap()),
+                                 new MessageAnnotations(Collections.emptyMap()),
+                                 properties,
+                                 new ApplicationProperties(Collections.emptyMap()),
+                                 0,
+                                 section);
+    }
+
+    private Message_1_0 createTestMessage(final Properties properties,
+                                          final MessageAnnotations messageAnnotations,
+                                          final EncodingRetainingSection section)
+    {
+        return createTestMessage(new Header(),
+                                 new DeliveryAnnotations(Collections.emptyMap()),
+                                 messageAnnotations,
+                                 properties,
+                                 new ApplicationProperties(Collections.emptyMap()),
+                                 0,
+                                 section);
+    }
+
+    private Message_1_0 createTestMessage(final MessageAnnotations messageAnnotations,
+                                          final EncodingRetainingSection section)
+    {
+        return createTestMessage(new Header(),
+                                 new DeliveryAnnotations(Collections.emptyMap()),
+                                 messageAnnotations,
+                                 new Properties(),
+                                 new ApplicationProperties(Collections.emptyMap()),
+                                 0,
+                                 section);
+    }
+
+    private Message_1_0 createTestMessage(final Header header,
+                                          final DeliveryAnnotations deliveryAnnotations,
+                                          final MessageAnnotations messageAnnotations,
+                                          final Properties properties,
+                                          final ApplicationProperties applicationProperties,
+                                          final long arrivalTime,
+                                          final EncodingRetainingSection section)
+    {
+        final StoredMessage<MessageMetaData_1_0> storedMessage = mock(StoredMessage.class);
+        MessageMetaData_1_0 metaData = new MessageMetaData_1_0(header.createEncodingRetainingSection(),
+                                                               deliveryAnnotations.createEncodingRetainingSection(),
+                                                               messageAnnotations.createEncodingRetainingSection(),
+                                                               properties.createEncodingRetainingSection(),
+                                                               applicationProperties.createEncodingRetainingSection(),
+                                                               new Footer(Collections.emptyMap()).createEncodingRetainingSection(),
+                                                               arrivalTime,
+                                                               0);
+        when(storedMessage.getMetaData()).thenReturn(metaData);
+
+        if (section != null)
+        {
+            final QpidByteBuffer combined = QpidByteBuffer.wrap(ByteBufferUtils.combine(section.getEncodedForm()));
+            when(storedMessage.getContentSize()).thenReturn((int) section.getEncodedSize());
+            final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+            final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+            when(storedMessage.getContent(offsetCaptor.capture(),
+                                          sizeCaptor.capture())).then(invocation ->
+                                                                      {
+                                                                          final QpidByteBuffer view = combined.view(
+                                                                                  offsetCaptor.getValue(),
+                                                                                  sizeCaptor.getValue());
+                                                                          return Collections.singleton(view);
+                                                                      });
+        }
+        return new Message_1_0(storedMessage);
+    }
+
+    private byte[] getObjectBytes(final Object object) throws IOException
+    {
+        final byte[] expected;
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos))
+        {
+            oos.writeObject(object);
+            expected = baos.toByteArray();
+        }
+        return expected;
+    }
+
+    private void doTestDataWithAnnotation(final byte[] data,
+                                          final MessageAnnotations messageAnnotations,
+                                          final String expectedMimeType) throws Exception
+    {
+        final Data value = new Data(new Binary(data));
+        Message_1_0 sourceMessage = createTestMessage(messageAnnotations, value.createEncodingRetainingSection());
+
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        assertEquals("Unexpected mime type",
+                     expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        assertArrayEquals("Unexpected content", data, getBytes(content));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-7434: [Java Broker] improve AMQP 1.0 to 0-8 content conversion and add unit tests

Posted by or...@apache.org.
QPID-7434: [Java Broker] improve AMQP 1.0 to 0-8 content conversion and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a4a17517
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a4a17517
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a4a17517

Branch: refs/heads/master
Commit: a4a17517312350831b797fa817ebbc2bbe1ee735
Parents: d55b08e
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Aug 3 17:09:05 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Aug 7 16:38:55 2017 +0100

----------------------------------------------------------------------
 .../ByteArrayToOctetStream.java                 |   4 +
 .../MimeContentConverterRegistry.java           |  38 +-
 .../mimecontentconverter/StringToTextPlain.java |   9 +-
 .../ListToJmsStreamMessage.java                 |  33 +-
 .../MapToJmsMapMessage.java                     |  62 +-
 .../MessageConverter_Internal_to_v0_10.java     |  39 +-
 .../protocol/v0_10/transport/EncoderUtils.java  |  51 ++
 .../MapToAmqpMapConverter.java                  |   3 +-
 .../protocol/v1_0/JmsMessageTypeAnnotation.java |  69 ++
 .../v1_0/MessageConverter_from_1_0.java         | 163 +++-
 broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml |   6 +
 .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java |  54 +-
 .../MessageConverter_1_0_to_v0_8Test.java       | 865 +++++++++++++++++--
 13 files changed, 1211 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ByteArrayToOctetStream.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ByteArrayToOctetStream.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ByteArrayToOctetStream.java
index 5246ae3..10cfd19 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ByteArrayToOctetStream.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ByteArrayToOctetStream.java
@@ -58,6 +58,10 @@ public class ByteArrayToOctetStream implements ObjectToMimeContentConverter<byte
     @Override
     public byte[] toMimeContent(final byte[] object)
     {
+        if (object == null)
+        {
+            return new byte[0];
+        }
         return object;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
index a5073f5..b580557 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java
@@ -72,6 +72,7 @@ public class MimeContentConverterRegistry
             }
             classToMineConverters.put(objectClass, converter);
         }
+        classToMineConverters.put(Void.class, new StringToTextPlain());
         return ImmutableMultimap.copyOf(classToMineConverters);
     }
 
@@ -101,28 +102,41 @@ public class MimeContentConverterRegistry
 
     public static ObjectToMimeContentConverter getBestFitObjectToMimeContentConverter(Object object)
     {
-        if (object == null)
-        {
-            return null;
-        }
-
-        final List<Class<?>> classes = new ArrayList<>(Arrays.asList(object.getClass().getInterfaces()));
-        classes.add(object.getClass());
         ObjectToMimeContentConverter converter = null;
-        for (Class<?> i : classes)
+        if (object != null)
         {
-            for (ObjectToMimeContentConverter candidate : _classToMimeContentConverters.get(i))
+            final List<Class<?>> classes = new ArrayList<>(Arrays.asList(object.getClass().getInterfaces()));
+            classes.add(object.getClass());
+            for (Class<?> i : classes)
             {
-                if (candidate.isAcceptable(object))
+                for (ObjectToMimeContentConverter candidate : _classToMimeContentConverters.get(i))
                 {
-                    if (converter == null || candidate.getRank() > converter.getRank())
+                    if (candidate.isAcceptable(object))
                     {
-                        converter = candidate;
+                        if (converter == null || candidate.getRank() > converter.getRank())
+                        {
+                            converter = candidate;
+                        }
                     }
                 }
             }
         }
+        return converter;
+    }
 
+    public static ObjectToMimeContentConverter getBestFitObjectToMimeContentConverter(Object object, Class<?> typeHint)
+    {
+        ObjectToMimeContentConverter converter = null;
+        for (ObjectToMimeContentConverter candidate : _classToMimeContentConverters.get(typeHint))
+        {
+            if (candidate.isAcceptable(object))
+            {
+                if (converter == null || candidate.getRank() > converter.getRank())
+                {
+                    converter = candidate;
+                }
+            }
+        }
         return converter;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/StringToTextPlain.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/StringToTextPlain.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/StringToTextPlain.java
index 3875520..e37b191 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/StringToTextPlain.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/StringToTextPlain.java
@@ -60,6 +60,13 @@ public class StringToTextPlain implements ObjectToMimeContentConverter<String>
     @Override
     public byte[] toMimeContent(final String object)
     {
-        return object.getBytes(StandardCharsets.UTF_8);
+        if (object == null)
+        {
+            return new byte[0];
+        }
+        else
+        {
+            return object.getBytes(StandardCharsets.UTF_8);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/ListToJmsStreamMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/ListToJmsStreamMessage.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/ListToJmsStreamMessage.java
index b436597..425d7dc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/ListToJmsStreamMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/ListToJmsStreamMessage.java
@@ -58,20 +58,24 @@ public class ListToJmsStreamMessage implements ObjectToMimeContentConverter<List
     @Override
     public boolean isAcceptable(final List list)
     {
-        for(Object value : list)
+        if (list != null)
         {
-            if(!(value instanceof String
-                 || value instanceof Integer
-                 || value instanceof Long
-                 || value instanceof Double
-                 || value instanceof Float
-                 || value instanceof Byte
-                 || value instanceof Short
-                 || value instanceof Character
-                 || value instanceof Boolean
-                 || value instanceof byte[]))
+            for (Object value : list)
             {
-                return false;
+                if (value != null
+                    && !(value instanceof String
+                      || value instanceof Integer
+                      || value instanceof Long
+                      || value instanceof Double
+                      || value instanceof Float
+                      || value instanceof Byte
+                      || value instanceof Short
+                      || value instanceof Character
+                      || value instanceof Boolean
+                      || value instanceof byte[]))
+                {
+                    return false;
+                }
             }
         }
         return true;
@@ -80,6 +84,11 @@ public class ListToJmsStreamMessage implements ObjectToMimeContentConverter<List
     @Override
     public byte[] toMimeContent(final List list)
     {
+        if (list == null)
+        {
+            return new byte[0];
+        }
+
         TypedBytesContentWriter writer = new TypedBytesContentWriter();
 
         for(Object o : list)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/MapToJmsMapMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/MapToJmsMapMessage.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/MapToJmsMapMessage.java
index abca854..32fa328 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/MapToJmsMapMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/MapToJmsMapMessage.java
@@ -60,26 +60,31 @@ public class MapToJmsMapMessage implements ObjectToMimeContentConverter<Map>
     @Override
     public boolean isAcceptable(final Map map)
     {
-        for(Entry entry : (Set<Entry>) map.entrySet())
+        if (map != null)
         {
-            Object key = entry.getKey();
-            if (!(key instanceof String))
+            for (Entry entry : (Set<Entry>) map.entrySet())
             {
-                return false;
-            }
-            Object value = entry.getValue();
-            if(!(value instanceof String
-                 || value instanceof Integer
-                 || value instanceof Long
-                 || value instanceof Double
-                 || value instanceof Float
-                 || value instanceof Byte
-                 || value instanceof Short
-                 || value instanceof Character
-                 || value instanceof Boolean
-                 || value instanceof byte[]))
-            {
-                return false;
+                Object key = entry.getKey();
+                if (!(key instanceof String))
+                {
+                    return false;
+                }
+                Object value = entry.getValue();
+
+                if (value != null
+                    && !(value instanceof String
+                      || value instanceof Integer
+                      || value instanceof Long
+                      || value instanceof Double
+                      || value instanceof Float
+                      || value instanceof Byte
+                      || value instanceof Short
+                      || value instanceof Character
+                      || value instanceof Boolean
+                      || value instanceof byte[]))
+                {
+                    return false;
+                }
             }
         }
         return true;
@@ -89,19 +94,22 @@ public class MapToJmsMapMessage implements ObjectToMimeContentConverter<Map>
     public byte[] toMimeContent(final Map map)
     {
         TypedBytesContentWriter writer = new TypedBytesContentWriter();
-        writer.writeIntImpl(map.size());
+        writer.writeIntImpl(map == null ? 0 : map.size());
 
-        try
+        if (map != null)
         {
-            for(Entry entry : (Set<Entry>)map.entrySet())
+            try
             {
-                writer.writeNullTerminatedStringImpl((String)entry.getKey());
-                writer.writeObject(entry.getValue());
+                for (Entry entry : (Set<Entry>) map.entrySet())
+                {
+                    writer.writeNullTerminatedStringImpl((String) entry.getKey());
+                    writer.writeObject(entry.getValue());
+                }
+            }
+            catch (TypedBytesFormatException e)
+            {
+                throw new IllegalArgumentException(e);
             }
-        }
-        catch (TypedBytesFormatException e)
-        {
-            throw new IllegalArgumentException(e);
         }
 
         final ByteBuffer buf = writer.getData();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index c9cfaff..700a0bc 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -24,7 +24,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -225,44 +224,10 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
 
     private void validateValue(final Object value, final String path)
     {
-        try
-        {
-            EncoderUtils.getEncodingType(value);
-        }
-        catch (IllegalArgumentException e)
+        if (!EncoderUtils.isEncodable(value))
         {
             throw new MessageConversionException(String.format(
-                    "Could not convert message from internal to 0-10 because conversion of %s failed. Unsupported type is used.", path),e);
-        }
-
-        if (value instanceof Map)
-        {
-            for(Map.Entry<?,?> entry: ((Map<?,?>)value).entrySet())
-            {
-                Object key = entry.getKey();
-                String childPath = path + "['" + key + "']";
-                if (key instanceof String)
-                {
-                    ensureStr8(childPath, (String)key);
-                }
-                else
-                {
-                    throw new MessageConversionException(
-                            String.format(
-                                    "Could not convert message from internal to 0-10 because conversion of %s failed.", childPath));
-                }
-                validateValue(entry.getValue(), childPath);
-            }
-        }
-        else if (value instanceof Collection)
-        {
-            Collection<?> collection = (Collection<?>) value;
-            int index = 0;
-            for (Object o: collection)
-            {
-                validateValue(o, path+ "[" + index + "]");
-                index++;
-            }
+                    "Could not convert message from internal to 0-10 because conversion of %s failed. Unsupported type is used.", path));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
index 5dca3b7..32c5e6d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_10.transport;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -336,4 +337,54 @@ public class EncoderUtils
                 return t.getWidth() + (value == null ? 0 : ((byte[])value).length);
         }
     }
+
+    public static boolean isEncodable(final Object value)
+    {
+        try
+        {
+            getEncodingType(value);
+        }
+        catch (IllegalArgumentException e)
+        {
+            return false;
+        }
+
+        if (value instanceof Map)
+        {
+            for(Map.Entry<?,?> entry: ((Map<?,?>)value).entrySet())
+            {
+                Object key = entry.getKey();
+                if (key instanceof String)
+                {
+                    String string = (String)key;
+                    if ( string.length() > 0xFF)
+                    {
+                        return false;
+                    }
+                }
+                else
+                {
+                    return false;
+                }
+                if (!isEncodable(entry.getValue()))
+                {
+                    return false;
+                }
+            }
+        }
+        else if (value instanceof Collection)
+        {
+            Collection<?> collection = (Collection<?>) value;
+            int index = 0;
+            for (Object o: collection)
+            {
+                if (!isEncodable(o))
+                {
+                    return false;
+                }
+                index++;
+            }
+        }
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/MapToAmqpMapConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/MapToAmqpMapConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/MapToAmqpMapConverter.java
index adb222b..de62ec7 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/MapToAmqpMapConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/MapToAmqpMapConverter.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
 
 @PluggableService
 public class MapToAmqpMapConverter implements ObjectToMimeContentConverter<Map>
@@ -57,7 +58,7 @@ public class MapToAmqpMapConverter implements ObjectToMimeContentConverter<Map>
     @Override
     public boolean isAcceptable(final Map map)
     {
-        return true;
+        return EncoderUtils.isEncodable(map);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/JmsMessageTypeAnnotation.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/JmsMessageTypeAnnotation.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/JmsMessageTypeAnnotation.java
new file mode 100644
index 0000000..46cfbe5
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/JmsMessageTypeAnnotation.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+
+public enum JmsMessageTypeAnnotation
+{
+    MESSAGE((byte) 0),
+    OBJECT_MESSAGE((byte) 1),
+    MAP_MESSAGE((byte) 2),
+    BYTES_MESSAGE((byte) 3),
+    STREAM_MESSAGE((byte) 4),
+    TEXT_MESSAGE((byte) 5);
+
+    public static final Symbol ANNOTATION_KEY = Symbol.valueOf("x-opt-jms-msg-type");
+    private final byte _type;
+
+    JmsMessageTypeAnnotation(byte value)
+    {
+        _type = value;
+    }
+
+    public byte getType()
+    {
+        return _type;
+    }
+
+    public static JmsMessageTypeAnnotation valueOf(byte type)
+    {
+        switch (type)
+        {
+            case 0:
+                return MESSAGE;
+            case 1:
+                return OBJECT_MESSAGE;
+            case 2:
+                return MAP_MESSAGE;
+            case 3:
+                return BYTES_MESSAGE;
+            case 4:
+                return STREAM_MESSAGE;
+            case 5:
+                return TEXT_MESSAGE;
+            default:
+                throw new IllegalArgumentException(String.format("Unknown %s type %d",
+                                                                 JmsMessageTypeAnnotation.class.getSimpleName(),
+                                                                 type));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index 1cc4a22..cafdc7d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,8 +34,10 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Pattern;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -51,9 +52,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class MessageConverter_from_1_0
 {
@@ -70,12 +73,18 @@ public class MessageConverter_from_1_0
                                                                                         byte[].class,
                                                                                         UUID.class));
 
+    private static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$");
+    private static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$");
+    private static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$");
+    private static final Pattern
+            OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$");
+
     public static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
         final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize());
         SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
 
-        Object bodyObject;
+        Object bodyObject = null;
         try
         {
             List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(allData));
@@ -97,7 +106,7 @@ public class MessageConverter_from_1_0
                 {
                     if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValueSection))
                     {
-                        throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+                        throw new MessageConversionException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
                     }
                     else
                     {
@@ -106,13 +115,8 @@ public class MessageConverter_from_1_0
                 }
             }
 
-
-            if(sections.isEmpty())
-            {
-                // should actually be illegal
-                bodyObject = new byte[0];
-            }
-            else
+            // In 1.0 of the spec, it is illegal to have message with no body but AMQP-127 asks to have that restriction lifted
+            if(!sections.isEmpty())
             {
                 EncodingRetainingSection<?> firstBodySection = sections.get(0);
                 if(firstBodySection instanceof AmqpValueSection)
@@ -208,8 +212,9 @@ public class MessageConverter_from_1_0
             }
             else
             {
-                // Throw exception instead?
-                return value.toString();
+                throw new MessageConversionException(String.format(
+                        "Could not convert message from 1.0. Unsupported type '%s'.",
+                        value.getClass().getSimpleName()));
             }
         }
         else
@@ -228,6 +233,115 @@ public class MessageConverter_from_1_0
         return result;
     }
 
+    public static ContentHint getTypeHint(final Message_1_0 serverMsg)
+    {
+        Symbol contentType = getContentType(serverMsg);
+
+        JmsMessageTypeAnnotation jmsMessageTypeAnnotation = null;
+        MessageAnnotationsSection section = serverMsg.getMessageAnnotationsSection();
+        if (section != null)
+        {
+            Map<Symbol, Object> annotations = section.getValue();
+            if (annotations != null && annotations.containsKey(JmsMessageTypeAnnotation.ANNOTATION_KEY))
+            {
+                Object object = annotations.get(JmsMessageTypeAnnotation.ANNOTATION_KEY);
+                if (object instanceof Byte)
+                {
+                    try
+                    {
+                        jmsMessageTypeAnnotation = JmsMessageTypeAnnotation.valueOf(((Byte) object));
+                    }
+                    catch (IllegalArgumentException e)
+                    {
+                        // ignore
+                    }
+                }
+            }
+        }
+
+        Class<?> classHint = null;
+        String mimeTypeHint = null;
+
+        if (jmsMessageTypeAnnotation != null)
+        {
+            switch (jmsMessageTypeAnnotation)
+            {
+                case MESSAGE:
+                    classHint = Void.class;
+                    break;
+                case MAP_MESSAGE:
+                    classHint = Map.class;
+                    break;
+                case BYTES_MESSAGE:
+                    classHint = byte[].class;
+                    break;
+                case OBJECT_MESSAGE:
+                    classHint = Serializable.class;
+                    break;
+                case TEXT_MESSAGE:
+                    classHint = String.class;
+                    break;
+                case STREAM_MESSAGE:
+                    classHint = List.class;
+                    break;
+                default:
+                    throw new ServerScopedRuntimeException(String.format(
+                            "Unexpected jms message type annotation %s", jmsMessageTypeAnnotation));
+            }
+        }
+
+        if (contentType != null)
+        {
+            Class<?> contentTypeClassHint = null;
+            String type = contentType.toString();
+            String supportedContentType = null;
+            if (TEXT_CONTENT_TYPES.matcher(type).matches())
+            {
+                contentTypeClassHint = String.class;
+                // the AMQP 0-x client does not accept arbitrary "text/*" mimeTypes so use "text/plain"
+                supportedContentType = "text/plain";
+            }
+            else if (MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            {
+                contentTypeClassHint = Map.class;
+                supportedContentType = contentType.toString();
+            }
+            else if (LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            {
+                contentTypeClassHint = List.class;
+                supportedContentType = contentType.toString();
+            }
+            else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            {
+                contentTypeClassHint = Serializable.class;
+                // the AMQP 0-x client does not accept the "application/x-java-serialized-object" mimeTypes so use fall back
+                supportedContentType = "application/java-object-stream";
+            }
+
+            if (classHint == null || classHint == contentTypeClassHint)
+            {
+                classHint = contentTypeClassHint;
+                mimeTypeHint = supportedContentType;
+            }
+        }
+
+        return new ContentHint(classHint, mimeTypeHint);
+    }
+
+    public static Symbol getContentType(final Message_1_0 serverMsg)
+    {
+        final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
+        if (propertiesSection != null)
+        {
+            final Properties properties = propertiesSection.getValue();
+            if (properties != null)
+            {
+                return properties.getContentType();
+            }
+        }
+        return null;
+    }
+
     public static UnsignedInteger getGroupSequence(final Message_1_0 serverMsg)
     {
         final PropertiesSection propertiesSection = serverMsg.getPropertiesSection();
@@ -378,4 +492,27 @@ public class MessageConverter_from_1_0
         }
         return messageId;
     }
+
+    public static class ContentHint
+    {
+        private final Class<?> _contentClass;
+        private final String _contentType;
+
+        public ContentHint(final Class<?> contentClass, final String contentType)
+        {
+            _contentClass = contentClass;
+            _contentType = contentType;
+        }
+
+        public Class<?> getContentClass()
+        {
+            return _contentClass;
+        }
+
+        public String getContentType()
+        {
+            return _contentType;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml b/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml
index a1f088b..e6aca1f 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml
@@ -57,6 +57,12 @@
       <artifactId>qpid-test-utils</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a4a17517/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 8177444..5eb43a5 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -20,25 +20,19 @@
  */
 package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
 
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertBodyToObject;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertValue;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getAbsoluteExpiryTime;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getCorrelationId;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getCreationTime;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getGroupId;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getGroupSequence;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getMessageId;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getTtl;
-import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getUserId;
+import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.*;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.mimecontentconverter.ByteArrayToOctetStream;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
 import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -56,6 +50,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
 import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
 import org.apache.qpid.server.protocol.v1_0.Message_1_0;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.store.StoredMessage;
 
@@ -91,11 +86,44 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
                                                                   final NamedAddressSpace addressSpace)
     {
         Object bodyObject = convertBodyToObject(serverMsg);
+        ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(bodyObject);
 
-        final ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(bodyObject);
+        ContentHint contentHint = getTypeHint(serverMsg);
+        Class<?> typeHint = contentHint.getContentClass();
+        if (typeHint == null && bodyObject == null)
+        {
+            typeHint = Void.class;
+        }
+
+        if (converter == null)
+        {
+            converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(bodyObject, typeHint);
+
+            if (converter == null)
+            {
+                throw new MessageConversionException(String.format(
+                        "Could not convert message from 1.0 to 0-8 because conversion of content failed. Could not find mime type converter for the content '%s'.", bodyObject == null ? null : bodyObject.getClass().getSimpleName()));
+            }
+        }
+
+        final byte[] messageContent = converter.toMimeContent(bodyObject);
+        String mimeType = converter.getMimeType();
+        if (bodyObject instanceof byte[])
+        {
+            if (Serializable.class == typeHint)
+            {
+                mimeType = "application/java-object-stream";
+            }
+            else if (String.class == typeHint)
+            {
+                mimeType = "text/plain";
+            }
+            else if ((Map.class == typeHint || List.class == typeHint) && contentHint.getContentType() != null)
+            {
+                mimeType = contentHint.getContentType();
+            }
+        }
 
-        final byte[] messageContent = converter == null ? new byte[] {} : converter.toMimeContent(bodyObject);
-        final String mimeType = converter == null ? null  : converter.getMimeType();
         final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
                                                                     mimeType,
                                                                     messageContent.length,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org