You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/11 14:15:04 UTC

[1/3] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve Internal to AMQP 0-x message conversion and add unit tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 42bebb9ff -> 17e6c7d6e


QPID-7434: [Java Broker] Improve Internal to AMQP 0-x message conversion and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/17e6c7d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/17e6c7d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/17e6c7d6

Branch: refs/heads/master
Commit: 17e6c7d6e070ecfa8a46edb643607b0c6ef9d099
Parents: 939cda5
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Aug 11 14:30:37 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 11 15:14:53 2017 +0100

----------------------------------------------------------------------
 .../protocol/v0_10/transport/EncoderUtils.java  |   2 +-
 .../ListToAmqpListConverter.java                |   3 +-
 .../MessageConverter_Internal_to_0_10Test.java  | 278 ++++++++++++++++++
 .../v0_8/MessageConverter_Internal_to_v0_8.java |  11 +-
 .../MessageConverter_Internal_to_0_8Test.java   | 280 +++++++++++++++++++
 5 files changed, 571 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
index 32c5e6d..63b2c09 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
@@ -357,7 +357,7 @@ public class EncoderUtils
                 if (key instanceof String)
                 {
                     String string = (String)key;
-                    if ( string.length() > 0xFF)
+                    if (string.length() > 0xFF)
                     {
                         return false;
                     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
index 3dfc06b..17717b1 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
 
 @PluggableService
 public class ListToAmqpListConverter implements ObjectToMimeContentConverter<List>
@@ -57,7 +58,7 @@ public class ListToAmqpListConverter implements ObjectToMimeContentConverter<Lis
     @Override
     public boolean isAcceptable(final List list)
     {
-        return true;
+        return EncoderUtils.isEncodable(list);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java
new file mode 100644
index 0000000..aeb9a79
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageConverter_Internal_to_0_10Test extends QpidTestCase
+{
+    private final MessageConverter_Internal_to_v0_10 _converter = new MessageConverter_Internal_to_v0_10();
+    private final StoredMessage<InternalMessageMetaData> _handle = mock(StoredMessage.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+
+    public void testStringMessage() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "text/plain";
+        doTest(content, mimeType, content.getBytes(UTF_8), mimeType);
+    }
+
+    public void testStringMessageWithUnknownMimeType() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content.getBytes(UTF_8), "text/plain");
+    }
+
+    public void testStringMessageWithoutMimeType() throws Exception
+    {
+        String content = "testContent";
+        doTest(content, null, content.getBytes(UTF_8), "text/plain");
+    }
+
+    public void testListMessageWithMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, "foo/bar", expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    public void testListMessageWithoutMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    public void testListMessageWithoutMimeTypeWithNonJmsContent() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42, Lists.newArrayList());
+        final ListToAmqpListConverter listToAmqpListConverter = new ListToAmqpListConverter();
+        final byte[] expectedContent = listToAmqpListConverter.toMimeContent(content);
+        doTest(content, null, expectedContent, listToAmqpListConverter.getMimeType());
+    }
+
+    public void testListMessageWithoutMimeTypeWithNonConvertibleItem() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList(new MySerializable());
+        final InternalMessage sourceMessage = getAmqMessage(content, null);
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testByteArrayMessageWithoutMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        doTest(content, null, content, "application/octet-stream");
+    }
+
+    public void testByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    public void testEmptyByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = new byte[0];
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    public void testMapMessageWithMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final String mimeType = "foo/bar";
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    public void testMapMessageWithoutMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    public void testMapMessageWithMimeTypeWithNonJmsContent() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key", Collections.singletonMap("foo", "bar"));
+        final String mimeType = "foo/bar";
+        final MapToAmqpMapConverter mapToAmqpMapConverter = new MapToAmqpMapConverter();
+        final byte[] expectedContent = mapToAmqpMapConverter.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToAmqpMapConverter.getMimeType());
+    }
+
+    public void testMapMessageWithoutMimeTypeWithNonConvertibleEntry() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put(37, new MySerializable());
+
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testSerializableMessageWithMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testSerializableMessageWithoutMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testNullMessageWithoutMimeType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+
+    protected InternalMessage getAmqMessage(final Serializable content, final String mimeType) throws Exception
+    {
+        final byte[] serializedContent = getObjectStreamMessageBytes(content);
+        configureMessageContent(serializedContent);
+        configureMessageHeader(mimeType);
+
+        final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(_amqpHeader);
+        final int contentSize = serializedContent == null ? 0 : serializedContent.length;
+        final InternalMessageMetaData metaData =
+                new InternalMessageMetaData(false, internalMessageHeader, contentSize);
+        when(_handle.getMetaData()).thenReturn(metaData);
+
+        return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(_handle));
+    }
+
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+    }
+
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    private void doTest(final Serializable messageBytes,
+                        final String mimeType,
+                        final byte[] expectedContent,
+                        final String expectedContentType) throws Exception
+    {
+        final InternalMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertArrayEquals("Unexpected content", expectedContent != null ? expectedContent : new byte[0], getBytes(content));
+        assertEquals("Unexpected content type", expectedContentType, convertedMessage.getMessageHeader().getMimeType());
+    }
+
+    private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        for (QpidByteBuffer buf : content)
+        {
+            ByteStreams.copy(buf.asInputStream(), bos);
+            buf.dispose();
+        }
+        return bos.toByteArray();
+    }
+
+    private static class MySerializable implements Serializable
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index a83f4df..a8ff769 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -71,7 +71,16 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
     {
         Object messageBody = serverMsg.getMessageBody();
         ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(messageBody);
-        final byte[] messageContent = converter == null ? new byte[] {} : converter.toMimeContent(messageBody);
+        final byte[] messageContent;
+        try
+        {
+            messageContent = converter == null ? new byte[] {} : converter.toMimeContent(messageBody);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new MessageConversionException("Could not convert message from Internal to 0-8 because"
+                                                 + " conversion of message content failed.", e);
+        }
         String mimeType = converter == null ? null  : converter.getMimeType();
 
         mimeType = improveMimeType(serverMsg, mimeType);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java
new file mode 100644
index 0000000..6de6ceb
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageConverter_Internal_to_0_8Test extends QpidTestCase
+{
+    private final MessageConverter_Internal_to_v0_8 _converter = new MessageConverter_Internal_to_v0_8();
+    private final StoredMessage<InternalMessageMetaData> _handle = mock(StoredMessage.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+
+    public void testStringMessage() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "text/plain";
+        doTest(content, mimeType, content.getBytes(UTF_8), mimeType);
+    }
+
+    public void testStringMessageWithUnknownMimeType() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content.getBytes(UTF_8), "text/plain");
+    }
+
+    public void testStringMessageWithoutMimeType() throws Exception
+    {
+        String content = "testContent";
+        doTest(content, null, content.getBytes(UTF_8), "text/plain");
+    }
+
+    public void testListMessageWithMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, "foo/bar", expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    public void testListMessageWithoutMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    public void testListMessageWithoutMimeTypeWithNonJmsContent() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42, Lists.newArrayList());
+        final ListToAmqpListConverter listToAmqpListConverter = new ListToAmqpListConverter();
+        final byte[] expectedContent = listToAmqpListConverter.toMimeContent(content);
+        doTest(content, null, expectedContent, listToAmqpListConverter.getMimeType());
+    }
+
+    public void testListMessageWithoutMimeTypeWithNonConvertibleItem() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList(new MySerializable());
+        final InternalMessage sourceMessage = getAmqMessage(content, null);
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testByteArrayMessageWithoutMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        doTest(content, null, content, "application/octet-stream");
+    }
+
+    public void testByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    public void testEmptyByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = new byte[0];
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    public void testMapMessageWithMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final String mimeType = "foo/bar";
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    public void testMapMessageWithoutMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    public void testMapMessageWithMimeTypeWithNonJmsContent() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key", Collections.singletonMap("foo", "bar"));
+        final String mimeType = "foo/bar";
+        final MapToAmqpMapConverter mapToAmqpMapConverter = new MapToAmqpMapConverter();
+        final byte[] expectedContent = mapToAmqpMapConverter.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToAmqpMapConverter.getMimeType());
+    }
+
+    public void testMapMessageWithoutMimeTypeWithNonConvertibleEntry() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put(37, new MySerializable());
+
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testSerializableMessageWithMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testSerializableMessageWithoutMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    public void testNullMessageWithoutMimeType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+
+    protected InternalMessage getAmqMessage(final Serializable content, final String mimeType) throws Exception
+    {
+        final byte[] serializedContent = getObjectStreamMessageBytes(content);
+        configureMessageContent(serializedContent);
+        configureMessageHeader(mimeType);
+
+        final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(_amqpHeader);
+        final int contentSize = serializedContent == null ? 0 : serializedContent.length;
+        final InternalMessageMetaData metaData =
+                new InternalMessageMetaData(false, internalMessageHeader, contentSize);
+        when(_handle.getMetaData()).thenReturn(metaData);
+
+        return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(_handle));
+    }
+
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+    }
+
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    private void doTest(final Serializable messageBytes,
+                        final String mimeType,
+                        final byte[] expectedContent,
+                        final String expectedContentType) throws Exception
+    {
+        final InternalMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertArrayEquals("Unexpected content", expectedContent != null ? expectedContent : new byte[0], getBytes(content));
+        assertEquals("Unexpected content type", expectedContentType, convertedMessage.getMessageHeader().getMimeType());
+    }
+
+    private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        for (QpidByteBuffer buf : content)
+        {
+            ByteStreams.copy(buf.asInputStream(), bos);
+            buf.dispose();
+        }
+        return bos.toByteArray();
+    }
+
+    private static class MySerializable implements Serializable
+    {
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve AMQP 0-x to Internal conversion and add unit tests.

Posted by lq...@apache.org.
QPID-7434: [Java Broker] Improve AMQP 0-x to Internal conversion and add unit tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/939cda5b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/939cda5b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/939cda5b

Branch: refs/heads/master
Commit: 939cda5bf529bc0ecf911f6af586eb48573bbcca
Parents: 385167e
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Aug 10 17:15:41 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 11 15:14:53 2017 +0100

----------------------------------------------------------------------
 .../mimecontentconverter/ConversionUtils.java   |  34 ++
 .../mimecontentconverter/TextPlainToString.java |   2 +-
 .../mimecontentconverter/TextXmlToString.java   |   2 +-
 .../JmsMapMessageToMap.java                     |   6 +
 .../JmsStreamMessageToList.java                 |   6 +
 .../MessageConverter_v0_10_to_Internal.java     |  97 ++++-
 .../AmqpListToListConverter.java                |   6 +
 .../AmqpMapToMapConverter.java                  |   6 +
 .../MessageConverter_0_10_to_InternalTest.java  | 416 +++++++++++++++++++
 broker-plugins/amqp-0-8-protocol/pom.xml        |   6 +
 .../v0_8/MessageConverter_v0_8_to_Internal.java |  95 ++++-
 .../MessageConverter_0_8_to_InternalTest.java   | 414 ++++++++++++++++++
 .../v1_0/MessageConverter_from_1_0.java         |  19 +-
 .../protocol/v1_0/MessageConverter_to_1_0.java  |  34 +-
 .../v1_0/MessageConverter_v1_0_to_Internal.java |  21 +-
 .../MessageConverter_0_10_to_1_0Test.java       |   5 +-
 16 files changed, 1112 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java
new file mode 100644
index 0000000..98a9ba8
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.message.mimecontentconverter;
+
+import java.util.regex.Pattern;
+
+public class ConversionUtils
+{
+    public static final Pattern
+            TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$");
+    public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$");
+    public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$");
+    public static final Pattern
+            OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$");
+    public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$");
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
index d71882a..01e8ca9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
@@ -48,6 +48,6 @@ public class TextPlainToString implements MimeContentToObjectConverter<String>
     @Override
     public String toObject(final byte[] data)
     {
-        return new String(data, StandardCharsets.UTF_8);
+        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
index aeba9dd..97026d5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
@@ -48,6 +48,6 @@ public class TextXmlToString implements MimeContentToObjectConverter<String>
     @Override
     public String toObject(final byte[] data)
     {
-        return new String(data, StandardCharsets.UTF_8);
+        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
index 37b1db7..76db633 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.typedmessage.mimecontentconverter;
 
 import java.io.EOFException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -54,6 +55,11 @@ public class JmsMapMessageToMap implements MimeContentToObjectConverter<Map>
     @Override
     public Map toObject(final byte[] data)
     {
+        if (data == null || data.length == 0)
+        {
+            return Collections.emptyMap();
+        }
+
         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
 
         LinkedHashMap map = new LinkedHashMap();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
index 7be7d9f..657f66e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.typedmessage.mimecontentconverter;
 import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
@@ -54,6 +55,11 @@ public class JmsStreamMessageToList implements MimeContentToObjectConverter<List
     @Override
     public List toObject(final byte[] data)
     {
+        if (data == null || data.length == 0)
+        {
+            return Collections.emptyList();
+        }
+
         TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
 
         List<Object> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index e507ce2..6868b14 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -20,13 +20,24 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.LIST_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.MAP_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES;
+
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -75,9 +86,8 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
         }
 
         Object body = convertMessageBody(mimeType, data);
-        MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
-        AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding);
-        return InternalMessage.convert(serverMessage, fixedHeader, body);
+        final AMQMessageHeader convertedHeader = convertHeader(serverMessage, addressSpace, body, encoding);
+        return InternalMessage.convert(serverMessage, convertedHeader, body);
     }
 
     @Override
@@ -86,6 +96,55 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
 
     }
 
+    private AMQMessageHeader convertHeader(final MessageTransferMessage serverMessage,
+                                           final NamedAddressSpace addressSpace,
+                                           final Object convertedBodyObject, final String encoding)
+    {
+        final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject);
+        final AMQMessageHeader messageHeader = serverMessage.getMessageHeader();
+
+        Map<String, Object> headers = new HashMap<>();
+        messageHeader.getHeaderNames()
+                     .forEach(headerName -> headers.put(headerName, messageHeader.getHeader(headerName)));
+
+        final InternalMessageHeader header = new InternalMessageHeader(headers,
+                                                                       messageHeader.getCorrelationId(),
+                                                                       messageHeader.getExpiration(),
+                                                                       messageHeader.getUserId(),
+                                                                       messageHeader.getAppId(),
+                                                                       messageHeader.getMessageId(),
+                                                                       convertedMimeType,
+                                                                       messageHeader.getEncoding(),
+                                                                       messageHeader.getPriority(),
+                                                                       messageHeader.getTimestamp(),
+                                                                       messageHeader.getNotValidBefore(),
+                                                                       messageHeader.getType(),
+                                                                       messageHeader.getReplyTo(),
+                                                                       serverMessage.getArrivalTime());
+        MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
+        final ReplyTo replyTo = messageProps == null ? null : messageProps.getReplyTo();
+        return new DelegatingMessageHeader(header, replyTo, encoding);
+    }
+
+    private String getInternalConvertedMimeType(final MessageTransferMessage serverMessage, final Object convertedBodyObject)
+    {
+        String originalMimeType = serverMessage.getMessageHeader().getMimeType();
+        if (originalMimeType != null)
+        {
+            if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()
+                || ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches())
+            {
+                return null;
+            }
+            else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches())
+            {
+                return "application/x-java-serialized-object";
+            }
+        }
+
+        return originalMimeType;
+    }
+
     private static class DelegatingMessageHeader implements AMQMessageHeader
     {
         private final AMQMessageHeader _delegate;
@@ -206,14 +265,38 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
     private static Object convertMessageBody(String mimeType, byte[] data)
     {
         MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType);
-        if (converter != null)
+        if (data != null && data.length != 0)
+        {
+            if (converter != null)
+            {
+                return converter.toObject(data);
+            }
+            else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+            {
+                return new String(data, UTF_8);
+            }
+        }
+        else if (mimeType == null)
+        {
+            return null;
+        }
+        else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return new byte[0];
+        }
+        else if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return "";
+        }
+        else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return converter.toObject(data);
+            return Collections.emptyMap();
         }
-        else
+        else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return data;
+            return Collections.emptyList();
         }
+        return data;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
index 8139db1..1cfb640 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
@@ -51,6 +52,11 @@ public class AmqpListToListConverter implements MimeContentToObjectConverter<Lis
     @Override
     public List toObject(final byte[] data)
     {
+        if (data == null || data.length == 0)
+        {
+            return Collections.emptyList();
+        }
+
         BBDecoder decoder = new BBDecoder();
         decoder.init(ByteBuffer.wrap(data));
         return decoder.readList();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
index 4280481..ce9d098 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
@@ -51,6 +52,11 @@ public class AmqpMapToMapConverter implements MimeContentToObjectConverter<Map>
     @Override
     public Map toObject(final byte[] data)
     {
+        if (data == null || data.length == 0)
+        {
+            return Collections.emptyMap();
+        }
+
         BBDecoder decoder = new BBDecoder();
         decoder.init(ByteBuffer.wrap(data));
         return decoder.readMap();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java
new file mode 100644
index 0000000..5a28f4f
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageConverter_0_10_to_InternalTest extends QpidTestCase
+{
+    private final MessageConverter_v0_10_to_Internal _converter = new MessageConverter_v0_10_to_Internal();
+
+    private final StoredMessage<MessageMetaData_0_10> _handle = mock(StoredMessage.class);
+
+    private final MessageMetaData_0_10 _metaData = mock(MessageMetaData_0_10.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+    private final Header _header = mock(Header.class);
+    private MessageProperties _messageProperties;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _messageProperties = new MessageProperties();
+
+        when(_handle.getMetaData()).thenReturn(_metaData);
+        when(_header.getMessageProperties()).thenReturn(_messageProperties);
+        when(_metaData.getHeader()).thenReturn(_header);
+        when(_metaData.getMessageHeader()).thenReturn(_amqpHeader);
+        when(_metaData.getMessageProperties()).thenReturn(_messageProperties);
+    }
+
+    public void testConvertStringMessageBody() throws Exception
+    {
+        doTestTextMessage("helloworld", "text/plain");
+    }
+
+    public void testConvertEmptyStringMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/plain");
+    }
+
+    public void testConvertStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "text/xml");
+    }
+
+    public void testConvertEmptyStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/xml");
+    }
+
+    public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeText() throws Exception
+    {
+        doTestTextMessage("foo","text/foobar");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/foo+xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJson() throws Exception
+    {
+        doTestTextMessage("[]","application/json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestTextMessage("[]","application/foo+json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/javascript");
+    }
+
+    public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/ecmascript");
+    }
+
+    public void testConvertBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage("helloworld".getBytes());
+    }
+
+    public void testConvertBytesMessageBodyNoContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        doTest(messageContent, null, messageContent, null);
+    }
+
+    public void testConvertMessageBodyUnknownContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        final String mimeType = "my/bytes";
+        doTest(messageContent, mimeType, messageContent, mimeType);
+    }
+
+
+    public void testConvertEmptyBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage(new byte[0]);
+    }
+
+    public void testConvertJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = getJmsStreamMessageBytes(expected);
+
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(messageBytes, mimeType, expected);
+    }
+
+    public void testConvertEmptyJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(null, mimeType, expected);
+    }
+
+    public void testConvertAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        doTestStreamMessage(messageBytes, "amqp/list", expected);
+    }
+
+    public void testConvertEmptyAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        doTestStreamMessage(null, "amqp/list", expected);
+    }
+
+    public void testConvertJmsMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = getJmsMapMessageBytes(expected);
+
+        doTestMapMessage(messageBytes, "jms/map-message", expected);
+    }
+
+    public void testConvertEmptyJmsMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "jms/map-message", Collections.emptyMap());
+    }
+
+    public void testConvertAmqpMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected);
+    }
+
+    public void testConvertEmptyAmqpMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "amqp/map", Collections.emptyMap());
+    }
+
+    public void testConvertObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/java-object-stream", messageBytes);
+    }
+
+    public void testConvertObjectStream2MessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/x-java-serialized-object", messageBytes);
+    }
+
+    public void testConvertEmptyObjectStreamMessageBody() throws Exception
+    {
+        doTestObjectMessage(null, "application/java-object-stream", new byte[0]);
+    }
+
+    public void testConvertEmptyMessageWithoutContentType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+    public void testConvertEmptyMessageWithUnknownContentType() throws Exception
+    {
+        doTest(null, "foo/bar", new byte[0], "foo/bar");
+    }
+
+    public void testConvertMessageWithoutContentType() throws Exception
+    {
+        final byte[] expectedContent = "someContent".getBytes(UTF_8);
+        doTest(expectedContent, null, expectedContent, null);
+    }
+
+
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+    private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        for (Object o : objects)
+        {
+            writer.writeObject(o);
+        }
+        return getBytes(writer);
+    }
+
+    private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        writer.writeIntImpl(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet())
+        {
+            writer.writeNullTerminatedStringImpl(entry.getKey());
+            writer.writeObject(entry.getValue());
+        }
+        return getBytes(writer);
+    }
+
+    private byte[] getBytes(final TypedBytesContentWriter writer)
+    {
+        ByteBuffer buf = writer.getData();
+        final byte[] expected = new byte[buf.remaining()];
+        buf.get(expected);
+        return expected;
+    }
+
+    private MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
+    {
+        configureMessageContent(expected);
+        configureMessageHeader(mimeType);
+
+        return new MessageTransferMessage(_handle, new Object());
+    }
+
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+        _messageProperties.setContentType(mimeType);
+    }
+
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
+    {
+
+        final byte[] contentBytes;
+        final String expectedContent;
+        if (originalContent == null)
+        {
+            contentBytes = null;
+            expectedContent = "";
+        }
+        else
+        {
+            contentBytes = originalContent.getBytes(UTF_8);
+            expectedContent = originalContent;
+        }
+        doTest(contentBytes, mimeType, expectedContent, mimeType);
+    }
+
+
+    private void doTestMapMessage(final byte[] messageBytes,
+                                  final String mimeType,
+                                  final Map<String, Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    private void doTestBytesMessage(final byte[] messageContent) throws Exception
+    {
+        doTest(messageContent,"application/octet-stream", messageContent, "application/octet-stream");
+    }
+
+    private void doTestStreamMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final List<Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    private void doTestObjectMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final byte[] expectedBytes)
+            throws Exception
+    {
+        doTest(messageBytes, mimeType, expectedBytes, "application/x-java-serialized-object");
+    }
+
+    private void doTest(final byte[] messageBytes,
+                        final String mimeType,
+                        final Object expectedContent,
+                        final String expectedMimeType) throws Exception
+    {
+        final MessageTransferMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final InternalMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        
+        if (expectedContent instanceof byte[])
+        {
+            assertArrayEquals("Unexpected content",
+                              ((byte[]) expectedContent),
+                              ((byte[]) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof List)
+        {
+            assertEquals("Unexpected content",
+                         new ArrayList<Object>((Collection) expectedContent),
+                         new ArrayList<Object>((Collection) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof Map)
+        {
+            assertEquals("Unexpected content",
+                         new HashMap<Object,Object>((Map) expectedContent),
+                         new HashMap<Object,Object>((Map) convertedMessage.getMessageBody()));
+        }
+        else
+        {
+            assertEquals("Unexpected content", expectedContent, convertedMessage.getMessageBody());
+        }
+        String convertedMimeType = convertedMessage.getMessageHeader().getMimeType();
+        assertEquals("Unexpected content type", expectedMimeType, convertedMimeType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/pom.xml b/broker-plugins/amqp-0-8-protocol/pom.xml
index a40955f..010c9aa 100644
--- a/broker-plugins/amqp-0-8-protocol/pom.xml
+++ b/broker-plugins/amqp-0-8-protocol/pom.xml
@@ -59,6 +59,12 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index fb43b6a..dbd2194 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -20,14 +20,26 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.LIST_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.MAP_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES;
+
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -75,10 +87,55 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
         }
 
         Object body = convertMessageBody(mimeType, data);
+        final AMQMessageHeader convertedHeader = convertHeader(serverMessage, addressSpace, body, encoding);
+        return InternalMessage.convert(serverMessage, convertedHeader, body);
+    }
+
+    private AMQMessageHeader convertHeader(final AMQMessage serverMessage,
+                                           final NamedAddressSpace addressSpace,
+                                           final Object convertedBodyObject, final String encoding)
+    {
+        final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject);
+        final AMQMessageHeader messageHeader = serverMessage.getMessageHeader();
+
+        Map<String, Object> headers = new HashMap<>();
+        messageHeader.getHeaderNames()
+                     .forEach(headerName -> headers.put(headerName, messageHeader.getHeader(headerName)));
+
+        final InternalMessageHeader header = new InternalMessageHeader(headers,
+                                                                       messageHeader.getCorrelationId(),
+                                                                       messageHeader.getExpiration(),
+                                                                       messageHeader.getUserId(),
+                                                                       messageHeader.getAppId(),
+                                                                       messageHeader.getMessageId(),
+                                                                       convertedMimeType,
+                                                                       messageHeader.getEncoding(),
+                                                                       messageHeader.getPriority(),
+                                                                       messageHeader.getTimestamp(),
+                                                                       messageHeader.getNotValidBefore(),
+                                                                       messageHeader.getType(),
+                                                                       messageHeader.getReplyTo(),
+                                                                       serverMessage.getArrivalTime());
+        return new DelegatingMessageHeader(header, encoding);
+    }
+
+    private String getInternalConvertedMimeType(final AMQMessage serverMessage, final Object convertedBodyObject)
+    {
+        String originalMimeType = serverMessage.getMessageHeader().getMimeType();
+        if (originalMimeType != null)
+        {
+            if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()
+                || ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches())
+            {
+                return null;
+            }
+            else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches())
+            {
+                return "application/x-java-serialized-object";
+            }
+        }
 
-        return InternalMessage.convert(serverMessage,
-                                       new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding),
-                                       body);
+        return originalMimeType;
     }
 
     @Override
@@ -298,14 +355,38 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
     private static Object convertMessageBody(String mimeType, byte[] data)
     {
         MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType);
-        if (converter != null)
+        if (data != null && data.length != 0)
+        {
+            if (converter != null)
+            {
+                return converter.toObject(data);
+            }
+            else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+            {
+                return new String(data, UTF_8);
+            }
+        }
+        else if (mimeType == null)
+        {
+            return null;
+        }
+        else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return new byte[0];
+        }
+        else if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return "";
+        }
+        else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return converter.toObject(data);
+            return Collections.emptyMap();
         }
-        else
+        else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return data;
+            return Collections.emptyList();
         }
+        return data;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java
new file mode 100644
index 0000000..0f32481
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class MessageConverter_0_8_to_InternalTest extends QpidTestCase
+{
+    private final MessageConverter_v0_8_to_Internal _converter = new MessageConverter_v0_8_to_Internal();
+
+    private final StoredMessage<MessageMetaData> _handle = mock(StoredMessage.class);
+
+    private final MessageMetaData _metaData = mock(MessageMetaData.class);
+    private final AMQMessageHeader _header = mock(AMQMessageHeader.class);
+    private final ContentHeaderBody _contentHeaderBody = mock(ContentHeaderBody.class);
+    private final BasicContentHeaderProperties _basicContentHeaderProperties = mock(BasicContentHeaderProperties.class);
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        when(_handle.getMetaData()).thenReturn(_metaData);
+        when(_metaData.getMessageHeader()).thenReturn(_header);
+        when(_metaData.getMessagePublishInfo()).thenReturn(new MessagePublishInfo());
+        when(_metaData.getContentHeaderBody()).thenReturn(_contentHeaderBody);
+        when(_contentHeaderBody.getProperties()).thenReturn(_basicContentHeaderProperties);
+    }
+
+    public void testConvertStringMessageBody() throws Exception
+    {
+        doTestTextMessage("helloworld", "text/plain");
+    }
+
+    public void testConvertEmptyStringMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/plain");
+    }
+
+    public void testConvertStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "text/xml");
+    }
+
+    public void testConvertEmptyStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/xml");
+    }
+
+    public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeText() throws Exception
+    {
+        doTestTextMessage("foo","text/foobar");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>","application/foo+xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJson() throws Exception
+    {
+        doTestTextMessage("[]","application/json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestTextMessage("[]","application/foo+json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/javascript");
+    }
+
+    public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestTextMessage("var foo","application/ecmascript");
+    }
+
+    public void testConvertBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage("helloworld".getBytes());
+    }
+
+    public void testConvertBytesMessageBodyNoContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        doTest(messageContent, null, messageContent, null);
+    }
+
+    public void testConvertMessageBodyUnknownContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes();
+        final String mimeType = "my/bytes";
+        doTest(messageContent, mimeType, messageContent, mimeType);
+    }
+
+
+    public void testConvertEmptyBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage(new byte[0]);
+    }
+
+    public void testConvertJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = getJmsStreamMessageBytes(expected);
+
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(messageBytes, mimeType, expected);
+    }
+
+    public void testConvertEmptyJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(null, mimeType, expected);
+    }
+
+    public void testConvertAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        doTestStreamMessage(messageBytes, "amqp/list", expected);
+    }
+
+    public void testConvertEmptyAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        doTestStreamMessage(null, "amqp/list", expected);
+    }
+
+    public void testConvertJmsMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = getJmsMapMessageBytes(expected);
+
+        doTestMapMessage(messageBytes, "jms/map-message", expected);
+    }
+
+    public void testConvertEmptyJmsMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "jms/map-message", Collections.emptyMap());
+    }
+
+    public void testConvertAmqpMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected);
+    }
+
+    public void testConvertEmptyAmqpMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "amqp/map", Collections.emptyMap());
+    }
+
+    public void testConvertObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/java-object-stream", messageBytes);
+    }
+
+    public void testConvertObjectStream2MessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/x-java-serialized-object", messageBytes);
+    }
+
+    public void testConvertEmptyObjectStreamMessageBody() throws Exception
+    {
+        doTestObjectMessage(null, "application/java-object-stream", new byte[0]);
+    }
+
+    public void testConvertEmptyMessageWithoutContentType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+    public void testConvertEmptyMessageWithUnknownContentType() throws Exception
+    {
+        doTest(null, "foo/bar", new byte[0], "foo/bar");
+    }
+
+    public void testConvertMessageWithoutContentType() throws Exception
+    {
+        final byte[] expectedContent = "someContent".getBytes(UTF_8);
+        doTest(expectedContent, null, expectedContent, null);
+    }
+
+
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+    private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        for (Object o : objects)
+        {
+            writer.writeObject(o);
+        }
+        return getBytes(writer);
+    }
+
+    private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        writer.writeIntImpl(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet())
+        {
+            writer.writeNullTerminatedStringImpl(entry.getKey());
+            writer.writeObject(entry.getValue());
+        }
+        return getBytes(writer);
+    }
+
+    private byte[] getBytes(final TypedBytesContentWriter writer)
+    {
+        ByteBuffer buf = writer.getData();
+        final byte[] expected = new byte[buf.remaining()];
+        buf.get(expected);
+        return expected;
+    }
+
+    protected AMQMessage getAmqMessage(final byte[] expected, final String mimeType)
+    {
+        configureMessageContent(expected);
+        configureMessageHeader(mimeType);
+
+        return new AMQMessage(_handle);
+    }
+
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_header.getMimeType()).thenReturn(mimeType);
+        when(_basicContentHeaderProperties.getContentTypeAsString()).thenReturn(mimeType);
+    }
+
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
+    {
+
+        final byte[] contentBytes;
+        final String expectedContent;
+        if (originalContent == null)
+        {
+            contentBytes = null;
+            expectedContent = "";
+        }
+        else
+        {
+            contentBytes = originalContent.getBytes(UTF_8);
+            expectedContent = originalContent;
+        }
+        doTest(contentBytes, mimeType, expectedContent, mimeType);
+    }
+
+
+    private void doTestMapMessage(final byte[] messageBytes,
+                                  final String mimeType,
+                                  final Map<String, Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    private void doTestBytesMessage(final byte[] messageContent) throws Exception
+    {
+        doTest(messageContent,"application/octet-stream", messageContent, "application/octet-stream");
+    }
+
+    private void doTestStreamMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final List<Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    private void doTestObjectMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final byte[] expectedBytes)
+            throws Exception
+    {
+        doTest(messageBytes, mimeType, expectedBytes, "application/x-java-serialized-object");
+    }
+
+    private void doTest(final byte[] messageBytes,
+                        final String mimeType,
+                        final Object expectedContent,
+                        final String expectedMimeType) throws Exception
+    {
+        final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final InternalMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        
+        if (expectedContent instanceof byte[])
+        {
+            assertArrayEquals("Unexpected content",
+                              ((byte[]) expectedContent),
+                              ((byte[]) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof List)
+        {
+            assertEquals("Unexpected content",
+                         new ArrayList((Collection) expectedContent),
+                         new ArrayList((Collection) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof Map)
+        {
+            assertEquals("Unexpected content",
+                         new HashMap((Map) expectedContent),
+                         new HashMap((Map) convertedMessage.getMessageBody()));
+        }
+        else
+        {
+            assertEquals("Unexpected content", expectedContent, convertedMessage.getMessageBody());
+        }
+        String convertedMimeType = convertedMessage.getMessageHeader().getMimeType();
+        assertEquals("Unexpected content type", expectedMimeType, convertedMimeType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index d5581fa..2e27f47 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -36,9 +36,9 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.regex.Pattern;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
 import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
@@ -77,13 +77,6 @@ public class MessageConverter_from_1_0
                                                                                         UUID.class,
                                                                                         Date.class));
 
-    public static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$");
-    public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$");
-    public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$");
-    public static final Pattern
-            OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$");
-    public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$");
-
     static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
         final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize());
@@ -296,29 +289,29 @@ public class MessageConverter_from_1_0
             Class<?> contentTypeClassHint = null;
             String type = contentType.toString();
             String supportedContentType = null;
-            if (TEXT_CONTENT_TYPES.matcher(type).matches())
+            if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = String.class;
                 // the AMQP 0-x client does not accept arbitrary "text/*" mimeTypes so use "text/plain"
                 supportedContentType = "text/plain";
             }
-            else if (MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = Map.class;
                 supportedContentType = contentType.toString();
             }
-            else if (LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = List.class;
                 supportedContentType = contentType.toString();
             }
-            else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = Serializable.class;
                 // the AMQP 0-x client does not accept the "application/x-java-serialized-object" mimeTypes so use fall back
                 supportedContentType = "application/java-object-stream";
             }
-            else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = byte[].class;
                 supportedContentType = "application/octet-stream";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 23eee80..6d4b653 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -21,6 +21,9 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.*;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES;
 import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.BYTES_MESSAGE;
 import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MAP_MESSAGE;
 import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MESSAGE;
@@ -42,6 +45,7 @@ import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
 import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -76,23 +80,23 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         Symbol contentType = null;
         if (contentMimeType != null)
         {
-            if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
+            if (TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 contentType = Symbol.valueOf(contentMimeType);
             }
-            else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 contentType = Symbol.valueOf("application/octet-stream");
             }
-            else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 contentType = null;
             }
