You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/08/11 14:15:04 UTC
[1/3] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve
Internal to AMQP 0-x message conversion and add unit tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 42bebb9ff -> 17e6c7d6e
QPID-7434: [Java Broker] Improve Internal to AMQP 0-x message conversion and add unit tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/17e6c7d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/17e6c7d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/17e6c7d6
Branch: refs/heads/master
Commit: 17e6c7d6e070ecfa8a46edb643607b0c6ef9d099
Parents: 939cda5
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Aug 11 14:30:37 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 11 15:14:53 2017 +0100
----------------------------------------------------------------------
.../protocol/v0_10/transport/EncoderUtils.java | 2 +-
.../ListToAmqpListConverter.java | 3 +-
.../MessageConverter_Internal_to_0_10Test.java | 278 ++++++++++++++++++
.../v0_8/MessageConverter_Internal_to_v0_8.java | 11 +-
.../MessageConverter_Internal_to_0_8Test.java | 280 +++++++++++++++++++
5 files changed, 571 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
index 32c5e6d..63b2c09 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/EncoderUtils.java
@@ -357,7 +357,7 @@ public class EncoderUtils
if (key instanceof String)
{
String string = (String)key;
- if ( string.length() > 0xFF)
+ if (string.length() > 0xFF)
{
return false;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
index 3dfc06b..17717b1 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/ListToAmqpListConverter.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.EncoderUtils;
@PluggableService
public class ListToAmqpListConverter implements ObjectToMimeContentConverter<List>
@@ -57,7 +58,7 @@ public class ListToAmqpListConverter implements ObjectToMimeContentConverter<Lis
@Override
public boolean isAcceptable(final List list)
{
- return true;
+ return EncoderUtils.isEncodable(list);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java
new file mode 100644
index 0000000..aeb9a79
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_0_10Test.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link MessageConverter_Internal_to_v0_10}.
+ * <p>
+ * Each test builds an {@link InternalMessage} whose stored content is the Java-serialized form of a
+ * body object (served through a mocked {@link StoredMessage}), converts it, and compares the content
+ * bytes and MIME type of the converted message against the expected values.
+ */
+public class MessageConverter_Internal_to_0_10Test extends QpidTestCase
+{
+    private final MessageConverter_Internal_to_v0_10 _converter = new MessageConverter_Internal_to_v0_10();
+    private final StoredMessage<InternalMessageMetaData> _handle = mock(StoredMessage.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+
+    /** String body with "text/plain": expects the UTF-8 bytes and the same MIME type. */
+    public void testStringMessage() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "text/plain";
+        doTest(content, mimeType, content.getBytes(UTF_8), mimeType);
+    }
+
+    /** String body with "foo/bar": expects the UTF-8 bytes reported as "text/plain". */
+    public void testStringMessageWithUnknownMimeType() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content.getBytes(UTF_8), "text/plain");
+    }
+
+    /** String body with no MIME type: expects the UTF-8 bytes reported as "text/plain". */
+    public void testStringMessageWithoutMimeType() throws Exception
+    {
+        String content = "testContent";
+        doTest(content, null, content.getBytes(UTF_8), "text/plain");
+    }
+
+    /** List of a String, a double and an int with "foo/bar": expects the {@link ListToJmsStreamMessage} encoding. */
+    public void testListMessageWithMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, "foo/bar", expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    /** Same list with no MIME type: expects the {@link ListToJmsStreamMessage} encoding. */
+    public void testListMessageWithoutMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    /** List that also contains a nested list: expects the {@link ListToAmqpListConverter} encoding. */
+    public void testListMessageWithoutMimeTypeWithNonJmsContent() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42, Lists.newArrayList());
+        final ListToAmqpListConverter listToAmqpListConverter = new ListToAmqpListConverter();
+        final byte[] expectedContent = listToAmqpListConverter.toMimeContent(content);
+        doTest(content, null, expectedContent, listToAmqpListConverter.getMimeType());
+    }
+
+    /** List holding a {@link MySerializable}: expects Java serialization as "application/java-object-stream". */
+    public void testListMessageWithoutMimeTypeWithNonConvertibleItem() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList(new MySerializable());
+        // doTest builds the source message itself; building a second one here would only re-stub the mocks.
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** byte[] body with no MIME type: expects identical bytes reported as "application/octet-stream". */
+    public void testByteArrayMessageWithoutMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        doTest(content, null, content, "application/octet-stream");
+    }
+
+    /** byte[] body with "foo/bar": expects identical bytes and the same MIME type. */
+    public void testByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    /** Empty byte[] body with "foo/bar": expects empty content and the same MIME type. */
+    public void testEmptyByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = new byte[0];
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    /** Map of String keys to an int and a String with "foo/bar": expects the {@link MapToJmsMapMessage} encoding. */
+    public void testMapMessageWithMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final String mimeType = "foo/bar";
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    /** Same map with no MIME type: expects the {@link MapToJmsMapMessage} encoding. */
+    public void testMapMessageWithoutMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    /** Map with a nested map value: expects the {@link MapToAmqpMapConverter} encoding. */
+    public void testMapMessageWithMimeTypeWithNonJmsContent() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key", Collections.singletonMap("foo", "bar"));
+        final String mimeType = "foo/bar";
+        final MapToAmqpMapConverter mapToAmqpMapConverter = new MapToAmqpMapConverter();
+        final byte[] expectedContent = mapToAmqpMapConverter.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToAmqpMapConverter.getMimeType());
+    }
+
+    /** Map with an Integer key and a {@link MySerializable} value: expects Java serialization. */
+    public void testMapMessageWithoutMimeTypeWithNonConvertibleEntry() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put(37, new MySerializable());
+
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** Arbitrary Serializable with "foo/bar": expects Java serialization as "application/java-object-stream". */
+    public void testSerializableMessageWithMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** Arbitrary Serializable with no MIME type: expects Java serialization as "application/java-object-stream". */
+    public void testSerializableMessageWithoutMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** Null body with no MIME type: expects empty content and a null MIME type. */
+    public void testNullMessageWithoutMimeType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+
+    /** Returns the bytes produced by writing {@code o} to an {@link ObjectOutputStream}. */
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+
+    /**
+     * Creates an {@link InternalMessage} backed by the mocked handle and header.
+     *
+     * @param content  body object; its Java-serialized form becomes the stored content
+     * @param mimeType value the mocked header reports from {@code getMimeType()}, may be null
+     */
+    protected InternalMessage getAmqMessage(final Serializable content, final String mimeType) throws Exception
+    {
+        final byte[] serializedContent = getObjectStreamMessageBytes(content);
+        configureMessageContent(serializedContent);
+        configureMessageHeader(mimeType);
+
+        final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(_amqpHeader);
+        final int contentSize = serializedContent == null ? 0 : serializedContent.length;
+        final InternalMessageMetaData metaData =
+                new InternalMessageMetaData(false, internalMessageHeader, contentSize);
+        when(_handle.getMetaData()).thenReturn(metaData);
+
+        return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(_handle));
+    }
+
+    /** Stubs the mocked header to report {@code mimeType}. */
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+    }
+
+    /**
+     * Stubs the mocked handle so that {@code getContentSize()} returns the length of {@code section} and
+     * {@code getContent(offset, size)} returns a view of the requested range. A null section is treated
+     * as empty.
+     */
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                // The captors hold the offset and size of the call being answered.
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    /**
+     * Converts a message built from {@code messageBody} and {@code mimeType} and asserts on the result.
+     *
+     * @param messageBody         body object of the source message, may be null
+     * @param mimeType            MIME type reported by the source header, may be null
+     * @param expectedContent     expected converted content; null is compared as an empty array
+     * @param expectedContentType expected MIME type of the converted message, may be null
+     */
+    private void doTest(final Serializable messageBody,
+                        final String mimeType,
+                        final byte[] expectedContent,
+                        final String expectedContentType) throws Exception
+    {
+        final InternalMessage sourceMessage = getAmqMessage(messageBody, mimeType);
+        final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertArrayEquals("Unexpected content", expectedContent != null ? expectedContent : new byte[0], getBytes(content));
+        assertEquals("Unexpected content type", expectedContentType, convertedMessage.getMessageHeader().getMimeType());
+    }
+
+    /** Concatenates the bytes of all buffers in iteration order, disposing each buffer once it has been read. */
+    private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        for (QpidByteBuffer buf : content)
+        {
+            try
+            {
+                ByteStreams.copy(buf.asInputStream(), bos);
+            }
+            finally
+            {
+                // Dispose even when the copy fails.
+                buf.dispose();
+            }
+        }
+        return bos.toByteArray();
+    }
+
+    /** Minimal Serializable type for which the tests expect plain Java serialization. */
+    private static class MySerializable implements Serializable
+    {
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index a83f4df..a8ff769 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -71,7 +71,16 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
{
Object messageBody = serverMsg.getMessageBody();
ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(messageBody);
- final byte[] messageContent = converter == null ? new byte[] {} : converter.toMimeContent(messageBody);
+ final byte[] messageContent;
+ try
+ {
+ messageContent = converter == null ? new byte[] {} : converter.toMimeContent(messageBody);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException("Could not convert message from Internal to 0-8 because"
+ + " conversion of message content failed.", e);
+ }
String mimeType = converter == null ? null : converter.getMimeType();
mimeType = improveMimeType(serverMsg, mimeType);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17e6c7d6/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java
new file mode 100644
index 0000000..6de6ceb
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_0_8Test.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.message.internal.InternalMessageMetaDataType;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
+import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link MessageConverter_Internal_to_v0_8}.
+ * <p>
+ * Each test builds an {@link InternalMessage} whose stored content is the Java-serialized form of a
+ * body object (served through a mocked {@link StoredMessage}), converts it, and compares the content
+ * bytes and MIME type of the converted message against the expected values.
+ */
+public class MessageConverter_Internal_to_0_8Test extends QpidTestCase
+{
+    private final MessageConverter_Internal_to_v0_8 _converter = new MessageConverter_Internal_to_v0_8();
+    private final StoredMessage<InternalMessageMetaData> _handle = mock(StoredMessage.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+
+    /** String body with "text/plain": expects the UTF-8 bytes and the same MIME type. */
+    public void testStringMessage() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "text/plain";
+        doTest(content, mimeType, content.getBytes(UTF_8), mimeType);
+    }
+
+    /** String body with "foo/bar": expects the UTF-8 bytes reported as "text/plain". */
+    public void testStringMessageWithUnknownMimeType() throws Exception
+    {
+        String content = "testContent";
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content.getBytes(UTF_8), "text/plain");
+    }
+
+    /** String body with no MIME type: expects the UTF-8 bytes reported as "text/plain". */
+    public void testStringMessageWithoutMimeType() throws Exception
+    {
+        String content = "testContent";
+        doTest(content, null, content.getBytes(UTF_8), "text/plain");
+    }
+
+    /** List of a String, a double and an int with "foo/bar": expects the {@link ListToJmsStreamMessage} encoding. */
+    public void testListMessageWithMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, "foo/bar", expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    /** Same list with no MIME type: expects the {@link ListToJmsStreamMessage} encoding. */
+    public void testListMessageWithoutMimeType() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42);
+        final ListToJmsStreamMessage listToJmsStreamMessage = new ListToJmsStreamMessage();
+        final byte[] expectedContent = listToJmsStreamMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, listToJmsStreamMessage.getMimeType());
+    }
+
+    /** List that also contains a nested list: expects the {@link ListToAmqpListConverter} encoding. */
+    public void testListMessageWithoutMimeTypeWithNonJmsContent() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList("testItem", 37.5, 42, Lists.newArrayList());
+        final ListToAmqpListConverter listToAmqpListConverter = new ListToAmqpListConverter();
+        final byte[] expectedContent = listToAmqpListConverter.toMimeContent(content);
+        doTest(content, null, expectedContent, listToAmqpListConverter.getMimeType());
+    }
+
+    /** List holding a {@link MySerializable}: expects Java serialization as "application/java-object-stream". */
+    public void testListMessageWithoutMimeTypeWithNonConvertibleItem() throws Exception
+    {
+        ArrayList<?> content = Lists.newArrayList(new MySerializable());
+        // doTest builds the source message itself; building a second one here would only re-stub the mocks.
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** byte[] body with no MIME type: expects identical bytes reported as "application/octet-stream". */
+    public void testByteArrayMessageWithoutMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        doTest(content, null, content, "application/octet-stream");
+    }
+
+    /** byte[] body with "foo/bar": expects identical bytes and the same MIME type. */
+    public void testByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = "testContent".getBytes(UTF_8);
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    /** Empty byte[] body with "foo/bar": expects empty content and the same MIME type. */
+    public void testEmptyByteArrayMessageWithMimeType() throws Exception
+    {
+        byte[] content = new byte[0];
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, content, mimeType);
+    }
+
+    /** Map of String keys to an int and a String with "foo/bar": expects the {@link MapToJmsMapMessage} encoding. */
+    public void testMapMessageWithMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final String mimeType = "foo/bar";
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    /** Same map with no MIME type: expects the {@link MapToJmsMapMessage} encoding. */
+    public void testMapMessageWithoutMimeType() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key1", 37);
+        content.put("key2", "foo");
+        final MapToJmsMapMessage mapToJmsMapMessage = new MapToJmsMapMessage();
+        final byte[] expectedContent = mapToJmsMapMessage.toMimeContent(content);
+        doTest(content, null, expectedContent, mapToJmsMapMessage.getMimeType());
+    }
+
+    /** Map with a nested map value: expects the {@link MapToAmqpMapConverter} encoding. */
+    public void testMapMessageWithMimeTypeWithNonJmsContent() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put("key", Collections.singletonMap("foo", "bar"));
+        final String mimeType = "foo/bar";
+        final MapToAmqpMapConverter mapToAmqpMapConverter = new MapToAmqpMapConverter();
+        final byte[] expectedContent = mapToAmqpMapConverter.toMimeContent(content);
+        doTest(content, mimeType, expectedContent, mapToAmqpMapConverter.getMimeType());
+    }
+
+    /** Map with an Integer key and a {@link MySerializable} value: expects Java serialization. */
+    public void testMapMessageWithoutMimeTypeWithNonConvertibleEntry() throws Exception
+    {
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put(37, new MySerializable());
+
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** Arbitrary Serializable with "foo/bar": expects Java serialization as "application/java-object-stream". */
+    public void testSerializableMessageWithMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        final String mimeType = "foo/bar";
+        doTest(content, mimeType, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** Arbitrary Serializable with no MIME type: expects Java serialization as "application/java-object-stream". */
+    public void testSerializableMessageWithoutMimeType() throws Exception
+    {
+        Serializable content = new MySerializable();
+        doTest(content, null, getObjectStreamMessageBytes(content), "application/java-object-stream");
+    }
+
+    /** Null body with no MIME type: expects empty content and a null MIME type. */
+    public void testNullMessageWithoutMimeType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+
+    /** Returns the bytes produced by writing {@code o} to an {@link ObjectOutputStream}. */
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+
+    /**
+     * Creates an {@link InternalMessage} backed by the mocked handle and header.
+     *
+     * @param content  body object; its Java-serialized form becomes the stored content
+     * @param mimeType value the mocked header reports from {@code getMimeType()}, may be null
+     */
+    protected InternalMessage getAmqMessage(final Serializable content, final String mimeType) throws Exception
+    {
+        final byte[] serializedContent = getObjectStreamMessageBytes(content);
+        configureMessageContent(serializedContent);
+        configureMessageHeader(mimeType);
+
+        final InternalMessageHeader internalMessageHeader = new InternalMessageHeader(_amqpHeader);
+        final int contentSize = serializedContent == null ? 0 : serializedContent.length;
+        final InternalMessageMetaData metaData =
+                new InternalMessageMetaData(false, internalMessageHeader, contentSize);
+        when(_handle.getMetaData()).thenReturn(metaData);
+
+        return ((InternalMessage) InternalMessageMetaDataType.INSTANCE.createMessage(_handle));
+    }
+
+    /** Stubs the mocked header to report {@code mimeType}. */
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+    }
+
+    /**
+     * Stubs the mocked handle so that {@code getContentSize()} returns the length of {@code section} and
+     * {@code getContent(offset, size)} returns a view of the requested range. A null section is treated
+     * as empty.
+     */
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                // The captors hold the offset and size of the call being answered.
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    /**
+     * Converts a message built from {@code messageBody} and {@code mimeType} and asserts on the result.
+     *
+     * @param messageBody         body object of the source message, may be null
+     * @param mimeType            MIME type reported by the source header, may be null
+     * @param expectedContent     expected converted content; null is compared as an empty array
+     * @param expectedContentType expected MIME type of the converted message, may be null
+     */
+    private void doTest(final Serializable messageBody,
+                        final String mimeType,
+                        final byte[] expectedContent,
+                        final String expectedContentType) throws Exception
+    {
+        final InternalMessage sourceMessage = getAmqMessage(messageBody, mimeType);
+        final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+
+        assertArrayEquals("Unexpected content", expectedContent != null ? expectedContent : new byte[0], getBytes(content));
+        assertEquals("Unexpected content type", expectedContentType, convertedMessage.getMessageHeader().getMimeType());
+    }
+
+    /** Concatenates the bytes of all buffers in iteration order, disposing each buffer once it has been read. */
+    private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        for (QpidByteBuffer buf : content)
+        {
+            try
+            {
+                ByteStreams.copy(buf.asInputStream(), bos);
+            }
+            finally
+            {
+                // Dispose even when the copy fails.
+                buf.dispose();
+            }
+        }
+        return bos.toByteArray();
+    }
+
+    /** Minimal Serializable type for which the tests expect plain Java serialization. */
+    private static class MySerializable implements Serializable
+    {
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve AMQP
0-x to Internal conversion and add unit tests.
Posted by lq...@apache.org.
QPID-7434: [Java Broker] Improve AMQP 0-x to Internal conversion and add unit tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/939cda5b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/939cda5b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/939cda5b
Branch: refs/heads/master
Commit: 939cda5bf529bc0ecf911f6af586eb48573bbcca
Parents: 385167e
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Aug 10 17:15:41 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 11 15:14:53 2017 +0100
----------------------------------------------------------------------
.../mimecontentconverter/ConversionUtils.java | 34 ++
.../mimecontentconverter/TextPlainToString.java | 2 +-
.../mimecontentconverter/TextXmlToString.java | 2 +-
.../JmsMapMessageToMap.java | 6 +
.../JmsStreamMessageToList.java | 6 +
.../MessageConverter_v0_10_to_Internal.java | 97 ++++-
.../AmqpListToListConverter.java | 6 +
.../AmqpMapToMapConverter.java | 6 +
.../MessageConverter_0_10_to_InternalTest.java | 416 +++++++++++++++++++
broker-plugins/amqp-0-8-protocol/pom.xml | 6 +
.../v0_8/MessageConverter_v0_8_to_Internal.java | 95 ++++-
.../MessageConverter_0_8_to_InternalTest.java | 414 ++++++++++++++++++
.../v1_0/MessageConverter_from_1_0.java | 19 +-
.../protocol/v1_0/MessageConverter_to_1_0.java | 34 +-
.../v1_0/MessageConverter_v1_0_to_Internal.java | 21 +-
.../MessageConverter_0_10_to_1_0Test.java | 5 +-
16 files changed, 1112 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java
new file mode 100644
index 0000000..98a9ba8
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/ConversionUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.message.mimecontentconverter;
+
+import java.util.regex.Pattern;
+
+/**
+ * Shared regular expressions that classify a message by its MIME content type.
+ * <p>
+ * Each pattern wraps its alternatives in one non-capturing group so that the {@code ^} and
+ * {@code $} anchors apply to every alternative. Without the group, alternation binds more
+ * loosely than the anchors (e.g. {@code ^a|b$} means "starts with a" OR "ends with b"), which
+ * only behaves as intended when callers use {@link java.util.regex.Matcher#matches()}.
+ * Results of {@code matches()} are unchanged; {@code find()} and {@code lookingAt()} now agree.
+ */
+public class ConversionUtils
+{
+    /**
+     * Text-like content: any {@code text/} subtype, plus the XML, JSON and script
+     * {@code application/} subtypes. Capturing groups 1-3 keep their original numbering.
+     */
+    public static final Pattern TEXT_CONTENT_TYPES = Pattern.compile(
+            "^(?:(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript)))$");
+
+    /** Map message content types: {@code amqp/map} and {@code jms/map-message}. */
+    public static final Pattern MAP_MESSAGE_CONTENT_TYPES =
+            Pattern.compile("^(?:amqp/map|jms/map-message)$");
+
+    /** List message content types: {@code amqp/list} and {@code jms/stream-message}. */
+    public static final Pattern LIST_MESSAGE_CONTENT_TYPES =
+            Pattern.compile("^(?:amqp/list|jms/stream-message)$");
+
+    /** Content types used for serialized Java objects. */
+    public static final Pattern OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile(
+            "^(?:application/x-java-serialized-object|application/java-object-stream)$");
+
+    /** Opaque binary content: {@code application/octet-stream}. */
+    public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$");
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
index d71882a..01e8ca9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextPlainToString.java
@@ -48,6 +48,6 @@ public class TextPlainToString implements MimeContentToObjectConverter<String>
+ /**
+ * Decodes the given bytes as a UTF-8 string.
+ *
+ * @param data the raw content; may be {@code null}
+ * @return the decoded text, or the empty string when {@code data} is {@code null}
+ */
@Override
public String toObject(final byte[] data)
{
- return new String(data, StandardCharsets.UTF_8);
+ return data == null ? "" : new String(data, StandardCharsets.UTF_8);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
index aeba9dd..97026d5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/TextXmlToString.java
@@ -48,6 +48,6 @@ public class TextXmlToString implements MimeContentToObjectConverter<String>
+ /**
+ * Decodes the given bytes as a UTF-8 string.
+ *
+ * @param data the raw content; may be {@code null}
+ * @return the decoded text, or the empty string when {@code data} is {@code null}
+ */
@Override
public String toObject(final byte[] data)
{
- return new String(data, StandardCharsets.UTF_8);
+ return data == null ? "" : new String(data, StandardCharsets.UTF_8);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
index 37b1db7..76db633 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsMapMessageToMap.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.typedmessage.mimecontentconverter;
import java.io.EOFException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -54,6 +55,11 @@ public class JmsMapMessageToMap implements MimeContentToObjectConverter<Map>
@Override
public Map toObject(final byte[] data)
{
+ if (data == null || data.length == 0)
+ {
+ return Collections.emptyMap();
+ }
+
TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
LinkedHashMap map = new LinkedHashMap();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
index 7be7d9f..657f66e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/typedmessage/mimecontentconverter/JmsStreamMessageToList.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.typedmessage.mimecontentconverter;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
@@ -54,6 +55,11 @@ public class JmsStreamMessageToList implements MimeContentToObjectConverter<List
@Override
public List toObject(final byte[] data)
{
+ if (data == null || data.length == 0)
+ {
+ return Collections.emptyList();
+ }
+
TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
List<Object> list = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index e507ce2..6868b14 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -20,13 +20,24 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.LIST_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.MAP_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES;
+
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -75,9 +86,8 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
}
Object body = convertMessageBody(mimeType, data);
- MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
- AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo(), encoding);
- return InternalMessage.convert(serverMessage, fixedHeader, body);
+ final AMQMessageHeader convertedHeader = convertHeader(serverMessage, addressSpace, body, encoding);
+ return InternalMessage.convert(serverMessage, convertedHeader, body);
}
@Override
@@ -86,6 +96,55 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
}
+    /**
+     * Builds the header for the converted internal message.
+     * <p>
+     * The application headers and fixed properties are copied from the source message header into
+     * an {@link InternalMessageHeader}, with the MIME type replaced by the value computed by
+     * {@code getInternalConvertedMimeType}. The result is wrapped in a
+     * {@code DelegatingMessageHeader} together with the 0-10 reply-to and the given encoding.
+     *
+     * @param serverMessage       the AMQP 0-10 message being converted
+     * @param addressSpace        not read by this method
+     * @param convertedBodyObject the already converted body, forwarded to the MIME type lookup
+     * @param encoding            the encoding handed to the delegating header
+     * @return the header to attach to the internal message
+     */
+    private AMQMessageHeader convertHeader(final MessageTransferMessage serverMessage,
+                                           final NamedAddressSpace addressSpace,
+                                           final Object convertedBodyObject,
+                                           final String encoding)
+    {
+        final String internalMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject);
+        final AMQMessageHeader source = serverMessage.getMessageHeader();
+
+        // Snapshot the application headers into a map owned by the new header.
+        final Map<String, Object> applicationHeaders = new HashMap<>();
+        for (final String name : source.getHeaderNames())
+        {
+            applicationHeaders.put(name, source.getHeader(name));
+        }
+
+        final InternalMessageHeader internalHeader =
+                new InternalMessageHeader(applicationHeaders,
+                                          source.getCorrelationId(),
+                                          source.getExpiration(),
+                                          source.getUserId(),
+                                          source.getAppId(),
+                                          source.getMessageId(),
+                                          internalMimeType,
+                                          source.getEncoding(),
+                                          source.getPriority(),
+                                          source.getTimestamp(),
+                                          source.getNotValidBefore(),
+                                          source.getType(),
+                                          source.getReplyTo(),
+                                          serverMessage.getArrivalTime());
+
+        // The 0-10 reply-to lives in the message properties, which may be absent.
+        final MessageProperties properties = serverMessage.getHeader().getMessageProperties();
+        ReplyTo replyTo = null;
+        if (properties != null)
+        {
+            replyTo = properties.getReplyTo();
+        }
+        return new DelegatingMessageHeader(internalHeader, replyTo, encoding);
+    }
+
+    /**
+     * Determines the MIME type to record on the converted internal message.
+     * <p>
+     * List and map content types yield {@code null}, object content types are normalised to
+     * {@code application/x-java-serialized-object}, and any other value (including {@code null})
+     * is returned unchanged.
+     *
+     * @param serverMessage       the message whose header supplies the original MIME type
+     * @param convertedBodyObject not read by this method
+     * @return the MIME type for the internal message, possibly {@code null}
+     */
+    private String getInternalConvertedMimeType(final MessageTransferMessage serverMessage, final Object convertedBodyObject)
+    {
+        final String mimeType = serverMessage.getMessageHeader().getMimeType();
+        if (mimeType == null)
+        {
+            return null;
+        }
+
+        final boolean isList = ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches();
+        if (isList || ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return null;
+        }
+
+        if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+        {
+            return "application/x-java-serialized-object";
+        }
+
+        return mimeType;
+    }
+
private static class DelegatingMessageHeader implements AMQMessageHeader
{
private final AMQMessageHeader _delegate;
@@ -206,14 +265,38 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
+ /**
+ * Converts the raw message content into a body object, guided by the MIME type.
+ * <p>
+ * Non-empty content is handed to the converter looked up for the MIME type if there is one,
+ * otherwise decoded as UTF-8 text when the MIME type is a text type, otherwise returned as is.
+ * Null or empty content is mapped to a type-specific empty value: {@code null} when there is
+ * no MIME type, an empty byte array for object types, the empty string for text types, an
+ * empty map or list for map or list types, and the content itself for anything else.
+ *
+ * @param mimeType the content type of the message; may be {@code null}
+ * @param data the raw content; may be {@code null} or empty
+ * @return the converted body object
+ */
private static Object convertMessageBody(String mimeType, byte[] data)
{
+ // NOTE(review): the null check below suggests the registry returns null when no converter is registered - confirm.
MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType);
- if (converter != null)
+ if (data != null && data.length != 0)
+ {
+ if (converter != null)
+ {
+ return converter.toObject(data);
+ }
+ else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return new String(data, UTF_8);
+ }
+ // Neither branch matched: falls through to the final "return data".
+ }
+ else if (mimeType == null)
+ {
+ return null;
+ }
+ else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return new byte[0];
+ }
+ else if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return "";
+ }
+ else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return converter.toObject(data);
+ return Collections.emptyMap();
}
- else
+ else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return data;
+ return Collections.emptyList();
}
+ return data;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
index 8139db1..1cfb640 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpListToListConverter.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
@@ -51,6 +52,11 @@ public class AmqpListToListConverter implements MimeContentToObjectConverter<Lis
@Override
public List toObject(final byte[] data)
{
+ if (data == null || data.length == 0)
+ {
+ return Collections.emptyList();
+ }
+
BBDecoder decoder = new BBDecoder();
decoder.init(ByteBuffer.wrap(data));
return decoder.readList();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
index 4280481..ce9d098 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/transport/mimecontentconverter/AmqpMapToMapConverter.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
@@ -51,6 +52,11 @@ public class AmqpMapToMapConverter implements MimeContentToObjectConverter<Map>
@Override
public Map toObject(final byte[] data)
{
+ if (data == null || data.length == 0)
+ {
+ return Collections.emptyMap();
+ }
+
BBDecoder decoder = new BBDecoder();
decoder.init(ByteBuffer.wrap(data));
return decoder.readMap();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java
new file mode 100644
index 0000000..5a28f4f
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_0_10_to_InternalTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link MessageConverter_v0_10_to_Internal}.
+ * <p>
+ * Each test builds a mocked AMQP 0-10 message from raw content and a content type, converts it,
+ * and checks the body object and MIME type of the resulting {@link InternalMessage}.
+ */
+public class MessageConverter_0_10_to_InternalTest extends QpidTestCase
+{
+    private final MessageConverter_v0_10_to_Internal _converter = new MessageConverter_v0_10_to_Internal();
+
+    private final StoredMessage<MessageMetaData_0_10> _handle = mock(StoredMessage.class);
+
+    private final MessageMetaData_0_10 _metaData = mock(MessageMetaData_0_10.class);
+    private final AMQMessageHeader _amqpHeader = mock(AMQMessageHeader.class);
+    private final Header _header = mock(Header.class);
+    private MessageProperties _messageProperties;
+
+    /** Wires the mocked stored message, meta data and headers together before each test. */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _messageProperties = new MessageProperties();
+
+        when(_handle.getMetaData()).thenReturn(_metaData);
+        when(_header.getMessageProperties()).thenReturn(_messageProperties);
+        when(_metaData.getHeader()).thenReturn(_header);
+        when(_metaData.getMessageHeader()).thenReturn(_amqpHeader);
+        when(_metaData.getMessageProperties()).thenReturn(_messageProperties);
+    }
+
+    public void testConvertStringMessageBody() throws Exception
+    {
+        doTestTextMessage("helloworld", "text/plain");
+    }
+
+    public void testConvertEmptyStringMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/plain");
+    }
+
+    public void testConvertStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "text/xml");
+    }
+
+    public void testConvertEmptyStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/xml");
+    }
+
+    public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeText() throws Exception
+    {
+        doTestTextMessage("foo", "text/foobar");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestTextMessage("<!DOCTYPE name []>", "application/xml-dtd");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "application/foo+xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJson() throws Exception
+    {
+        doTestTextMessage("[]", "application/json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestTextMessage("[]", "application/foo+json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestTextMessage("var foo", "application/javascript");
+    }
+
+    public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestTextMessage("var foo", "application/ecmascript");
+    }
+
+    public void testConvertBytesMessageBody() throws Exception
+    {
+        // Explicit charset: the test must not depend on the platform default encoding.
+        doTestBytesMessage("helloworld".getBytes(UTF_8));
+    }
+
+    public void testConvertBytesMessageBodyNoContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes(UTF_8);
+        doTest(messageContent, null, messageContent, null);
+    }
+
+    public void testConvertMessageBodyUnknownContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes(UTF_8);
+        final String mimeType = "my/bytes";
+        doTest(messageContent, mimeType, messageContent, mimeType);
+    }
+
+    public void testConvertEmptyBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage(new byte[0]);
+    }
+
+    public void testConvertJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = getJmsStreamMessageBytes(expected);
+
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(messageBytes, mimeType, expected);
+    }
+
+    public void testConvertEmptyJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(null, mimeType, expected);
+    }
+
+    public void testConvertAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        doTestStreamMessage(messageBytes, "amqp/list", expected);
+    }
+
+    public void testConvertEmptyAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        doTestStreamMessage(null, "amqp/list", expected);
+    }
+
+    public void testConvertJmsMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = getJmsMapMessageBytes(expected);
+
+        doTestMapMessage(messageBytes, "jms/map-message", expected);
+    }
+
+    public void testConvertEmptyJmsMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "jms/map-message", Collections.emptyMap());
+    }
+
+    public void testConvertAmqpMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected);
+    }
+
+    public void testConvertEmptyAmqpMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "amqp/map", Collections.emptyMap());
+    }
+
+    public void testConvertObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/java-object-stream", messageBytes);
+    }
+
+    public void testConvertObjectStream2MessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/x-java-serialized-object", messageBytes);
+    }
+
+    public void testConvertEmptyObjectStreamMessageBody() throws Exception
+    {
+        doTestObjectMessage(null, "application/java-object-stream", new byte[0]);
+    }
+
+    public void testConvertEmptyMessageWithoutContentType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+    public void testConvertEmptyMessageWithUnknownContentType() throws Exception
+    {
+        doTest(null, "foo/bar", new byte[0], "foo/bar");
+    }
+
+    public void testConvertMessageWithoutContentType() throws Exception
+    {
+        final byte[] expectedContent = "someContent".getBytes(UTF_8);
+        doTest(expectedContent, null, expectedContent, null);
+    }
+
+    /** Serializes the given object with Java serialization and returns the resulting bytes. */
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+    /** Writes each element with a {@link TypedBytesContentWriter} and returns the written bytes. */
+    private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        for (Object o : objects)
+        {
+            writer.writeObject(o);
+        }
+        return getBytes(writer);
+    }
+
+    /** Writes the entry count followed by each key and value, and returns the written bytes. */
+    private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        writer.writeIntImpl(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet())
+        {
+            writer.writeNullTerminatedStringImpl(entry.getKey());
+            writer.writeObject(entry.getValue());
+        }
+        return getBytes(writer);
+    }
+
+    /** Copies the remaining bytes of the writer's buffer into a new array. */
+    private byte[] getBytes(final TypedBytesContentWriter writer)
+    {
+        ByteBuffer buf = writer.getData();
+        final byte[] expected = new byte[buf.remaining()];
+        buf.get(expected);
+        return expected;
+    }
+
+    /** Configures the mocks for the given content and MIME type and wraps them in a message. */
+    private MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
+    {
+        configureMessageContent(expected);
+        configureMessageHeader(mimeType);
+
+        return new MessageTransferMessage(_handle, new Object());
+    }
+
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_amqpHeader.getMimeType()).thenReturn(mimeType);
+        _messageProperties.setContentType(mimeType);
+    }
+
+    /**
+     * Makes the mocked stored message serve the given content; {@code null} is treated as
+     * empty content.
+     */
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        // Answer each getContent(offset, size) call with a view over the requested range.
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    /**
+     * Converts a text message and expects the same text and MIME type back; {@code null}
+     * content is expected to convert to the empty string.
+     */
+    private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
+    {
+        final byte[] contentBytes;
+        final String expectedContent;
+        if (originalContent == null)
+        {
+            contentBytes = null;
+            expectedContent = "";
+        }
+        else
+        {
+            contentBytes = originalContent.getBytes(UTF_8);
+            expectedContent = originalContent;
+        }
+        doTest(contentBytes, mimeType, expectedContent, mimeType);
+    }
+
+    /** Map messages are expected to lose their MIME type on conversion. */
+    private void doTestMapMessage(final byte[] messageBytes,
+                                  final String mimeType,
+                                  final Map<String, Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    private void doTestBytesMessage(final byte[] messageContent) throws Exception
+    {
+        doTest(messageContent, "application/octet-stream", messageContent, "application/octet-stream");
+    }
+
+    /** List and stream messages are expected to lose their MIME type on conversion. */
+    private void doTestStreamMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final List<Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    /** Object messages are expected to be normalised to the serialized-object MIME type. */
+    private void doTestObjectMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final byte[] expectedBytes)
+            throws Exception
+    {
+        doTest(messageBytes, mimeType, expectedBytes, "application/x-java-serialized-object");
+    }
+
+    /**
+     * Converts a message built from the given bytes and MIME type, then asserts on the converted
+     * body and MIME type. Arrays, lists and maps are compared by content, not by identity or
+     * concrete collection class.
+     */
+    private void doTest(final byte[] messageBytes,
+                        final String mimeType,
+                        final Object expectedContent,
+                        final String expectedMimeType) throws Exception
+    {
+        final MessageTransferMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final InternalMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        if (expectedContent instanceof byte[])
+        {
+            assertArrayEquals("Unexpected content",
+                              ((byte[]) expectedContent),
+                              ((byte[]) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof List)
+        {
+            assertEquals("Unexpected content",
+                         new ArrayList<Object>((Collection) expectedContent),
+                         new ArrayList<Object>((Collection) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof Map)
+        {
+            assertEquals("Unexpected content",
+                         new HashMap<Object,Object>((Map) expectedContent),
+                         new HashMap<Object,Object>((Map) convertedMessage.getMessageBody()));
+        }
+        else
+        {
+            assertEquals("Unexpected content", expectedContent, convertedMessage.getMessageBody());
+        }
+        String convertedMimeType = convertedMessage.getMessageHeader().getMimeType();
+        assertEquals("Unexpected content type", expectedMimeType, convertedMimeType);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/pom.xml b/broker-plugins/amqp-0-8-protocol/pom.xml
index a40955f..010c9aa 100644
--- a/broker-plugins/amqp-0-8-protocol/pom.xml
+++ b/broker-plugins/amqp-0-8-protocol/pom.xml
@@ -59,6 +59,12 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index fb43b6a..dbd2194 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -20,14 +20,26 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.LIST_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.MAP_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES;
+
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -75,10 +87,55 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
}
Object body = convertMessageBody(mimeType, data);
+ final AMQMessageHeader convertedHeader = convertHeader(serverMessage, addressSpace, body, encoding);
+ return InternalMessage.convert(serverMessage, convertedHeader, body);
+ }
+
+    /**
+     * Builds the header of the converted internal message.
+     * <p>
+     * All application headers of the source message are copied into a fresh map, the standard
+     * properties are taken over from the source header, and the mime type is replaced by the value
+     * computed by {@link #getInternalConvertedMimeType(AMQMessage, Object)}. The result is wrapped
+     * in a {@code DelegatingMessageHeader} together with the supplied encoding.
+     *
+     * @param serverMessage       the 0-8 message being converted
+     * @param addressSpace        not read by this method
+     * @param convertedBodyObject the already converted body, passed on to the mime type lookup
+     * @param encoding            the encoding handed to the wrapping {@code DelegatingMessageHeader}
+     * @return the header to use for the internal message
+     */
+    private AMQMessageHeader convertHeader(final AMQMessage serverMessage,
+                                           final NamedAddressSpace addressSpace,
+                                           final Object convertedBodyObject,
+                                           final String encoding)
+    {
+        final String internalMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject);
+        final AMQMessageHeader source = serverMessage.getMessageHeader();
+
+        // Snapshot the application headers so the internal header owns its own map.
+        final Map<String, Object> copiedHeaders = new HashMap<>();
+        for (final String name : source.getHeaderNames())
+        {
+            copiedHeaders.put(name, source.getHeader(name));
+        }
+
+        final InternalMessageHeader internalHeader =
+                new InternalMessageHeader(copiedHeaders,
+                                          source.getCorrelationId(),
+                                          source.getExpiration(),
+                                          source.getUserId(),
+                                          source.getAppId(),
+                                          source.getMessageId(),
+                                          internalMimeType,
+                                          source.getEncoding(),
+                                          source.getPriority(),
+                                          source.getTimestamp(),
+                                          source.getNotValidBefore(),
+                                          source.getType(),
+                                          source.getReplyTo(),
+                                          serverMessage.getArrivalTime());
+        return new DelegatingMessageHeader(internalHeader, encoding);
+    }
+
+ /**
+ * Determines the mime type to store on the converted internal message.
+ * <p>
+ * Content types matching the list/stream or map patterns of {@code ConversionUtils} yield
+ * {@code null}; content types matching the object-message pattern are normalised to
+ * {@code "application/x-java-serialized-object"}. Any other value, including {@code null},
+ * is returned unchanged.
+ *
+ * @param serverMessage the message whose header supplies the original mime type
+ * @param convertedBodyObject not read by this method
+ * @return the mime type for the internal message, possibly {@code null}
+ */
+ private String getInternalConvertedMimeType(final AMQMessage serverMessage, final Object convertedBodyObject)
+ {
+ String originalMimeType = serverMessage.getMessageHeader().getMimeType();
+ if (originalMimeType != null)
+ {
+ // NOTE(review): presumably list/map bodies carry no mime type because the body is now a Java object - confirm
+ if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches()
+ || ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches())
+ {
+ return null;
+ }
+ else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(originalMimeType).matches())
+ {
+ return "application/x-java-serialized-object";
+ }
+ }
- return InternalMessage.convert(serverMessage,
- new DelegatingMessageHeader(serverMessage.getMessageHeader(), encoding),
- body);
+ // Unrecognised (or absent) mime types pass through untouched.
+ return originalMimeType;
}
@Override
@@ -298,14 +355,38 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
+ /**
+ * Converts the raw message payload into the body object of the internal message.
+ * <p>
+ * Non-empty payloads are decoded by the registered mime content converter if there is one,
+ * or as a UTF-8 string when the mime type matches the text pattern; otherwise the bytes are
+ * returned as they are. Empty or absent payloads are mapped to an empty value chosen by the
+ * mime type: {@code null} without a mime type, an empty byte array for object messages,
+ * {@code ""} for text, an empty map or list for map/list messages, and the given
+ * {@code data} unchanged for any other mime type.
+ *
+ * @param mimeType the mime type of the source message, may be {@code null}
+ * @param data the message payload, may be {@code null} or empty
+ * @return the converted body object
+ */
private static Object convertMessageBody(String mimeType, byte[] data)
{
MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType);
- if (converter != null)
+ if (data != null && data.length != 0)
+ {
+ if (converter != null)
+ {
+ return converter.toObject(data);
+ }
+ else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return new String(data, UTF_8);
+ }
+ // no converter and not text: falls through to the final "return data"
+ }
+ // from here on the payload is null or empty
+ else if (mimeType == null)
+ {
+ return null;
+ }
+ else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return new byte[0];
+ }
+ else if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return "";
+ }
+ else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return converter.toObject(data);
+ return Collections.emptyMap();
}
- else
+ else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return data;
+ return Collections.emptyList();
}
+ return data;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java
new file mode 100644
index 0000000..0f32481
--- /dev/null
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_0_8_to_InternalTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter;
+import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link MessageConverter_v0_8_to_Internal}.
+ * <p>
+ * Each test stubs a mocked AMQP 0-8 message with a payload and a content type, runs it through
+ * the converter and verifies the body object and the mime type of the resulting
+ * {@link InternalMessage}.
+ */
+public class MessageConverter_0_8_to_InternalTest extends QpidTestCase
+{
+    private final MessageConverter_v0_8_to_Internal _converter = new MessageConverter_v0_8_to_Internal();
+
+    private final StoredMessage<MessageMetaData> _handle = mock(StoredMessage.class);
+
+    private final MessageMetaData _metaData = mock(MessageMetaData.class);
+    private final AMQMessageHeader _header = mock(AMQMessageHeader.class);
+    private final ContentHeaderBody _contentHeaderBody = mock(ContentHeaderBody.class);
+    private final BasicContentHeaderProperties _basicContentHeaderProperties = mock(BasicContentHeaderProperties.class);
+
+    /** Wires the mocked stored message, meta data, header and content header together. */
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        when(_handle.getMetaData()).thenReturn(_metaData);
+        when(_metaData.getMessageHeader()).thenReturn(_header);
+        when(_metaData.getMessagePublishInfo()).thenReturn(new MessagePublishInfo());
+        when(_metaData.getContentHeaderBody()).thenReturn(_contentHeaderBody);
+        when(_contentHeaderBody.getProperties()).thenReturn(_basicContentHeaderProperties);
+    }
+
+    public void testConvertStringMessageBody() throws Exception
+    {
+        doTestTextMessage("helloworld", "text/plain");
+    }
+
+    public void testConvertEmptyStringMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/plain");
+    }
+
+    public void testConvertStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "text/xml");
+    }
+
+    public void testConvertEmptyStringXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "text/xml");
+    }
+
+    public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception
+    {
+        doTestTextMessage(null, "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeText() throws Exception
+    {
+        doTestTextMessage("foo", "text/foobar");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "application/xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception
+    {
+        doTestTextMessage("<!DOCTYPE name []>", "application/xml-dtd");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooXml() throws Exception
+    {
+        doTestTextMessage("<helloworld></helloworld>", "application/foo+xml");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJson() throws Exception
+    {
+        doTestTextMessage("[]", "application/json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationFooJson() throws Exception
+    {
+        doTestTextMessage("[]", "application/foo+json");
+    }
+
+    public void testConvertStringWithContentTypeApplicationJavascript() throws Exception
+    {
+        doTestTextMessage("var foo", "application/javascript");
+    }
+
+    public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception
+    {
+        doTestTextMessage("var foo", "application/ecmascript");
+    }
+
+    public void testConvertBytesMessageBody() throws Exception
+    {
+        // explicit charset keeps the fixture independent of the platform default encoding
+        doTestBytesMessage("helloworld".getBytes(UTF_8));
+    }
+
+    public void testConvertBytesMessageBodyNoContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes(UTF_8);
+        doTest(messageContent, null, messageContent, null);
+    }
+
+    public void testConvertMessageBodyUnknownContentType() throws Exception
+    {
+        final byte[] messageContent = "helloworld".getBytes(UTF_8);
+        final String mimeType = "my/bytes";
+        doTest(messageContent, mimeType, messageContent, mimeType);
+    }
+
+    public void testConvertEmptyBytesMessageBody() throws Exception
+    {
+        doTestBytesMessage(new byte[0]);
+    }
+
+    public void testConvertJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = getJmsStreamMessageBytes(expected);
+
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(messageBytes, mimeType, expected);
+    }
+
+    public void testConvertEmptyJmsStreamMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        final String mimeType = "jms/stream-message";
+        doTestStreamMessage(null, mimeType, expected);
+    }
+
+    public void testConvertAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
+        final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected);
+
+        doTestStreamMessage(messageBytes, "amqp/list", expected);
+    }
+
+    public void testConvertEmptyAmqpListMessageBody() throws Exception
+    {
+        final List<Object> expected = Lists.newArrayList();
+        doTestStreamMessage(null, "amqp/list", expected);
+    }
+
+    public void testConvertJmsMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = getJmsMapMessageBytes(expected);
+
+        doTestMapMessage(messageBytes, "jms/map-message", expected);
+    }
+
+    public void testConvertEmptyJmsMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "jms/map-message", Collections.emptyMap());
+    }
+
+    public void testConvertAmqpMapMessageBody() throws Exception
+    {
+        final Map<String, Object> expected = Collections.singletonMap("key", "value");
+        final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected);
+
+        doTestMapMessage(messageBytes, "amqp/map", expected);
+    }
+
+    public void testConvertEmptyAmqpMapMessageBody() throws Exception
+    {
+        doTestMapMessage(null, "amqp/map", Collections.emptyMap());
+    }
+
+    public void testConvertObjectStreamMessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/java-object-stream", messageBytes);
+    }
+
+    public void testConvertObjectStream2MessageBody() throws Exception
+    {
+        final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID());
+        doTestObjectMessage(messageBytes, "application/x-java-serialized-object", messageBytes);
+    }
+
+    public void testConvertEmptyObjectStreamMessageBody() throws Exception
+    {
+        doTestObjectMessage(null, "application/java-object-stream", new byte[0]);
+    }
+
+    public void testConvertEmptyMessageWithoutContentType() throws Exception
+    {
+        doTest(null, null, null, null);
+    }
+
+    public void testConvertEmptyMessageWithUnknownContentType() throws Exception
+    {
+        doTest(null, "foo/bar", new byte[0], "foo/bar");
+    }
+
+    public void testConvertMessageWithoutContentType() throws Exception
+    {
+        final byte[] expectedContent = "someContent".getBytes(UTF_8);
+        doTest(expectedContent, null, expectedContent, null);
+    }
+
+    /** Serialises the given object with Java object serialisation. */
+    private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception
+    {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos))
+        {
+            oos.writeObject(o);
+            return bos.toByteArray();
+        }
+    }
+
+    /** Writes every element with a {@link TypedBytesContentWriter} and returns the written bytes. */
+    private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        for (Object o : objects)
+        {
+            writer.writeObject(o);
+        }
+        return getBytes(writer);
+    }
+
+    /** Writes the entry count followed by each key/value pair and returns the written bytes. */
+    private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception
+    {
+        TypedBytesContentWriter writer = new TypedBytesContentWriter();
+        writer.writeIntImpl(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet())
+        {
+            writer.writeNullTerminatedStringImpl(entry.getKey());
+            writer.writeObject(entry.getValue());
+        }
+        return getBytes(writer);
+    }
+
+    /** Copies the remaining bytes of the writer's buffer into a new array. */
+    private byte[] getBytes(final TypedBytesContentWriter writer)
+    {
+        ByteBuffer buf = writer.getData();
+        final byte[] expected = new byte[buf.remaining()];
+        buf.get(expected);
+        return expected;
+    }
+
+    /** Stubs the mocks with the given content and mime type and wraps the handle in an {@link AMQMessage}. */
+    protected AMQMessage getAmqMessage(final byte[] expected, final String mimeType)
+    {
+        configureMessageContent(expected);
+        configureMessageHeader(mimeType);
+
+        return new AMQMessage(_handle);
+    }
+
+    /** Makes both the generic header and the 0-8 content header properties report the mime type. */
+    private void configureMessageHeader(final String mimeType)
+    {
+        when(_header.getMimeType()).thenReturn(mimeType);
+        when(_basicContentHeaderProperties.getContentTypeAsString()).thenReturn(mimeType);
+    }
+
+    /**
+     * Stubs the stored message so that it serves the given bytes as content; {@code null} is
+     * treated as an empty payload.
+     */
+    private void configureMessageContent(byte[] section)
+    {
+        if (section == null)
+        {
+            section = new byte[0];
+        }
+        final QpidByteBuffer combined = QpidByteBuffer.wrap(section);
+        when(_handle.getContentSize()).thenReturn(section.length);
+        final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        // The captors double as argument matchers; the answer serves the requested slice.
+        when(_handle.getContent(offsetCaptor.capture(),
+                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
+        {
+            @Override
+            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
+            {
+                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
+                return Collections.singleton(view);
+            }
+        });
+    }
+
+    /**
+     * Converts a text message and expects the same text and mime type back; a {@code null}
+     * original stands for an empty payload, which is expected to convert to {@code ""}.
+     */
+    private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
+    {
+        final byte[] contentBytes;
+        final String expectedContent;
+        if (originalContent == null)
+        {
+            contentBytes = null;
+            expectedContent = "";
+        }
+        else
+        {
+            contentBytes = originalContent.getBytes(UTF_8);
+            expectedContent = originalContent;
+        }
+        doTest(contentBytes, mimeType, expectedContent, mimeType);
+    }
+
+    /** Map messages are expected to lose their mime type on conversion. */
+    private void doTestMapMessage(final byte[] messageBytes,
+                                  final String mimeType,
+                                  final Map<String, Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    /** Bytes messages are expected to keep both content and the octet-stream mime type. */
+    private void doTestBytesMessage(final byte[] messageContent) throws Exception
+    {
+        doTest(messageContent, "application/octet-stream", messageContent, "application/octet-stream");
+    }
+
+    /** Stream/list messages are expected to lose their mime type on conversion. */
+    private void doTestStreamMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final List<Object> expected) throws Exception
+    {
+        doTest(messageBytes, mimeType, expected, null);
+    }
+
+    /** Object messages are expected to be reported as "application/x-java-serialized-object". */
+    private void doTestObjectMessage(final byte[] messageBytes,
+                                     final String mimeType,
+                                     final byte[] expectedBytes)
+            throws Exception
+    {
+        doTest(messageBytes, mimeType, expectedBytes, "application/x-java-serialized-object");
+    }
+
+    /**
+     * Runs the converter on a message built from the given bytes and mime type and asserts the
+     * converted body and mime type.
+     *
+     * @param messageBytes     payload of the source message, {@code null} for no content
+     * @param mimeType         mime type of the source message, may be {@code null}
+     * @param expectedContent  expected body object of the converted message
+     * @param expectedMimeType expected mime type of the converted message
+     */
+    private void doTest(final byte[] messageBytes,
+                        final String mimeType,
+                        final Object expectedContent,
+                        final String expectedMimeType) throws Exception
+    {
+        final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
+        final InternalMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
+
+        if (expectedContent instanceof byte[])
+        {
+            assertArrayEquals("Unexpected content",
+                              ((byte[]) expectedContent),
+                              ((byte[]) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof List)
+        {
+            // copy into the same concrete type so equality does not depend on the list implementation
+            assertEquals("Unexpected content",
+                         new ArrayList<Object>((Collection<?>) expectedContent),
+                         new ArrayList<Object>((Collection<?>) convertedMessage.getMessageBody()));
+        }
+        else if (expectedContent instanceof Map)
+        {
+            assertEquals("Unexpected content",
+                         new HashMap<Object, Object>((Map<?, ?>) expectedContent),
+                         new HashMap<Object, Object>((Map<?, ?>) convertedMessage.getMessageBody()));
+        }
+        else
+        {
+            assertEquals("Unexpected content", expectedContent, convertedMessage.getMessageBody());
+        }
+        String convertedMimeType = convertedMessage.getMessageHeader().getMimeType();
+        assertEquals("Unexpected content type", expectedMimeType, convertedMimeType);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index d5581fa..2e27f47 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -36,9 +36,9 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.regex.Pattern;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
@@ -77,13 +77,6 @@ public class MessageConverter_from_1_0
UUID.class,
Date.class));
- public static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$");
- public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$");
- public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$");
- public static final Pattern
- OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$");
- public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$");
-
static Object convertBodyToObject(final Message_1_0 serverMessage)
{
final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize());
@@ -296,29 +289,29 @@ public class MessageConverter_from_1_0
Class<?> contentTypeClassHint = null;
String type = contentType.toString();
String supportedContentType = null;
- if (TEXT_CONTENT_TYPES.matcher(type).matches())
+ if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = String.class;
// the AMQP 0-x client does not accept arbitrary "text/*" mimeTypes so use "text/plain"
supportedContentType = "text/plain";
}
- else if (MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = Map.class;
supportedContentType = contentType.toString();
}
- else if (LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = List.class;
supportedContentType = contentType.toString();
}
- else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = Serializable.class;
// the AMQP 0-x client does not accept the "application/x-java-serialized-object" mimeTypes so use fall back
supportedContentType = "application/java-object-stream";
}
- else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = byte[].class;
supportedContentType = "application/octet-stream";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 23eee80..6d4b653 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v1_0;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.*;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES;
+import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtils.TEXT_CONTENT_TYPES;
import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.BYTES_MESSAGE;
import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MAP_MESSAGE;
import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MESSAGE;
@@ -42,6 +45,7 @@ import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry;
import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -76,23 +80,23 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
Symbol contentType = null;
if (contentMimeType != null)
{
- if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
+ if (TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
{
contentType = Symbol.valueOf(contentMimeType);
}
- else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
contentType = Symbol.valueOf("application/octet-stream");
}
- else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
contentType = null;
}
- else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
contentType = null;
}
- else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
contentType = Symbol.valueOf("application/x-java-serialized-object");
}
@@ -111,22 +115,22 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
final Symbol key = Symbol.valueOf("x-opt-jms-msg-type");
if (contentMimeType != null)
{
- if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
+ if (TEXT_CONTENT_TYPES.matcher(contentMimeType).matches())
{
messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, TEXT_MESSAGE.getType()));
}
- else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, BYTES_MESSAGE.getType()));
}
- else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
if (isSectionValidForJmsMap(bodySection))
{
messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MAP_MESSAGE.getType()));
}
}
- else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
if (isSectionValidForJmsList(bodySection))
{
@@ -134,7 +138,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
new MessageAnnotations(Collections.singletonMap(key, STREAM_MESSAGE.getType()));
}
}
- else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
+ else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches())
{
messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, OBJECT_MESSAGE.getType()));
}
@@ -269,7 +273,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
return new AmqpSequence(fixListValues((List<Object>) bodyObject));
}
}
- else if (mimeType != null && MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+ else if (mimeType != null && TEXT_CONTENT_TYPES.matcher(mimeType).matches())
{
return new AmqpValue(new String(data, UTF_8));
}
@@ -278,19 +282,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
{
return new AmqpValue(null);
}
- else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
return new Data(new Binary(SERIALIZED_NULL));
}
- else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
+ else if (TEXT_CONTENT_TYPES.matcher(mimeType).matches())
{
return new AmqpValue("");
}
- else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
return new AmqpValue(Collections.emptyMap());
}
- else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
return new AmqpSequence(Collections.emptyList());
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index 749c1a6..6e015d1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.message.mimecontentconverter.ConversionUtils;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.plugin.PluggableService;
@@ -72,7 +73,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
final NamedAddressSpace addressSpace,
final Object convertedBodyObject)
{
- final String convertedMimeType = getInternalConvertedContentAndMimeType(serverMessage, convertedBodyObject);
+ final String convertedMimeType = getInternalConvertedMimeType(serverMessage, convertedBodyObject);
final MessageMetaData_1_0.MessageHeader_1_0 messageHeader = serverMessage.getMessageHeader();
final InternalMessageHeader header = new InternalMessageHeader(messageHeader.getHeadersAsMap(),
messageHeader.getCorrelationId(),
@@ -103,8 +104,8 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
return "v1-0 to Internal";
}
- private static String getInternalConvertedContentAndMimeType(final Message_1_0 serverMsg,
- final Object convertedBodyObject)
+ private static String getInternalConvertedMimeType(final Message_1_0 serverMsg,
+ final Object convertedBodyObject)
{
MessageConverter_from_1_0.ContentHint contentHint = getInternalTypeHint(serverMsg);
@@ -129,7 +130,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
}
else if (contentClassHint == String.class
&& (originalContentType == null
- || !MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
+ || !ConversionUtils.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
{
mimeType = "text/plain";
}
@@ -153,7 +154,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
}
else if (convertedBodyObject instanceof String
&& (originalContentType == null
- || !MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
+ || !ConversionUtils.TEXT_CONTENT_TYPES.matcher(originalContentType).matches()))
{
mimeType = "text/plain";
}
@@ -222,23 +223,23 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
{
Class<?> contentTypeClassHint = null;
String type = contentType.toString();
- if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(type).matches())
+ if (ConversionUtils.TEXT_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = String.class;
}
- else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.MAP_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = Map.class;
}
- else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.LIST_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = List.class;
}
- else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.OBJECT_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = Serializable.class;
}
- else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
+ else if (ConversionUtils.BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches())
{
contentTypeClassHint = byte[].class;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/939cda5b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
index 2d53233..7cf43ce 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
@@ -352,13 +352,12 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
return sections;
}
- protected MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
+ private MessageTransferMessage getAmqMessage(final byte[] expected, final String mimeType)
{
configureMessageContent(expected);
configureMessageHeader(mimeType);
- final MessageTransferMessage messageTransferMessage = new MessageTransferMessage(_handle, new Object());
- return messageTransferMessage;
+ return new MessageTransferMessage(_handle, new Object());
}
private void configureMessageHeader(final String mimeType)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-7434: [Java Broker] Improve
handling of empty map/stream messages.
Posted by lq...@apache.org.
QPID-7434: [Java Broker] Improve handling of empty map/stream messages.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/385167e1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/385167e1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/385167e1
Branch: refs/heads/master
Commit: 385167e1f6085aceddb2ded398ad504bb40aed98
Parents: 42bebb9
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Aug 11 10:48:16 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Aug 11 15:14:53 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/MessageConverter_to_1_0.java | 11 +++++++++--
.../MessageConverter_0_10_to_1_0Test.java | 17 ++++++++++++++++-
.../v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java | 16 +++++++++++++++-
3 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/385167e1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 7060058..23eee80 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -32,7 +32,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -285,7 +284,15 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches())
{
- return new AmqpValue(null);
+ return new AmqpValue("");
+ }
+ else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return new AmqpValue(Collections.emptyMap());
+ }
+ else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
+ {
+ return new AmqpSequence(Collections.emptyList());
}
return new Data(new Binary(data));
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/385167e1/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
index 2e8357a..2d53233 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0Test.java
@@ -203,6 +203,13 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
}
+ public void testConvertJmsStreamMessageEmptyBody() throws Exception
+ {
+ final List<Object> expected = Collections.emptyList();
+
+ doTestStreamMessage(null, "jms/stream-message", expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+ }
+
public void testConvertAmqpListMessageBody() throws Exception
{
final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
@@ -237,6 +244,14 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
doTestMapMessage(messageBytes, "amqp/map", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
}
+ public void testConvertJmsMapMessageEmptyBody() throws Exception
+ {
+ final Map<String, Object> expected = Collections.emptyMap();
+
+ doTestMapMessage(null, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+ }
+
+
public void testConvertAmqpMapMessageBodyWithNonJmsContent() throws Exception
{
final Map<String, Object> expected = Collections.singletonMap("key", Collections.singletonList("nonJmsList"));
@@ -396,7 +411,7 @@ public class MessageConverter_0_10_to_1_0Test extends QpidTestCase
private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
{
final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8);
- String expectedContent = originalContent == null ? null : originalContent;
+ String expectedContent = originalContent == null ? "" : originalContent;
doTest(contentBytes,
mimeType,
AmqpValueSection.class,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/385167e1/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
index 62dc8f4d..df1a786 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
@@ -202,6 +202,13 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
}
+ public void testConvertJmsStreamMessageEmptyBody() throws Exception
+ {
+ final List<Object> expected = Collections.emptyList();
+
+ doTestStreamMessage(null, "jms/stream-message", expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType());
+ }
+
public void testConvertAmqpListMessageBody() throws Exception
{
final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D);
@@ -228,6 +235,13 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
doTestMapMessage(messageBytes, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
}
+ public void testConvertJmsMapMessageEmptyBody() throws Exception
+ {
+ final Map<String, Object> expected = Collections.emptyMap();
+
+ doTestMapMessage(null, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType());
+ }
+
public void testConvertAmqpMapMessageBody() throws Exception
{
final Map<String, Object> expected = Collections.singletonMap("key", "value");
@@ -394,7 +408,7 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception
{
final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8);
- String expectedContent = originalContent == null ? null : originalContent;
+ String expectedContent = originalContent == null ? "" : originalContent;
doTest(contentBytes,
mimeType,
AmqpValueSection.class,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org