You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/04 13:15:34 UTC
[08/16] activemq-artemis git commit: ARTEMIS-1009 Pure Message Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 2ece01d..d06464f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -16,33 +16,20 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import javax.jms.JMSException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import javax.jms.JMSException;
-
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -50,9 +37,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -64,16 +48,18 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class JMSMappingOutboundTransformerTest {
private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844");
private final String TEST_ADDRESS = "queue://testAddress";
- private IDGenerator idGenerator;
- private JMSMappingOutboundTransformer transformer;
public static final byte QUEUE_TYPE = 0x00;
public static final byte TOPIC_TYPE = 0x01;
@@ -82,80 +68,10 @@ public class JMSMappingOutboundTransformerTest {
@Before
public void setUp() {
- idGenerator = new SimpleIDGenerator(0);
- transformer = new JMSMappingOutboundTransformer(idGenerator);
}
// ----- no-body Message type tests ---------------------------------------//
- @Test
- public void testConvertMessageToAmqpMessageWithNoBody() throws Exception {
- ServerJMSMessage outbound = createMessage();
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNull(amqp.getBody());
- }
-
- @Test
- public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
- ServerJMSTextMessage outbound = createTextMessage();
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNull(amqp.getBody());
- }
-
- // ----- BytesMessage type tests ---------------------------------------//
-
- @Test
- public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception {
- ServerJMSBytesMessage outbound = createBytesMessage();
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof Data);
- assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
- assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
- }
-
- @Test
- public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
- byte[] expectedPayload = new byte[] {8, 16, 24, 32};
- ServerJMSBytesMessage outbound = createBytesMessage();
- outbound.writeBytes(expectedPayload);
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof Data);
- assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
- assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
-
- Binary amqpData = ((Data) amqp.getBody()).getValue();
- Binary inputData = new Binary(expectedPayload);
-
- assertTrue(inputData.equals(amqpData));
- }
-
@Ignore("Compressed message body support not yet implemented.")
@Test
public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
@@ -164,10 +80,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -183,13 +96,9 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ServerJMSBytesMessage outbound = createBytesMessage();
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -201,14 +110,10 @@ public class JMSMappingOutboundTransformerTest {
public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
byte[] expectedPayload = new byte[] {8, 16, 24, 32};
ServerJMSBytesMessage outbound = createBytesMessage();
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -226,14 +131,10 @@ public class JMSMappingOutboundTransformerTest {
public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
byte[] expectedPayload = new byte[] {8, 16, 24, 32};
ServerJMSBytesMessage outbound = createBytesMessage(true);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -253,10 +154,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMapMessage outbound = createMapMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -271,10 +169,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBytes("bytes", byteArray);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -296,10 +191,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBoolean("property-3", true);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -320,10 +212,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBoolean("property-3", true);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -336,33 +225,12 @@ public class JMSMappingOutboundTransformerTest {
assertTrue("string".equals(amqpMap.get("property-1")));
}
- // ----- StreamMessage type tests -----------------------------------------//
-
- @Test
- public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
- ServerJMSStreamMessage outbound = createStreamMessage();
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof AmqpValue);
- assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
- }
-
@Test
public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
ServerJMSStreamMessage outbound = createStreamMessage();
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -376,17 +244,15 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof AmqpValue);
- assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
+ assertTrue(amqp.getBody() instanceof AmqpSequence);
+
+ AmqpSequence list = (AmqpSequence)amqp.getBody();
@SuppressWarnings("unchecked")
- List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue();
+ List<Object> amqpList = list.getValue();
assertEquals(2, amqpList.size());
}
@@ -394,15 +260,11 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
ServerJMSStreamMessage outbound = createStreamMessage(true);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
outbound.writeBoolean(false);
outbound.writeString("test");
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -421,10 +283,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -434,45 +293,20 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ServerJMSObjectMessage outbound = createObjectMessage();
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertEquals(5, ((Data) amqp.getBody()).getValue().getLength());
}
-
- @Test
- public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
- ServerJMSObjectMessage outbound = createObjectMessage();
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof AmqpValue);
- assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
- assertEquals(5, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
- }
-
@Test
public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -486,13 +320,9 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -504,35 +334,11 @@ public class JMSMappingOutboundTransformerTest {
}
@Test
- public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
- ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof AmqpValue);
- assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
- assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
-
- Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
- assertNotNull(value);
- assertTrue(value instanceof UUID);
- }
-
- @Test
public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -546,13 +352,9 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -566,20 +368,16 @@ public class JMSMappingOutboundTransformerTest {
@Test
public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof AmqpValue);
- assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
- assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
+ assertTrue(amqp.getBody() instanceof Data);
+ assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+ assertFalse(0 == ((Binary) ((Data) amqp.getBody()).getValue()).getLength());
- Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
+ Object value = deserialize((((Data) amqp.getBody()).getValue()).getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
@@ -591,10 +389,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -602,57 +397,12 @@ public class JMSMappingOutboundTransformerTest {
}
@Test
- public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
- String contentString = "myTextMessageContent";
- ServerJMSTextMessage outbound = createTextMessage(contentString);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof Data);
- assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-
- Binary data = ((Data) amqp.getBody()).getValue();
- String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
- assertEquals(contentString, contents);
- }
-
- @Test
- public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
- String contentString = "myTextMessageContent";
- ServerJMSTextMessage outbound = createTextMessage(contentString);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
- outbound.encode();
-
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
-
- assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof Data);
- assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
-
- Binary data = ((Data) amqp.getBody()).getValue();
- String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
- assertEquals(contentString, contents);
- }
-
- @Test
public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception {
String contentString = "myTextMessageContent";
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -665,10 +415,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -679,21 +426,16 @@ public class JMSMappingOutboundTransformerTest {
public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
String contentString = "myTextMessageContent";
ServerJMSTextMessage outbound = createTextMessage(contentString, true);
- outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
- assertTrue(amqp.getBody() instanceof Data);
- assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
+ assertTrue(amqp.getBody() instanceof AmqpValue);
- Binary data = ((Data) amqp.getBody()).getValue();
- String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
- assertEquals(contentString, contents);
+ AmqpValue value = (AmqpValue)amqp.getBody();
+
+ assertEquals(contentString, value.getValue());
}
// ----- Test JMSDestination Handling -------------------------------------//
@@ -731,15 +473,12 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSDestination(jmsDestination);
- EncodedMessage encoded = transform(textMessage);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
if (maMap != null) {
- Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION);
+ Object actualValue = maMap.get(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
} else if (expectedAnnotationValue != null) {
fail("Expected annotation value, but there were no annotations");
@@ -785,15 +524,12 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSReplyTo(jmsReplyTo);
- EncodedMessage encoded = transform(textMessage);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
if (maMap != null) {
- Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
+ Object actualValue = maMap.get(AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
} else if (expectedAnnotationValue != null) {
fail("Expected annotation value, but there were no annotations");
@@ -806,17 +542,6 @@ public class JMSMappingOutboundTransformerTest {
// ----- Utility Methods used for this Test -------------------------------//
- public EncodedMessage transform(ServerJMSMessage message) throws Exception {
- // Useful for testing but not recommended for real life use.
- ByteBuf nettyBuffer = Unpooled.buffer(1024);
- NettyWritable buffer = new NettyWritable(nettyBuffer);
-
- long messageFormat = transformer.transform(message, buffer);
-
- EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
-
- return encoded;
- }
private ServerDestination createDestination(byte destType) {
ServerDestination destination = null;
@@ -841,7 +566,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSMessage createMessage() {
- return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
+ return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE));
}
private ServerJMSBytesMessage createBytesMessage() {
@@ -849,7 +574,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSBytesMessage createBytesMessage(boolean compression) {
- ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
+ ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE));
if (compression) {
// TODO
@@ -863,7 +588,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSMapMessage createMapMessage(boolean compression) {
- ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
+ ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE));
if (compression) {
// TODO
@@ -877,7 +602,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSStreamMessage createStreamMessage(boolean compression) {
- ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
+ ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE));
if (compression) {
// TODO
@@ -895,7 +620,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
- ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(idGenerator);
+ ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0);
if (compression) {
// TODO
@@ -922,7 +647,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
- ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(idGenerator);
+ ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0);
if (compression) {
// TODO
@@ -943,8 +668,8 @@ public class JMSMappingOutboundTransformerTest {
}
}
- private ServerMessageImpl newMessage(byte messageType) {
- ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
+ private CoreMessage newMessage(byte messageType) {
+ CoreMessage message = new CoreMessage(0, 512);
message.setType(messageType);
((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
index 99aab33..483f245 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
@@ -21,27 +21,23 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
/**
* Some simple performance tests for the Message Transformers.
*/
@@ -51,16 +47,11 @@ public class JMSTransformationSpeedComparisonTest {
@Rule
public TestName test = new TestName();
- private IDGenerator idGenerator;
- private ProtonMessageConverter converter;
-
private final int WARM_CYCLES = 1000;
private final int PROFILE_CYCLES = 1000000;
@Before
public void setUp() {
- idGenerator = new SimpleIDGenerator(0);
- converter = new ProtonMessageConverter(idGenerator);
}
@Test
@@ -68,20 +59,20 @@ public class JMSTransformationSpeedComparisonTest {
Message message = Proton.message();
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- EncodedMessage encoded = encode(message);
+ AMQPMessage encoded = new AMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -99,20 +90,20 @@ public class JMSTransformationSpeedComparisonTest {
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- EncodedMessage encoded = encode(message);
+ AMQPMessage encoded = new AMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -122,20 +113,20 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessage() throws Exception {
- EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+ AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -145,20 +136,20 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testComplexQpidJMSMessage() throws Exception {
- EncodedMessage encoded = encode(createComplexQpidJMSMessage());
+ AMQPMessage encoded = encode(createComplexQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- ServerMessage intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -168,18 +159,20 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
- EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
+ AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- converter.inbound(encoded);
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- converter.inbound(encoded);
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -190,19 +183,20 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
- EncodedMessage encoded = encode(createTypicalQpidJMSMessage());
- ServerMessage intermediate = converter.inbound(encoded);
+ AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -278,16 +272,16 @@ public class JMSTransformationSpeedComparisonTest {
return message;
}
- private EncodedMessage encode(Object target) {
- if (target instanceof ProtonJMessage) {
- ProtonJMessage amqp = (ProtonJMessage) target;
-
- ByteBuf nettyBuffer = Unpooled.buffer(1024);
- amqp.encode(new NettyWritable(nettyBuffer));
+ private AMQPMessage encode(Message message) {
+ return new AMQPMessage(message);
+ }
- return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
- } else {
- return null;
+ private void encode(AMQPMessage target) {
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+ try {
+ target.sendBuffer(buf, 1);
+ } finally {
+ buf.release();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index a5a2168..a73d29f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -16,36 +16,28 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
/**
* Tests some basic encode / decode functionality on the transformers.
*/
@@ -54,72 +46,10 @@ public class MessageTransformationTest {
@Rule
public TestName test = new TestName();
- private IDGenerator idGenerator;
- private ProtonMessageConverter converter;
-
@Before
public void setUp() {
- idGenerator = new SimpleIDGenerator(0);
- converter = new ProtonMessageConverter(idGenerator);
}
- @Test
- public void testEncodeDecodeFidelity() throws Exception {
- Map<String, Object> applicationProperties = new HashMap<>();
- Map<Symbol, Object> messageAnnotations = new HashMap<>();
-
- applicationProperties.put("property-1", "string");
- applicationProperties.put("property-2", 512);
- applicationProperties.put("property-3", true);
-
- messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
- messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
-
- Message incomingMessage = Proton.message();
-
- incomingMessage.setAddress("queue://test-queue");
- incomingMessage.setDeliveryCount(1);
- incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties));
- incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
- incomingMessage.setCreationTime(System.currentTimeMillis());
- incomingMessage.setContentType("text/plain");
- incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-
- EncodedMessage encoded = encode(incomingMessage);
-
- ServerMessage outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode();
-
- // Test that message details are equal
- assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
- assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
- assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
- assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
-
- // Test Message annotations
- ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
- ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
-
- assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
-
- // Test Message properties
- MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
- MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
-
- assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
-
- // Test that bodies are equal
- assertTrue(incomingMessage.getBody() instanceof AmqpValue);
- assertTrue(outboudMessage.getBody() instanceof AmqpValue);
-
- AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
- AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
-
- assertTrue(incomingBody.getValue() instanceof String);
- assertTrue(outgoingBody.getValue() instanceof String);
-
- assertEquals(incomingBody.getValue(), outgoingBody.getValue());
- }
@Test
public void testBodyOnlyEncodeDecode() throws Exception {
@@ -128,12 +58,10 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- EncodedMessage encoded = encode(incomingMessage);
- ServerMessage outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
assertNull(outboudMessage.getHeader());
- assertNull(outboudMessage.getProperties());
}
@Test
@@ -144,9 +72,8 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
- EncodedMessage encoded = encode(incomingMessage);
- ServerMessage outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
assertNull(outboudMessage.getHeader());
assertNotNull(outboudMessage.getProperties());
@@ -160,32 +87,9 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setDurable(true);
- EncodedMessage encoded = encode(incomingMessage);
- ServerMessage outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
-
- assertNotNull(outboudMessage.getHeader());
- assertNull(outboudMessage.getProperties());
- }
-
- @Test
- public void testMessageWithAmqpValueThatFailsJMSConversion() throws Exception {
-
- Message incomingMessage = Proton.message();
-
- incomingMessage.setBody(new AmqpValue(new Boolean(true)));
-
- EncodedMessage encoded = encode(incomingMessage);
- ServerMessage outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
- Section section = outboudMessage.getBody();
- assertNotNull(section);
- assertTrue(section instanceof AmqpValue);
- AmqpValue amqpValue = (AmqpValue) section;
- assertNotNull(amqpValue.getValue());
- assertTrue(amqpValue.getValue() instanceof Boolean);
- assertEquals(true, amqpValue.getValue());
}
@Test
@@ -233,32 +137,10 @@ public class MessageTransformationTest {
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- EncodedMessage encoded = encode(message);
- ServerMessage outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(message).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
- assertNotNull(outboudMessage.getHeader());
- assertNotNull(outboudMessage.getProperties());
- assertNotNull(outboudMessage.getMessageAnnotations());
- assertNotNull(outboudMessage.getApplicationProperties());
- assertNull(outboudMessage.getDeliveryAnnotations());
- assertNull(outboudMessage.getFooter());
-
- assertEquals(9, outboudMessage.getApplicationProperties().getValue().size());
+ assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
}
-
- private EncodedMessage encode(Message message) {
- ProtonJMessage amqp = (ProtonJMessage) message;
-
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
- final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
- int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
- if (overflow.position() > 0) {
- buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
- c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
- }
-
- return new EncodedMessage(1, buffer.array(), 0, c);
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
new file mode 100644
index 0000000..db40a8e
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AMQPMessageTest {
+
+ @Test
+ public void testVerySimple() {
+ MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
+ protonMessage.setHeader( new Header());
+ Properties properties = new Properties();
+ properties.setTo("someNiceLocal");
+ protonMessage.setProperties(properties);
+ protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
+ protonMessage.getHeader().setDurable(Boolean.TRUE);
+ protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap()));
+
+ ByteBuf nettyBuffer = Unpooled.buffer(1500);
+
+ protonMessage.encode(new NettyWritable(nettyBuffer));
+
+ byte[] bytes = new byte[nettyBuffer.writerIndex()];
+
+ nettyBuffer.readBytes(bytes);
+
+ AMQPMessage encode = new AMQPMessage(0, bytes);
+
+ Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
+ Assert.assertEquals(true, encode.getHeader().getDurable());
+ Assert.assertEquals("someNiceLocal", encode.getAddress());
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 1f435ff..f4cba64 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -132,11 +131,6 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
}
@Override
- public MessageConverter getConverter() {
- return null;
- }
-
- @Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index f0385dc..67ef258 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -28,17 +28,19 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
/**
* Handles MQTT Exactly Once (QoS level 2) Protocol.
*/
public class MQTTPublishManager {
+ private static final Logger logger = Logger.getLogger(MQTTPublishManager.class);
+
private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
private SimpleString managementAddress;
@@ -112,19 +114,20 @@ public class MQTTPublishManager {
* to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from
* the PubAck or PubRec message id. *
*/
- protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+ protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
// This is to allow retries of PubRel.
if (isManagementConsumer(consumer)) {
sendPubRelMessage(message);
} else {
int qos = decideQoS(message, consumer);
if (qos == 0) {
- sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
+ // TODO-now: fix encoding
+ sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
} else if (qos == 1 || qos == 2) {
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
- sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+ sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
// Client must have disconnected and it's Subscription QoS cleared
consumer.individualCancel(message.getMessageID(), false);
@@ -149,7 +152,7 @@ public class MQTTPublishManager {
*/
void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception {
synchronized (lock) {
- ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
+ Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
if (qos > 0) {
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
@@ -173,6 +176,7 @@ public class MQTTPublishManager {
}
tx.commit();
} catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
tx.rollback();
throw t;
}
@@ -181,7 +185,7 @@ public class MQTTPublishManager {
}
}
- void sendPubRelMessage(ServerMessage message) {
+ void sendPubRelMessage(Message message) {
int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
session.getProtocolHandler().sendPubRel(messageId);
}
@@ -190,7 +194,7 @@ public class MQTTPublishManager {
try {
Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
if (ref != null) {
- ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+ Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
session.getServerSession().send(m, true);
session.getServerSession().acknowledge(ref.getB(), ref.getA());
} else {
@@ -246,30 +250,30 @@ public class MQTTPublishManager {
}
}
- private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
+ private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
ByteBuf payload;
switch (message.getType()) {
case Message.TEXT_TYPE:
try {
- SimpleString text = message.getBodyBuffer().readNullableSimpleString();
+ SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString();
byte[] stringPayload = text.toString().getBytes("UTF-8");
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
payload.writeBytes(stringPayload);
break;
} catch (UnsupportedEncodingException e) {
- log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+ log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
default:
- ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
- payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
+ ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer();
+ payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf();
break;
}
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
}
- private int decideQoS(ServerMessage message, ServerConsumer consumer) {
+ private int decideQoS(Message message, ServerConsumer consumer) {
int subscriptionQoS = -1;
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 596670b..0b52a0b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -17,12 +17,12 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
@@ -44,7 +44,7 @@ public class MQTTRetainMessageManager {
* the subscription queue for the consumer. When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue.
*/
- void handleRetainedMessage(ServerMessage message, String address, boolean reset, Transaction tx) throws Exception {
+ void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(retainAddress);
@@ -82,7 +82,7 @@ public class MQTTRetainMessageManager {
Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
if (i.hasNext()) {
- ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
+ Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
sendToQueue(message, queue, tx);
}
}
@@ -95,7 +95,7 @@ public class MQTTRetainMessageManager {
tx.commit();
}
- private void sendToQueue(ServerMessage message, Queue queue, Transaction tx) throws Exception {
+ private void sendToQueue(Message message, Queue queue, Transaction tx) throws Exception {
RoutingContext context = new RoutingContextImpl(tx);
queue.route(message, context);
session.getServer().getPostOffice().processRoute(message, context, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 548b62c..a5b908f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -17,10 +17,12 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -43,13 +45,13 @@ public class MQTTSessionCallback implements SessionCallback {
@Override
public int sendMessage(MessageReference reference,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
int deliveryCount) {
try {
- session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+ session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
} catch (Exception e) {
- log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+ log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
return 1;
}
@@ -70,7 +72,7 @@ public class MQTTSessionCallback implements SessionCallback {
@Override
public int sendLargeMessage(MessageReference reference,
- ServerMessage message,
+ Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 7bc6b84..613fef3 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -24,12 +24,11 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
/**
* A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis.
@@ -93,13 +92,13 @@ public class MQTTUtil {
return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
}
- private static ServerMessage createServerMessage(MQTTSession session,
+ private static ICoreMessage createServerMessage(MQTTSession session,
SimpleString address,
boolean retain,
int qos) {
long id = session.getServer().getStorageManager().generateID();
- ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+ CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
@@ -107,21 +106,20 @@ public class MQTTUtil {
return message;
}
- public static ServerMessage createServerMessageFromByteBuf(MQTTSession session,
+ public static Message createServerMessageFromByteBuf(MQTTSession session,
String topic,
boolean retain,
int qos,
ByteBuf payload) {
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
- ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+ ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
- // FIXME does this involve a copy?
- message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+ message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
return message;
}
- public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
- ServerMessage message = createServerMessage(session, address, false, 1);
+ public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
+ Message message = createServerMessage(session, address, false, 1);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 9b27b81..76e50ef 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -35,12 +35,12 @@ import java.util.zip.InflaterOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -69,7 +69,7 @@ import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;
-public class OpenWireMessageConverter implements MessageConverter {
+public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
public static final String AMQ_PREFIX = "__HDR_";
public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
@@ -102,16 +102,26 @@ public class OpenWireMessageConverter implements MessageConverter {
}
@Override
- public Object outbound(ServerMessage message, int deliveryCount) {
- // TODO: implement this
+ public OpenwireMessage fromCore(ICoreMessage coreMessage) throws Exception {
return null;
}
@Override
- public ServerMessage inbound(Object message) throws Exception {
+ public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception {
+ return null;
+ }
+
+ // @Override
+ public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
+ // TODO: implement this
+ return null;
+ }
+
+// @Override
+ public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
Message messageSend = (Message) message;
- ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+ CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize());
String type = messageSend.getType();
if (type != null) {
@@ -157,7 +167,7 @@ public class OpenWireMessageConverter implements MessageConverter {
mdataIn.close();
TypedProperties props = new TypedProperties();
loadMapIntoProperties(props, map);
- props.encode(body);
+ props.encode(body.byteBuf());
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
if (messageCompressed) {
@@ -415,8 +425,9 @@ public class OpenWireMessageConverter implements MessageConverter {
}
public static MessageDispatch createMessageDispatch(MessageReference reference,
- ServerMessage message,
+ ICoreMessage message,
AMQConsumer consumer) throws IOException, JMSException {
+ // TODO-now: use new Encode here
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
//we can use core message id for sequenceId
@@ -433,7 +444,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
private static ActiveMQMessage toAMQMessage(MessageReference reference,
- ServerMessage coreMessage,
+ ICoreMessage coreMessage,
WireFormat marshaller,
ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null;
@@ -476,7 +487,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
amqMsg.setBrokerInTime(brokerInTime);
- ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate();
+ ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
@@ -503,7 +514,7 @@ public class OpenWireMessageConverter implements MessageConverter {
TypedProperties mapData = new TypedProperties();
//it could be a null map
if (buffer.readableBytes() > 0) {
- mapData.decode(buffer);
+ mapData.decode(buffer.byteBuf());
Map<String, Object> map = mapData.getMap();
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
OutputStream os = out;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3384260e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 5b62e3e..4292fe5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -35,6 +35,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
@@ -44,12 +45,10 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -236,11 +235,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
@Override
- public MessageConverter getConverter() {
- return messageConverter;
- }
-
- @Override
public void removeHandler(String name) {
}