-            else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 contentType = null;
             }
-            else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 contentType = Symbol.valueOf("application/x-java-serialized-object");
             }
@@ -111,22 +115,22 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         final Symbol key = Symbol.valueOf("x-opt-jms-msg-type");
         if (contentMimeType != null)
         {
-            if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
+            if (TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, TEXT_MESSAGE.getType()));
             }
-            else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, BYTES_MESSAGE.getType()));
             }
-            else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 if (isSectionValidForJmsMap(bodySection))
                 {
                     messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MAP_MESSAGE.getType()));
                 }
             }
-            else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 if (isSectionValidForJmsList(bodySection))
                 {
@@ -134,7 +138,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
                             new MessageAnnotations(Collections.singletonMap(key, STREAM_MESSAGE.getType()));
                 }
             }
-            else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+            else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
             {
                 messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, OBJECT_MESSAGE.getType()));
             }
@@ -269,7 +273,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
                     return new AmqpSequence(fixListValues((List<Object>) bodyObject));
                 }
             }
-            else if (mimeType != null && MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+            else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches())
             {
                 return new AmqpValue(new String(data, UTF_8));
             }
@@ -278,19 +282,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         {
             return new AmqpValue(null);
         }
-        else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
             return new Data(new Binary(SERIALIZED_NULL));
         }
