You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/03 01:05:21 UTC
[28/36] activemq-artemis git commit: Fixing converters part I
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
index bfaec54..693b4e0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -30,23 +32,15 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -58,13 +52,15 @@ import org.apache.qpid.proton.message.Message;
import org.junit.Before;
import org.junit.Test;
-public class JMSMappingInboundTransformerTest {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
- private IDGenerator idGenerator;
+public class JMSMappingInboundTransformerTest {
@Before
public void setUp() {
- this.idGenerator = new SimpleIDGenerator(0);
}
// ----- Null Body Section ------------------------------------------------//
@@ -78,14 +74,14 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
- AMQPMessage messageEncode = new AMQPMessage(message, null);
+ AMQPMessage messageEncode = new AMQPMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(messageEncode);
+ ICoreMessage coreMessage = messageEncode.toCore();
+
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(coreMessage);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -100,12 +96,9 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -121,13 +114,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -135,13 +125,10 @@ public class JMSMappingInboundTransformerTest {
@Test
public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setContentType("text/plain");
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -155,14 +142,12 @@ public class JMSMappingInboundTransformerTest {
* @throws Exception
* if an error occurs during the test.
*/
+ @Test
public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setContentType("unknown-content-type");
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQMessage.class, jmsMessage.getClass());
@@ -185,10 +170,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -208,10 +190,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType("unknown-content-type");
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -232,10 +211,7 @@ public class JMSMappingInboundTransformerTest {
assertNull(message.getContentType());
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -256,10 +232,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -359,10 +332,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType(contentType);
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
if (StandardCharsets.UTF_8.equals(expectedCharset)) {
@@ -386,10 +356,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
message.setBody(new AmqpValue("content"));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -407,10 +374,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
message.setBody(new AmqpValue(null));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -426,14 +390,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setBody(new AmqpValue(new Binary(new byte[0])));
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -452,10 +413,7 @@ public class JMSMappingInboundTransformerTest {
Map<String, String> map = new HashMap<>();
message.setBody(new AmqpValue(map));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
@@ -481,10 +439,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new AmqpValue(map));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
@@ -508,10 +463,7 @@ public class JMSMappingInboundTransformerTest {
List<String> list = new ArrayList<>();
message.setBody(new AmqpValue(list));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -530,10 +482,7 @@ public class JMSMappingInboundTransformerTest {
List<String> list = new ArrayList<>();
message.setBody(new AmqpSequence(list));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -552,10 +501,7 @@ public class JMSMappingInboundTransformerTest {
Binary binary = new Binary(new byte[0]);
message.setBody(new AmqpValue(binary));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -574,11 +520,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
message.setBody(new AmqpValue(UUID.randomUUID()));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -590,10 +532,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Message.Factory.create();
message.setBody(new AmqpValue(contentString));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -633,7 +572,6 @@ public class JMSMappingInboundTransformerTest {
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass)
throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
String toAddress = "toAddress";
Message amqp = Message.Factory.create();
@@ -646,9 +584,7 @@ public class JMSMappingInboundTransformerTest {
amqp.setMessageAnnotations(ma);
}
- EncodedMessage em = encodeMessage(amqp);
-
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
@@ -681,7 +617,6 @@ public class JMSMappingInboundTransformerTest {
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass)
throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
String replyToAddress = "replyToAddress";
Message amqp = Message.Factory.create();
@@ -694,27 +629,8 @@ public class JMSMappingInboundTransformerTest {
amqp.setMessageAnnotations(ma);
}
- EncodedMessage em = encodeMessage(amqp);
-
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
- // ----- Utility Methods --------------------------------------------------//
-
- private EncodedMessage encodeMessage(Message message) {
- byte[] encodeBuffer = new byte[1024 * 8];
- int encodedSize;
- while (true) {
- try {
- encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
- break;
- } catch (java.nio.BufferOverflowException e) {
- encodeBuffer = new byte[encodeBuffer.length * 2];
- }
- }
-
- long messageFormat = 0;
- return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize);
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
index 3fe6d70..f38da3a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java
@@ -27,10 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@@ -38,9 +37,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@@ -70,8 +66,6 @@ public class JMSMappingOutboundTransformerTest {
private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844");
private final String TEST_ADDRESS = "queue://testAddress";
- private IDGenerator idGenerator;
- private JMSMappingOutboundTransformer transformer;
public static final byte QUEUE_TYPE = 0x00;
public static final byte TOPIC_TYPE = 0x01;
@@ -80,8 +74,6 @@ public class JMSMappingOutboundTransformerTest {
@Before
public void setUp() {
- idGenerator = new SimpleIDGenerator(0);
- transformer = new JMSMappingOutboundTransformer(idGenerator);
}
// ----- no-body Message type tests ---------------------------------------//
@@ -91,10 +83,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMessage outbound = createMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNull(amqp.getBody());
}
@@ -105,10 +94,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNull(amqp.getBody());
}
@@ -120,10 +106,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSBytesMessage outbound = createBytesMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -138,10 +121,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -162,10 +142,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -184,10 +161,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -203,10 +177,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -228,10 +199,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -251,10 +219,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMapMessage outbound = createMapMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -269,10 +234,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBytes("bytes", byteArray);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -294,10 +256,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBoolean("property-3", true);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -318,10 +277,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBoolean("property-3", true);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -341,10 +297,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSStreamMessage outbound = createStreamMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -357,10 +310,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -374,10 +324,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -397,10 +344,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
@@ -419,10 +363,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -435,10 +376,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -451,10 +389,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -467,10 +402,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -487,10 +419,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -507,10 +436,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -527,10 +453,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -547,10 +470,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -567,10 +487,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -589,10 +506,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage();
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -606,10 +520,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -627,10 +538,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -647,10 +555,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -663,10 +568,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@@ -680,10 +582,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
- EncodedMessage encoded = transform(outbound);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@@ -729,15 +628,12 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSDestination(jmsDestination);
- EncodedMessage encoded = transform(textMessage);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
if (maMap != null) {
- Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION);
+ Object actualValue = maMap.get(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
} else if (expectedAnnotationValue != null) {
fail("Expected annotation value, but there were no annotations");
@@ -783,15 +679,12 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSReplyTo(jmsReplyTo);
- EncodedMessage encoded = transform(textMessage);
- assertNotNull(encoded);
-
- Message amqp = encoded.decode();
+ Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
if (maMap != null) {
- Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
+ Object actualValue = maMap.get(AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
} else if (expectedAnnotationValue != null) {
fail("Expected annotation value, but there were no annotations");
@@ -804,17 +697,6 @@ public class JMSMappingOutboundTransformerTest {
// ----- Utility Methods used for this Test -------------------------------//
- public EncodedMessage transform(ServerJMSMessage message) throws Exception {
- // Useful for testing but not recommended for real life use.
- ByteBuf nettyBuffer = Unpooled.buffer(1024);
- NettyWritable buffer = new NettyWritable(nettyBuffer);
-
- long messageFormat = transformer.transform(message, buffer);
-
- EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
-
- return encoded;
- }
private ServerDestination createDestination(byte destType) {
ServerDestination destination = null;
@@ -839,7 +721,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSMessage createMessage() {
- return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
+ return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE));
}
private ServerJMSBytesMessage createBytesMessage() {
@@ -847,7 +729,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSBytesMessage createBytesMessage(boolean compression) {
- ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
+ ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE));
if (compression) {
// TODO
@@ -861,7 +743,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSMapMessage createMapMessage(boolean compression) {
- ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
+ ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE));
if (compression) {
// TODO
@@ -875,7 +757,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSStreamMessage createStreamMessage(boolean compression) {
- ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
+ ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE));
if (compression) {
// TODO
@@ -893,7 +775,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
- ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(idGenerator);
+ ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0);
if (compression) {
// TODO
@@ -920,7 +802,7 @@ public class JMSMappingOutboundTransformerTest {
}
private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
- ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(idGenerator);
+ ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0);
if (compression) {
// TODO
@@ -942,7 +824,7 @@ public class JMSMappingOutboundTransformerTest {
}
private CoreMessage newMessage(byte messageType) {
- CoreMessage message = new CoreMessage(idGenerator.generateID(), 512);
+ CoreMessage message = new CoreMessage(0, 512);
message.setType(messageType);
((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
index fdf0129..483f245 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java
@@ -21,27 +21,23 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
/**
* Some simple performance tests for the Message Transformers.
*/
@@ -51,16 +47,11 @@ public class JMSTransformationSpeedComparisonTest {
@Rule
public TestName test = new TestName();
- private IDGenerator idGenerator;
- private ProtonMessageConverter converter;
-
private final int WARM_CYCLES = 1000;
private final int PROFILE_CYCLES = 1000000;
@Before
public void setUp() {
- idGenerator = new SimpleIDGenerator(0);
- converter = new ProtonMessageConverter(idGenerator);
}
@Test
@@ -68,20 +59,20 @@ public class JMSTransformationSpeedComparisonTest {
Message message = Proton.message();
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- AMQPMessage encoded = encode(message);
+ AMQPMessage encoded = new AMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -99,20 +90,20 @@ public class JMSTransformationSpeedComparisonTest {
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- AMQPMessage encoded = encode(message);
+ AMQPMessage encoded = new AMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -122,20 +113,20 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessage() throws Exception {
- AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
+ AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -149,16 +140,16 @@ public class JMSTransformationSpeedComparisonTest {
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -172,14 +163,16 @@ public class JMSTransformationSpeedComparisonTest {
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- converter.inbound(encoded);
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- converter.inbound(encoded);
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -191,18 +184,19 @@ public class JMSTransformationSpeedComparisonTest {
public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
- org.apache.activemq.artemis.api.core.Message intermediate = converter.inbound(encoded);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
- encode(converter.outbound(intermediate, 1));
+ ICoreMessage intermediate = encoded.toCore();
+ encode(AMQPConverter.getInstance().fromCore(intermediate));
}
totalDuration += System.nanoTime() - startTime;
@@ -278,16 +272,16 @@ public class JMSTransformationSpeedComparisonTest {
return message;
}
- private AMQPMessage encode(Object target) {
- if (target instanceof ProtonJMessage) {
- ProtonJMessage amqp = (ProtonJMessage) target;
-
- ByteBuf nettyBuffer = Unpooled.buffer(1024);
- amqp.encode(new NettyWritable(nettyBuffer));
+ private AMQPMessage encode(Message message) {
+ return new AMQPMessage(message);
+ }
- return new AMQPMessage(0, nettyBuffer.array(), null);
- } else {
- return null;
+ private void encode(AMQPMessage target) {
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+ try {
+ target.sendBuffer(buf, 1);
+ } finally {
+ buf.release();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index 6a0f20c..7cba09b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -16,36 +16,30 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests some basic encode / decode functionality on the transformers.
*/
@@ -54,72 +48,10 @@ public class MessageTransformationTest {
@Rule
public TestName test = new TestName();
- private IDGenerator idGenerator;
- private ProtonMessageConverter converter;
-
@Before
public void setUp() {
- idGenerator = new SimpleIDGenerator(0);
- converter = new ProtonMessageConverter(idGenerator);
}
- @Test
- public void testEncodeDecodeFidelity() throws Exception {
- Map<String, Object> applicationProperties = new HashMap<>();
- Map<Symbol, Object> messageAnnotations = new HashMap<>();
-
- applicationProperties.put("property-1", "string");
- applicationProperties.put("property-2", 512);
- applicationProperties.put("property-3", true);
-
- messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
- messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
-
- Message incomingMessage = Proton.message();
-
- incomingMessage.setAddress("queue://test-queue");
- incomingMessage.setDeliveryCount(1);
- incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties));
- incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
- incomingMessage.setCreationTime(System.currentTimeMillis());
- incomingMessage.setContentType("text/plain");
- incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
-
- EncodedMessage encoded = encode(incomingMessage);
-
- org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(encoded);
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode();
-
- // Test that message details are equal
- assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress());
- assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount());
- assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime());
- assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType());
-
- // Test Message annotations
- ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties();
- ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties();
-
- assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue());
-
- // Test Message properties
- MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations();
- MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations();
-
- assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue());
-
- // Test that bodies are equal
- assertTrue(incomingMessage.getBody() instanceof AmqpValue);
- assertTrue(outboudMessage.getBody() instanceof AmqpValue);
-
- AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody();
- AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody();
-
- assertTrue(incomingBody.getValue() instanceof String);
- assertTrue(outgoingBody.getValue() instanceof String);
-
- assertEquals(incomingBody.getValue(), outgoingBody.getValue());
- }
@Test
public void testBodyOnlyEncodeDecode() throws Exception {
@@ -128,8 +60,8 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
assertNull(outboudMessage.getHeader());
assertNull(outboudMessage.getProperties());
@@ -143,8 +75,8 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
- org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
assertNull(outboudMessage.getHeader());
assertNotNull(outboudMessage.getProperties());
@@ -158,8 +90,8 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setDurable(true);
- org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
assertNotNull(outboudMessage.getHeader());
assertNull(outboudMessage.getProperties());
@@ -172,8 +104,8 @@ public class MessageTransformationTest {
incomingMessage.setBody(new AmqpValue(new Boolean(true)));
- org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(incomingMessage, null));
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
Section section = outboudMessage.getBody();
assertNotNull(section);
@@ -229,8 +161,8 @@ public class MessageTransformationTest {
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
- org.apache.activemq.artemis.api.core.Message outbound = converter.inbound(new AMQPMessage(message, null));
- Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode();
+ ICoreMessage core = new AMQPMessage(message).toCore();
+ Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
assertNotNull(outboudMessage.getHeader());
assertNotNull(outboudMessage.getProperties());
@@ -242,18 +174,4 @@ public class MessageTransformationTest {
assertEquals(9, outboudMessage.getApplicationProperties().getValue().size());
assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
}
-
- private EncodedMessage encode(Message message) {
- ProtonJMessage amqp = (ProtonJMessage) message;
-
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
- final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
- int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
- if (overflow.position() > 0) {
- buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
- c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
- }
-
- return new EncodedMessage(1, buffer.array(), 0, c);
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
index 4313eae..db40a8e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java
@@ -52,7 +52,7 @@ public class AMQPMessageTest {
nettyBuffer.readBytes(bytes);
- AMQPMessage encode = new AMQPMessage(0, bytes, null);
+ AMQPMessage encode = new AMQPMessage(0, bytes);
Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
Assert.assertEquals(true, encode.getHeader().getDurable());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 1f435ff..f4cba64 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -132,11 +131,6 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter
}
@Override
- public MessageConverter getConverter() {
- return null;
- }
-
- @Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index e7b8c50..613fef3 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@@ -91,7 +92,7 @@ public class MQTTUtil {
return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration);
}
- private static Message createServerMessage(MQTTSession session,
+ private static ICoreMessage createServerMessage(MQTTSession session,
SimpleString address,
boolean retain,
int qos) {
@@ -111,7 +112,7 @@ public class MQTTUtil {
int qos,
ByteBuf payload) {
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
- Message message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
+ ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos);
message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 550a63a..76e50ef 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -35,6 +35,7 @@ import java.util.zip.InflaterOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@@ -68,7 +69,7 @@ import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;
-public class OpenWireMessageConverter implements MessageConverter {
+public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
public static final String AMQ_PREFIX = "__HDR_";
public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
@@ -101,12 +102,22 @@ public class OpenWireMessageConverter implements MessageConverter {
}
@Override
+ public OpenwireMessage fromCore(ICoreMessage coreMessage) throws Exception {
+ return null;
+ }
+
+ @Override
+ public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception {
+ return null;
+ }
+
+ // @Override
public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
// TODO: implement this
return null;
}
- @Override
+// @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception {
Message messageSend = (Message) message;
@@ -414,7 +425,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
public static MessageDispatch createMessageDispatch(MessageReference reference,
- org.apache.activemq.artemis.api.core.Message message,
+ ICoreMessage message,
AMQConsumer consumer) throws IOException, JMSException {
// TODO-now: use new Encode here
ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
@@ -433,7 +444,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
private static ActiveMQMessage toAMQMessage(MessageReference reference,
- org.apache.activemq.artemis.api.core.Message coreMessage,
+ ICoreMessage coreMessage,
WireFormat marshaller,
ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 5b62e3e..4292fe5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -35,6 +35,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
@@ -44,12 +45,10 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -236,11 +235,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
@Override
- public MessageConverter getConverter() {
- return messageConverter;
- }
-
- @Override
public void removeHandler(String name) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
new file mode 100644
index 0000000..0b29114
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
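+// TODO: Implement this. Every method below is currently a stub that does nothing or returns a default (null, 0, false, or an empty byte array).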
+public class OpenwireMessage implements Message {
+
+ @Override
+ public boolean containsProperty(SimpleString key) {
+ return false;
+ }
+
+ @Override
+ public void messageChanged() {
+
+ }
+
+ @Override
+ public Long getScheduledDeliveryTime() {
+ return null;
+ }
+
+ @Override
+ public RefCountMessageListener getContext() {
+ return null;
+ }
+
+ @Override
+ public Message setContext(RefCountMessageListener context) {
+ return null;
+ }
+
+ @Override
+ public Message setBuffer(ByteBuf buffer) {
+ return null;
+ }
+
+ @Override
+ public ByteBuf getBuffer() {
+ return null;
+ }
+
+ @Override
+ public Message copy() {
+ return null;
+ }
+
+ @Override
+ public Message copy(long newID) {
+ return null;
+ }
+
+ @Override
+ public long getMessageID() {
+ return 0;
+ }
+
+ @Override
+ public Message setMessageID(long id) {
+ return null;
+ }
+
+ @Override
+ public long getExpiration() {
+ return 0;
+ }
+
+ @Override
+ public Message setExpiration(long expiration) {
+ return null;
+ }
+
+ @Override
+ public Object getUserID() {
+ return null;
+ }
+
+ @Override
+ public Message setUserID(Object userID) {
+ return null;
+ }
+
+ @Override
+ public void copyHeadersAndProperties(Message msg) {
+
+ }
+
+ @Override
+ public boolean isDurable() {
+ return false;
+ }
+
+ @Override
+ public Message setDurable(boolean durable) {
+ return null;
+ }
+
+ @Override
+ public Persister<Message> getPersister() {
+ return null;
+ }
+
+ @Override
+ public String getAddress() {
+ return null;
+ }
+
+ @Override
+ public Message setAddress(String address) {
+ return null;
+ }
+
+ @Override
+ public SimpleString getAddressSimpleString() {
+ return null;
+ }
+
+ @Override
+ public Message setAddress(SimpleString address) {
+ return null;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return 0;
+ }
+
+ @Override
+ public Message setTimestamp(long timestamp) {
+ return null;
+ }
+
+ @Override
+ public byte getPriority() {
+ return 0;
+ }
+
+ @Override
+ public Message setPriority(byte priority) {
+ return null;
+ }
+
+ @Override
+ public void receiveBuffer(ByteBuf buffer) {
+
+ }
+
+ @Override
+ public void sendBuffer(ByteBuf buffer, int deliveryCount) {
+
+ }
+
+ @Override
+ public int getPersistSize() {
+ return 0;
+ }
+
+ @Override
+ public void persist(ActiveMQBuffer targetRecord) {
+
+ }
+
+ @Override
+ public void reloadPersistence(ActiveMQBuffer record) {
+
+ }
+
+ @Override
+ public Message putBooleanProperty(String key, boolean value) {
+ return null;
+ }
+
+ @Override
+ public Message putByteProperty(String key, byte value) {
+ return null;
+ }
+
+ @Override
+ public Message putBytesProperty(String key, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public Message putShortProperty(String key, short value) {
+ return null;
+ }
+
+ @Override
+ public Message putCharProperty(String key, char value) {
+ return null;
+ }
+
+ @Override
+ public Message putIntProperty(String key, int value) {
+ return null;
+ }
+
+ @Override
+ public Message putLongProperty(String key, long value) {
+ return null;
+ }
+
+ @Override
+ public Message putFloatProperty(String key, float value) {
+ return null;
+ }
+
+ @Override
+ public Message putDoubleProperty(String key, double value) {
+ return null;
+ }
+
+ @Override
+ public Message putBooleanProperty(SimpleString key, boolean value) {
+ return null;
+ }
+
+ @Override
+ public Message putByteProperty(SimpleString key, byte value) {
+ return null;
+ }
+
+ @Override
+ public Message putBytesProperty(SimpleString key, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public Message putShortProperty(SimpleString key, short value) {
+ return null;
+ }
+
+ @Override
+ public Message putCharProperty(SimpleString key, char value) {
+ return null;
+ }
+
+ @Override
+ public Message putIntProperty(SimpleString key, int value) {
+ return null;
+ }
+
+ @Override
+ public Message putLongProperty(SimpleString key, long value) {
+ return null;
+ }
+
+ @Override
+ public Message putFloatProperty(SimpleString key, float value) {
+ return null;
+ }
+
+ @Override
+ public Message putDoubleProperty(SimpleString key, double value) {
+ return null;
+ }
+
+ @Override
+ public Message putStringProperty(String key, String value) {
+ return null;
+ }
+
+ @Override
+ public Message putObjectProperty(String key, Object value) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Message putObjectProperty(SimpleString key, Object value) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Object removeProperty(String key) {
+ return null;
+ }
+
+ @Override
+ public boolean containsProperty(String key) {
+ return false;
+ }
+
+ @Override
+ public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Object getObjectProperty(String key) {
+ return null;
+ }
+
+ @Override
+ public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException {
+ return new byte[0];
+ }
+
+ @Override
+ public Object removeProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Object getObjectProperty(SimpleString key) {
+ return null;
+ }
+
+ @Override
+ public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return null;
+ }
+
+ @Override
+ public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
+ return new byte[0];
+ }
+
+ @Override
+ public Message putStringProperty(SimpleString key, SimpleString value) {
+ return null;
+ }
+
+ @Override
+ public int getEncodeSize() {
+ return 0;
+ }
+
+ @Override
+ public Set<SimpleString> getPropertyNames() {
+ return null;
+ }
+
+ @Override
+ public int getRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int incrementRefCount() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int decrementRefCount() throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int incrementDurableRefCount() {
+ return 0;
+ }
+
+ @Override
+ public int decrementDurableRefCount() {
+ return 0;
+ }
+
+ @Override
+ public ICoreMessage toCore() {
+ return null;
+ }
+
+ @Override
+ public int getMemoryEstimate() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9b731bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 6f83c2d..3bdee8b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,14 +27,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -208,7 +209,7 @@ public class AMQConsumer {
}
- public int handleDeliver(MessageReference reference, Message message, int deliveryCount) {
+ public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) {
MessageDispatch dispatch;
try {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {