You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/06 03:58:32 UTC
[14/21] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 2e19f07..4e33c9b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -21,15 +21,19 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
@@ -38,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
@@ -61,9 +64,6 @@ import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@@ -474,7 +474,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (closed) {
return;
}
- Object message = delivery.getContext();
+ Message message = (Message)delivery.getContext();
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
@@ -518,6 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
+ e.printStackTrace();
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
} else if (remoteState instanceof Released) {
@@ -566,7 +567,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/**
* handle an out going message from ActiveMQ Artemis, send via the Proton Sender
*/
- public int deliverMessage(Object message, int deliveryCount) throws Exception {
+ public int deliverMessage(AMQPMessage message, int deliveryCount) throws Exception {
if (closed) {
return 0;
}
@@ -590,16 +591,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
- long messageFormat = 0;
-
- // Encode the Server Message into the given Netty Buffer as an AMQP
- // Message transformed from the internal message model.
- try {
- messageFormat = sessionSPI.encodeMessage(message, deliveryCount, new NettyWritable(nettyBuffer));
- } catch (Throwable e) {
- log.warn(e.getMessage(), e);
- throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
- }
+ message.sendBuffer(nettyBuffer, deliveryCount);
int size = nettyBuffer.writerIndex();
@@ -609,7 +601,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
final Delivery delivery;
delivery = sender.delivery(tag, 0, tag.length);
- delivery.setMessageFormat((int) messageFormat);
+ delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(message);
// this will avoid a copy.. patch provided by Norman using buffer.array()
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
index 51f42a3..1afeba8 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -61,7 +62,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
return;
}
- DeliveryUtil.readDelivery(receiver, buffer);
+ receiver.recv(new NettyWritable(buffer));
receiver.advance();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 25ef51e..673a688 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -254,7 +254,7 @@ public class ProtonHandler extends ProtonInitializable {
flush(false);
}
- private void flush(boolean wait) {
+ public void flush(boolean wait) {
synchronized (lock) {
transport.process();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
new file mode 100644
index 0000000..e0705b4
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.codec.ReadableBuffer;
+
+public class NettyReadable implements ReadableBuffer {
+
+ private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
+
+ private final ByteBuf buffer;
+
+ public NettyReadable(ByteBuf buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public void put(ReadableBuffer other) {
+ buffer.writeBytes(other.byteBuffer());
+ }
+
+ @Override
+ public byte get() {
+ return buffer.readByte();
+ }
+
+ @Override
+ public int getInt() {
+ return buffer.readInt();
+ }
+
+ @Override
+ public long getLong() {
+ return buffer.readLong();
+ }
+
+ @Override
+ public short getShort() {
+ return buffer.readShort();
+ }
+
+ @Override
+ public float getFloat() {
+ return buffer.readFloat();
+ }
+
+ @Override
+ public double getDouble() {
+ return buffer.readDouble();
+ }
+
+ @Override
+ public ReadableBuffer get(byte[] data, int offset, int length) {
+ buffer.readBytes(data, offset, length);
+ return this;
+ }
+
+ @Override
+ public ReadableBuffer get(byte[] data) {
+ buffer.readBytes(data);
+ return this;
+ }
+
+ @Override
+ public ReadableBuffer position(int position) {
+ buffer.readerIndex(position);
+ return this;
+ }
+
+ @Override
+ public ReadableBuffer slice() {
+ return new NettyReadable(buffer.slice());
+ }
+
+ @Override
+ public ReadableBuffer flip() {
+ return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex()));
+ }
+
+ @Override
+ public ReadableBuffer limit(int limit) {
+ buffer.writerIndex(limit);
+ return this;
+ }
+
+ @Override
+ public int limit() {
+ return buffer.writerIndex();
+ }
+
+ @Override
+ public int remaining() {
+ return buffer.readableBytes();
+ }
+
+ @Override
+ public int position() {
+ return buffer.readerIndex();
+ }
+
+ @Override
+ public boolean hasRemaining() {
+ return buffer.readableBytes() > 0;
+ }
+
+ @Override
+ public ReadableBuffer duplicate() {
+ return new NettyReadable(buffer.duplicate());
+ }
+
+ @Override
+ public ByteBuffer byteBuffer() {
+ return buffer.nioBuffer(0, buffer.writerIndex());
+ }
+
+ @Override
+ public String readUTF8() {
+ return buffer.toString(Charset_UTF8);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java
new file mode 100644
index 0000000..b2f0fdc
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+
+/** This can go away if Proton provides this feature. */
+public class TLSEncode {
+
+ // For now Proton requires that we create a decoder to create an encoder
+ private static class EncoderDecoderPair {
+ DecoderImpl decoder = new DecoderImpl();
+ EncoderImpl encoder = new EncoderImpl(decoder);
+ {
+ AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+ }
+ }
+
+ private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() {
+ @Override
+ protected EncoderDecoderPair initialValue() {
+ return new EncoderDecoderPair();
+ }
+ };
+
+ public static EncoderImpl getEncoder() {
+ return tlsCodec.get().encoder;
+ }
+
+ public static DecoderImpl getDecoder() {
+ return tlsCodec.get().decoder;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 08c46be..8ced348 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,44 +16,28 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
-import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.wrapMessage;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.ProtonJMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
public class TestConversions extends Assert {
@@ -72,18 +56,12 @@ public class TestConversions extends Assert {
message.setBody(new AmqpValue(new Boolean(true)));
- EncodedMessage encodedMessage = encodeMessage(message);
+ AMQPMessage encodedMessage = new AMQPMessage(message);
- ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerMessage serverMessage = converter.inbound(encodedMessage);
+ ICoreMessage serverMessage = encodedMessage.toCore();
- verifyProperties(new ServerJMSMessage(serverMessage, 0));
+ verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage));
- EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
- Message amqpMessage = encoded.decode();
-
- AmqpValue value = (AmqpValue) amqpMessage.getBody();
- assertEquals(value.getValue(), true);
}
@Test
@@ -101,12 +79,11 @@ public class TestConversions extends Assert {
message.setBody(new Data(new Binary(bodyBytes)));
- EncodedMessage encodedMessage = encodeMessage(message);
+ AMQPMessage encodedMessage = new AMQPMessage(message);
- ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerMessage serverMessage = converter.inbound(encodedMessage);
+ ICoreMessage serverMessage = encodedMessage.toCore();
- ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) wrapMessage(BYTES_TYPE, serverMessage, 0);
+ ServerJMSBytesMessage bytesMessage = (ServerJMSBytesMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
verifyProperties(bytesMessage);
@@ -118,9 +95,6 @@ public class TestConversions extends Assert {
Assert.assertArrayEquals(bodyBytes, newBodyBytes);
- Object obj = converter.outbound(serverMessage, 0);
-
- System.out.println("output = " + obj);
}
private void verifyProperties(javax.jms.Message message) throws Exception {
@@ -151,12 +125,12 @@ public class TestConversions extends Assert {
message.setBody(new AmqpValue(mapValues));
- EncodedMessage encodedMessage = encodeMessage(message);
+ AMQPMessage encodedMessage = new AMQPMessage(message);
- ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerMessage serverMessage = converter.inbound(encodedMessage);
+ ICoreMessage serverMessage = encodedMessage.toCore();
+ serverMessage.getReadOnlyBodyBuffer();
- ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) wrapMessage(MAP_TYPE, serverMessage, 0);
+ ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
mapMessage.decode();
verifyProperties(mapMessage);
@@ -164,15 +138,8 @@ public class TestConversions extends Assert {
Assert.assertEquals(1, mapMessage.getInt("someint"));
Assert.assertEquals("value", mapMessage.getString("somestr"));
- EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
- Message amqpMessage = encoded.decode();
-
- AmqpValue value = (AmqpValue) amqpMessage.getBody();
- Map<?, ?> mapoutput = (Map<?, ?>) value.getValue();
-
- assertEquals(Integer.valueOf(1), mapoutput.get("someint"));
-
- System.out.println("output = " + amqpMessage);
+ AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
+ System.out.println(newAMQP.getProtonMessage().getBody());
}
@Test
@@ -188,14 +155,11 @@ public class TestConversions extends Assert {
message.setBody(new AmqpSequence(objects));
- EncodedMessage encodedMessage = encodeMessage(message);
+ AMQPMessage encodedMessage = new AMQPMessage(message);
- ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerMessage serverMessage = converter.inbound(encodedMessage);
+ ICoreMessage serverMessage = encodedMessage.toCore();
- simulatePersistence(serverMessage);
-
- ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) wrapMessage(STREAM_TYPE, serverMessage, 0);
+ ServerJMSStreamMessage streamMessage = (ServerJMSStreamMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
verifyProperties(streamMessage);
@@ -203,13 +167,6 @@ public class TestConversions extends Assert {
assertEquals(10, streamMessage.readInt());
assertEquals("10", streamMessage.readString());
-
- EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
- Message amqpMessage = encoded.decode();
-
- List<?> list = ((AmqpSequence) amqpMessage.getBody()).getValue();
- Assert.assertEquals(Integer.valueOf(10), list.get(0));
- Assert.assertEquals("10", list.get(1));
}
@Test
@@ -222,553 +179,17 @@ public class TestConversions extends Assert {
String text = "someText";
message.setBody(new AmqpValue(text));
- EncodedMessage encodedMessage = encodeMessage(message);
-
- ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
- ServerMessage serverMessage = converter.inbound(encodedMessage);
+ AMQPMessage encodedMessage = new AMQPMessage(message);
- simulatePersistence(serverMessage);
+ ICoreMessage serverMessage = encodedMessage.toCore();
- ServerJMSTextMessage textMessage = (ServerJMSTextMessage) wrapMessage(TEXT_TYPE, serverMessage, 0);
+ ServerJMSTextMessage textMessage = (ServerJMSTextMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
textMessage.decode();
verifyProperties(textMessage);
Assert.assertEquals(text, textMessage.getText());
- EncodedMessage encoded = (EncodedMessage) converter.outbound(serverMessage, 0);
- Message amqpMessage = encoded.decode();
-
- AmqpValue value = (AmqpValue) amqpMessage.getBody();
- String textValue = (String) value.getValue();
-
- Assert.assertEquals(text, textValue);
-
- System.out.println("output = " + amqpMessage);
- }
-
- private void simulatePersistence(ServerMessage serverMessage) {
- serverMessage.setAddress(new SimpleString("SomeAddress"));
- // This is just to simulate what would happen during the persistence of the message
- // We need to still be able to recover the message when we read it back
- ((EncodingSupport) serverMessage).encode(new EmptyBuffer());
- }
-
- private ProtonJMessage reEncodeMsg(Object obj) {
- ProtonJMessage objOut = (ProtonJMessage) obj;
-
- ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
- objOut.encode(new NettyWritable(nettyBuffer));
- return objOut;
}
- private EncodedMessage encodeMessage(MessageImpl message) {
- ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024);
- message.encode(new NettyWritable(buf));
- byte[] bytesConvert = new byte[buf.writerIndex()];
- buf.readBytes(bytesConvert);
- return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length);
- }
-
- class EmptyBuffer implements ActiveMQBuffer {
-
- @Override
- public ByteBuf byteBuf() {
- return null;
- }
-
- @Override
- public int capacity() {
- return 0;
- }
-
- @Override
- public int readerIndex() {
- return 0;
- }
-
- @Override
- public void readerIndex(int readerIndex) {
-
- }
-
- @Override
- public int writerIndex() {
- return 0;
- }
-
- @Override
- public void writerIndex(int writerIndex) {
-
- }
-
- @Override
- public void setIndex(int readerIndex, int writerIndex) {
-
- }
-
- @Override
- public int readableBytes() {
- return 0;
- }
-
- @Override
- public int writableBytes() {
- return 0;
- }
-
- @Override
- public boolean readable() {
- return false;
- }
-
- @Override
- public boolean writable() {
- return false;
- }
-
- @Override
- public void clear() {
-
- }
-
- @Override
- public void markReaderIndex() {
-
- }
-
- @Override
- public void resetReaderIndex() {
-
- }
-
- @Override
- public void markWriterIndex() {
-
- }
-
- @Override
- public void resetWriterIndex() {
-
- }
-
- @Override
- public void discardReadBytes() {
-
- }
-
- @Override
- public byte getByte(int index) {
- return 0;
- }
-
- @Override
- public short getUnsignedByte(int index) {
- return 0;
- }
-
- @Override
- public short getShort(int index) {
- return 0;
- }
-
- @Override
- public int getUnsignedShort(int index) {
- return 0;
- }
-
- @Override
- public int getInt(int index) {
- return 0;
- }
-
- @Override
- public long getUnsignedInt(int index) {
- return 0;
- }
-
- @Override
- public long getLong(int index) {
- return 0;
- }
-
- @Override
- public void getBytes(int index, ActiveMQBuffer dst) {
-
- }
-
- @Override
- public void getBytes(int index, ActiveMQBuffer dst, int length) {
-
- }
-
- @Override
- public void getBytes(int index, ActiveMQBuffer dst, int dstIndex, int length) {
-
- }
-
- @Override
- public void getBytes(int index, byte[] dst) {
-
- }
-
- @Override
- public void getBytes(int index, byte[] dst, int dstIndex, int length) {
-
- }
-
- @Override
- public void getBytes(int index, ByteBuffer dst) {
-
- }
-
- @Override
- public char getChar(int index) {
- return 0;
- }
-
- @Override
- public float getFloat(int index) {
- return 0;
- }
-
- @Override
- public double getDouble(int index) {
- return 0;
- }
-
- @Override
- public void setByte(int index, byte value) {
-
- }
-
- @Override
- public void setShort(int index, short value) {
-
- }
-
- @Override
- public void setInt(int index, int value) {
-
- }
-
- @Override
- public void setLong(int index, long value) {
-
- }
-
- @Override
- public void setBytes(int index, ActiveMQBuffer src) {
-
- }
-
- @Override
- public void setBytes(int index, ActiveMQBuffer src, int length) {
-
- }
-
- @Override
- public void setBytes(int index, ActiveMQBuffer src, int srcIndex, int length) {
-
- }
-
- @Override
- public void setBytes(int index, byte[] src) {
-
- }
-
- @Override
- public void setBytes(int index, byte[] src, int srcIndex, int length) {
-
- }
-
- @Override
- public void setBytes(int index, ByteBuffer src) {
-
- }
-
- @Override
- public void setChar(int index, char value) {
-
- }
-
- @Override
- public void setFloat(int index, float value) {
-
- }
-
- @Override
- public void setDouble(int index, double value) {
-
- }
-
- @Override
- public byte readByte() {
- return 0;
- }
-
- @Override
- public int readUnsignedByte() {
- return 0;
- }
-
- @Override
- public short readShort() {
- return 0;
- }
-
- @Override
- public int readUnsignedShort() {
- return 0;
- }
-
- @Override
- public int readInt() {
- return 0;
- }
-
- @Override
- public long readUnsignedInt() {
- return 0;
- }
-
- @Override
- public long readLong() {
- return 0;
- }
-
- @Override
- public char readChar() {
- return 0;
- }
-
- @Override
- public float readFloat() {
- return 0;
- }
-
- @Override
- public double readDouble() {
- return 0;
- }
-
- @Override
- public boolean readBoolean() {
- return false;
- }
-
- @Override
- public SimpleString readNullableSimpleString() {
- return null;
- }
-
- @Override
- public String readNullableString() {
- return null;
- }
-
- @Override
- public SimpleString readSimpleString() {
- return null;
- }
-
- @Override
- public String readString() {
- return null;
- }
-
- @Override
- public String readUTF() {
- return null;
- }
-
- @Override
- public ActiveMQBuffer readBytes(int length) {
- return null;
- }
-
- @Override
- public ActiveMQBuffer readSlice(int length) {
- return null;
- }
-
- @Override
- public void readBytes(ActiveMQBuffer dst) {
-
- }
-
- @Override
- public void readBytes(ActiveMQBuffer dst, int length) {
-
- }
-
- @Override
- public void readBytes(ActiveMQBuffer dst, int dstIndex, int length) {
-
- }
-
- @Override
- public void readBytes(byte[] dst) {
-
- }
-
- @Override
- public void readBytes(byte[] dst, int dstIndex, int length) {
-
- }
-
- @Override
- public void readBytes(ByteBuffer dst) {
-
- }
-
- @Override
- public int skipBytes(int length) {
- return length;
- }
-
- @Override
- public void writeByte(byte value) {
-
- }
-
- @Override
- public void writeShort(short value) {
-
- }
-
- @Override
- public void writeInt(int value) {
-
- }
-
- @Override
- public void writeLong(long value) {
-
- }
-
- @Override
- public void writeChar(char chr) {
-
- }
-
- @Override
- public void writeFloat(float value) {
-
- }
-
- @Override
- public void writeDouble(double value) {
-
- }
-
- @Override
- public void writeBoolean(boolean val) {
-
- }
-
- @Override
- public void writeNullableSimpleString(SimpleString val) {
-
- }
-
- @Override
- public void writeNullableString(String val) {
-
- }
-
- @Override
- public void writeSimpleString(SimpleString val) {
-
- }
-
- @Override
- public void writeString(String val) {
-
- }
-
- @Override
- public void writeUTF(String utf) {
-
- }
-
- @Override
- public void writeBytes(ActiveMQBuffer src, int length) {
-
- }
-
- @Override
- public void writeBytes(ActiveMQBuffer src, int srcIndex, int length) {
-
- }
-
- @Override
- public void writeBytes(byte[] src) {
-
- }
-
- @Override
- public void writeBytes(byte[] src, int srcIndex, int length) {
-
- }
-
- @Override
- public void writeBytes(ByteBuffer src) {
-
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- }
-
- @Override
- public String readLine() throws IOException {
- return null;
- }
-
- @Override
- public ActiveMQBuffer copy() {
- return null;
- }
-
- @Override
- public ActiveMQBuffer copy(int index, int length) {
- return null;
- }
-
- @Override
- public ActiveMQBuffer slice() {
- return null;
- }
-
- @Override
- public ActiveMQBuffer slice(int index, int length) {
- return null;
- }
-
- @Override
- public ActiveMQBuffer duplicate() {
- return null;
- }
-
- @Override
- public ByteBuffer toByteBuffer() {
- return null;
- }
-
- @Override
- public ByteBuffer toByteBuffer(int index, int length) {
- return null;
- }
-
- @Override
- public void release() {
- //no-op
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
index 4caead7..54a98e6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupportTest.java
@@ -16,15 +16,17 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPContentTypeSupport;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
public class AMQPContentTypeSupportTest {
@Test(expected = ActiveMQAMQPInvalidContentTypeException.class)
@@ -216,7 +218,7 @@ public class AMQPContentTypeSupportTest {
@Test
public void testParseContentTypeWithApplicationJavaSerialized() throws Exception {
// Expect null as this is not a textual type
- doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null);
+ doParseContentTypeTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), null);
}
private void doParseContentTypeTestImpl(String contentType, Charset expected) throws ActiveMQAMQPInvalidContentTypeException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
index c53cda5..60c1989 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
@@ -20,19 +20,20 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
import java.util.UUID;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
public class AMQPMessageIdHelperTest {
private AMQPMessageIdHelper messageIdHelper;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
index d4e078f..6aeb4dc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupportTest.java
@@ -16,20 +16,21 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
public class AMQPMessageSupportTest {
// ---------- getSymbol ---------------------------------------------------//
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
index d7a948a..b7092c3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformerTest.java
@@ -16,36 +16,29 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -57,13 +50,15 @@ import org.apache.qpid.proton.message.Message;
import org.junit.Before;
import org.junit.Test;
-public class JMSMappingInboundTransformerTest {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
- private IDGenerator idGenerator;
+public class JMSMappingInboundTransformerTest {
@Before
public void setUp() {
- this.idGenerator = new SimpleIDGenerator(0);
}
// ----- Null Body Section ------------------------------------------------//
@@ -77,13 +72,14 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ AMQPMessage messageEncode = new AMQPMessage(message);
+
+ ICoreMessage coreMessage = messageEncode.toCore();
+
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(coreMessage);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -98,74 +94,25 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
}
- /**
- * Test that a message with no body section, but with the content type set to
- * {@value AMQPMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an
- * ObjectMessage when not otherwise annotated to indicate the type of JMS message it is.
- *
- * @throws Exception
- * if an error occurs during the test.
- */
- @Test
- public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
- Message message = Message.Factory.create();
- message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
-
- assertNotNull("Message should not be null", jmsMessage);
- assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
- }
-
@Test
public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setContentType("text/plain");
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
}
- /**
- * Test that a message with no body section, and with the content type set to an unknown
- * value results in a plain Message when not otherwise annotated to indicate the type of JMS
- * message it is.
- *
- * @throws Exception
- * if an error occurs during the test.
- */
- public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
- Message message = Message.Factory.create();
- message.setContentType("unknown-content-type");
-
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
-
- assertNotNull("Message should not be null", jmsMessage);
- assertEquals("Unexpected message class type", ActiveMQMessage.class, jmsMessage.getClass());
- }
-
// ----- Data Body Section ------------------------------------------------//
/**
@@ -183,10 +130,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -206,10 +150,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType("unknown-content-type");
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -230,10 +171,7 @@ public class JMSMappingInboundTransformerTest {
assertNull(message.getContentType());
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -252,12 +190,9 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
- message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-
- EncodedMessage em = encodeMessage(message);
+ message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -357,10 +292,7 @@ public class JMSMappingInboundTransformerTest {
message.setBody(new Data(binary));
message.setContentType(contentType);
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
if (StandardCharsets.UTF_8.equals(expectedCharset)) {
@@ -384,10 +316,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
message.setBody(new AmqpValue("content"));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -405,10 +334,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
message.setBody(new AmqpValue(null));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -424,14 +350,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
Message message = Message.Factory.create();
message.setBody(new AmqpValue(new Binary(new byte[0])));
- message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
- EncodedMessage em = encodeMessage(message);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@@ -450,50 +373,13 @@ public class JMSMappingInboundTransformerTest {
Map<String, String> map = new HashMap<>();
message.setBody(new AmqpValue(map));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
}
/**
- * Test that an amqp-value body containing a map that has an AMQP Binary as one of the
- * entries encoded into the Map results in an MapMessage where a byte array can be read from
- * the entry.
- *
- * @throws Exception
- * if an error occurs during the test.
- */
- @Test
- public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws Exception {
- final String ENTRY_NAME = "bytesEntry";
-
- Message message = Proton.message();
- Map<String, Object> map = new HashMap<>();
-
- byte[] inputBytes = new byte[] {1, 2, 3, 4, 5};
- map.put(ENTRY_NAME, new Binary(inputBytes));
-
- message.setBody(new AmqpValue(map));
-
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
-
- assertNotNull("Message should not be null", jmsMessage);
- assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
-
- MapMessage mapMessage = (MapMessage) jmsMessage;
- byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME);
- assertNotNull(outputBytes);
- assertTrue(Arrays.equals(inputBytes, outputBytes));
- }
-
- /**
* Test that an amqp-value body containing a list results in an StreamMessage when not
* otherwise annotated to indicate the type of JMS message it is.
*
@@ -506,10 +392,7 @@ public class JMSMappingInboundTransformerTest {
List<String> list = new ArrayList<>();
message.setBody(new AmqpValue(list));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -528,10 +411,7 @@ public class JMSMappingInboundTransformerTest {
List<String> list = new ArrayList<>();
message.setBody(new AmqpSequence(list));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@@ -550,10 +430,7 @@ public class JMSMappingInboundTransformerTest {
Binary binary = new Binary(new byte[0]);
message.setBody(new AmqpValue(binary));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -572,11 +449,7 @@ public class JMSMappingInboundTransformerTest {
Message message = Proton.message();
message.setBody(new AmqpValue(UUID.randomUUID()));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
-
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@@ -588,10 +461,8 @@ public class JMSMappingInboundTransformerTest {
Message message = Message.Factory.create();
message.setBody(new AmqpValue(contentString));
- EncodedMessage em = encodeMessage(message);
-
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
- javax.jms.Message jmsMessage = transformer.transform(em);
+ ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
+ jmsMessage.decode();
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@@ -631,7 +502,6 @@ public class JMSMappingInboundTransformerTest {
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass)
throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
String toAddress = "toAddress";
Message amqp = Message.Factory.create();
@@ -644,9 +514,7 @@ public class JMSMappingInboundTransformerTest {
amqp.setMessageAnnotations(ma);
}
- EncodedMessage em = encodeMessage(amqp);
-
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
@@ -679,7 +547,6 @@ public class JMSMappingInboundTransformerTest {
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass)
throws Exception {
- JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(idGenerator);
String replyToAddress = "replyToAddress";
Message amqp = Message.Factory.create();
@@ -692,27 +559,8 @@ public class JMSMappingInboundTransformerTest {
amqp.setMessageAnnotations(ma);
}
- EncodedMessage em = encodeMessage(amqp);
-
- javax.jms.Message jmsMessage = transformer.transform(em);
+ javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
- // ----- Utility Methods --------------------------------------------------//
-
- private EncodedMessage encodeMessage(Message message) {
- byte[] encodeBuffer = new byte[1024 * 8];
- int encodedSize;
- while (true) {
- try {
- encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
- break;
- } catch (java.nio.BufferOverflowException e) {
- encodeBuffer = new byte[encodeBuffer.length * 2];
- }
- }
-
- long messageFormat = 0;
- return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize);
- }
}