-        else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+        else if (TEXT_CONTENT_TYPES.matcher(mimeType).matches())
         {
             return new AmqpValue("");
         }
-        else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
             return new AmqpValue(Collections.emptyMap());
         }
-        else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
             return new AmqpSequence(Collections.emptyList());
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index 749c1a6..6e015d1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
@@ -72,7 +73,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
                                            final NamedAddressSpace addressSpace,
                                            final Object convertedBodyObject)
     {
-        final String convertedMimeType = getInternalConvertedContentAndMimeType(serverMessage, convertedBodyObject);
+        final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject);
         final MessageMetaData_1_0.MessageHeader_1_0 messageHeader = serverMessage.getMessageHeader();
         final InternalMessageHeader header = new InternalMessageHeader(messageHeader.getHeadersAsMap(),
                                                                        messageHeader.getCorrelationId(),
@@ -103,8 +104,8 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
         return "v1-0 to Internal";
     }
 
-    private static String getInternalConvertedContentAndMimeType(final Message_1_0 serverMsg,
-                                                                 final Object convertedBodyObject)
+    private static String getInternalConvertedMimeType(final Message_1_0 serverMsg,
+                                                       final Object convertedBodyObject)
     {
         MessageConverter_from_1_0.ContentHint contentHint = getInternalTypeHint(serverMsg);
 
@@ -129,7 +130,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
             }
             else if (contentClassHint == String.class
                      && (originalContentType == null
-                         || !MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
+                         || !ConversionUtils.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
             {
                 mimeType = "text/plain";
             }
@@ -153,7 +154,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
         }
         else if (convertedBodyObject instanceof String
                  && (originalContentType == null
-                     || !MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
+                     || !ConversionUtils.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
         {
             mimeType = "text/plain";
         }
@@ -222,23 +223,23 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
         {
             Class<?> contentTypeClassHint = null;
             String type = contentType.toString();
-            if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(type).matches())
+            if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = String.class;
             }
-            else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = Map.class;
             }
-            else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = List.class;
             }
-            else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = Serializable.class;
             }
-            else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+            else if (ConversionUtils.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
             {
                 contentTypeClassHint = byte[].class;
             }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
index 2d53233..7cf43ce 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
@@ -352,13 +352,12 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
         return sections;
     }
 
-    protected MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
+    private MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
     {
         configureMessageContent(expected);
         configureMessageHeader(mimeType);
 
-        final MessageTransferMessage messageTransferMessage = new MessageTransferMessage(_handle, new Object());
-        return messageTransferMessage;
+        return new MessageTransferMessage(_handle, new Object());
     }
 
     private void configureMessageHeader(final String mimeType)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve handling of empty map/stream messages.

Posted by lq...@apache.org.
QPID-7434: [Java Broker] Improve handling of empty map/stream messages.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/385167e1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/385167e1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/385167e1

Branch: refs/heads/master
Commit: 385167e1f6085aceddb2ded398ad504bb40aed98
Parents: 42bebb9
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Aug 11 10:48:16 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 11 15:14:53 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/MessageConverter_to_1_0.java     | 11 +++++++++--
 .../MessageConverter_0_10_to_1_0Test.java          | 17 ++++++++++++++++-
 .../v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java | 16 +++++++++++++++-
 3 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/385167e1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 7060058..23eee80 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -32,7 +32,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -285,7 +284,15 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
         }
         else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return new AmqpValue(null);
+            return new AmqpValue("");
+        }
+        else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return new AmqpValue(Collections.emptyMap());
+        }
+        else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return new AmqpSequence(Collections.emptyList());
         }
         return new Data(new Binary(data));
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/385167e1/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
index 2e8357a..2d53233 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
@@ -203,6 +203,13 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
         doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
     }
 
+    public void testConvertJmsStreamMessageEmptyBody() throws Exception
+    {
+        final List<Object> expected = Collections.emptyList();
+
+        doTestStreamMessage(null, "jms/stream-message", expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+    }
+
     public void testConvertAmqpListMessageBody() throws Exception
     {
         final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
@@ -237,6 +244,14 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
         doTestMapMessage(messageBytes, "amqp/map", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
     }
 
+    public void testConvertJmsMapMessageEmptyBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.emptyMap();
+
+        doTestMapMessage(null, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+    }
+
+
     public void testConvertAmqpMapMessageBodyWithNonJmsContent() throws Exception
     {
         final Map<String, Object> expected = Collections.singletonMap("key", Collections.singletonList("nonJmsList"));
@@ -396,7 +411,7 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
     private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
     {
         final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8);
-        String expectedContent = originalContent == null ? null : originalContent;
+        String expectedContent = originalContent == null ? "" : originalContent;
         doTest(contentBytes,
                mimeType,
                AmqpValueSection.class,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/385167e1/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
index 62dc8f4d..df1a786 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
@@ -202,6 +202,13 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
         doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
     }
 
+    public void testConvertJmsStreamMessageEmptyBody() throws Exception
+    {
+        final List<Object> expected = Collections.emptyList();
+
+        doTestStreamMessage(null, "jms/stream-message", expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+    }
+
     public void testConvertAmqpListMessageBody() throws Exception
     {
         final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
@@ -228,6 +235,13 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
         doTestMapMessage(messageBytes, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
     }
 
+    public void testConvertJmsMapMessageEmptyBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.emptyMap();
+
+        doTestMapMessage(null, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+    }
+
     public void testConvertAmqpMapMessageBody() throws Exception
     {
         final Map<String, Object> expected = Collections.singletonMap("key", "value");
@@ -394,7 +408,7 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
     private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
     {
         final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8);
-        String expectedContent = originalContent == null ? null : originalContent;
+        String expectedContent = originalContent == null ? "" : originalContent;
         doTest(contentBytes,
                mimeType,
                AmqpValueSection.class,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org