You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/20 21:23:58 UTC
qpid-jms git commit: QPIDJMS-178 Ensure that the object stored in the
ObjectMessage has a snapshot taken on set and a snapshot returned on get to
ensure that changes to the outside object do not get reflected in the stored
value as pre the spec.
Repository: qpid-jms
Updated Branches:
refs/heads/master b5f00d23a -> df2d911d4
QPIDJMS-178 Ensure that the object stored in the ObjectMessage has a
snapshot taken on set and a snapshot returned on get to ensure that
changes to the outside object do not get reflected in the stored value
as pre the spec.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/df2d911d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/df2d911d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/df2d911d
Branch: refs/heads/master
Commit: df2d911d4e75a65b50c23717636b0a6e15be526f
Parents: b5f00d2
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 20 17:23:44 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 20 17:23:44 2016 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 10 +--
.../amqp/message/AmqpJmsMessageBuilder.java | 33 ++++++----
.../message/AmqpJmsObjectMessageFacade.java | 20 +++---
.../amqp/message/AmqpMessageSupport.java | 44 +++++++++++++
.../message/AmqpSerializedObjectDelegate.java | 40 ++++++++++--
.../amqp/message/AmqpTypedObjectDelegate.java | 60 +++++++++++++----
.../amqp/message/AmqpJmsMessageBuilderTest.java | 51 ++++++++-------
.../message/AmqpJmsMessageTypesTestCase.java | 4 +-
.../message/AmqpJmsObjectMessageFacadeTest.java | 69 ++++++++++++++++++++
9 files changed, 257 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index f7a7200..a162af3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -48,7 +48,6 @@ import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -426,10 +425,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private boolean processDelivery(Delivery incoming) throws Exception {
incoming.setDefaultDeliveryState(Released.getInstance());
- Message amqpMessage = decodeIncomingMessage(incoming);
JmsMessage message = null;
try {
- message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage);
+ message = AmqpJmsMessageBuilder.createJmsMessage(this, unwrapIncomingMessage(incoming));
} catch (Exception e) {
LOG.warn("Error on transform: {}", e.getMessage());
// TODO - We could signal provider error but not sure we want to fail
@@ -531,7 +529,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
- protected Message decodeIncomingMessage(Delivery incoming) {
+ protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
int count;
while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
@@ -542,9 +540,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
try {
- Message protonMessage = Message.Factory.create();
- protonMessage.decode(incomingBuffer.array(), 0, incomingBuffer.readableBytes());
- return protonMessage;
+ return incomingBuffer.duplicate();
} finally {
incomingBuffer.clear();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
index 15f6d1d..4385246 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
@@ -25,6 +25,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_S
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType;
import java.io.IOException;
@@ -47,6 +48,8 @@ import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
+import io.netty.buffer.ByteBuf;
+
/**
* Builder class used to construct the appropriate JmsMessage / JmsMessageFacade
* objects to wrap an incoming AMQP Message.
@@ -59,23 +62,25 @@ public class AmqpJmsMessageBuilder {
*
* @param consumer
* The provider AMQP Consumer instance where this message arrived at.
- * @param message
- * The Proton Message object that will be wrapped.
+ * @param messageBytes
+ * The the raw bytes that compose the incoming message. (Read-Only)
*
* @return a JmsMessage instance properly configured for dispatch to the provider listener.
*
* @throws IOException if an error occurs while creating the message objects.
*/
- public static JmsMessage createJmsMessage(AmqpConsumer consumer, Message message) throws IOException {
+ public static JmsMessage createJmsMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException {
+
+ Message amqpMessage = decodeMessage(messageBytes);
// First we try the easy way, if the annotation is there we don't have to work hard.
- JmsMessage result = createFromMsgAnnotation(consumer, message);
+ JmsMessage result = createFromMsgAnnotation(consumer, amqpMessage, messageBytes);
if (result != null) {
return result;
}
// Next, match specific section structures and content types
- result = createWithoutAnnotation(consumer, message);
+ result = createWithoutAnnotation(consumer, amqpMessage, messageBytes);
if (result != null) {
return result;
}
@@ -83,7 +88,7 @@ public class AmqpJmsMessageBuilder {
throw new IOException("Could not create a JMS message from incoming message");
}
- private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message) throws IOException {
+ private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message, ByteBuf messageBytes) throws IOException {
Object annotation = AmqpMessageSupport.getMessageAnnotation(JMS_MSG_TYPE, message);
if (annotation != null) {
@@ -99,7 +104,7 @@ public class AmqpJmsMessageBuilder {
case JMS_STREAM_MESSAGE:
return createStreamMessage(consumer, message);
case JMS_OBJECT_MESSAGE:
- return createObjectMessage(consumer, message);
+ return createObjectMessage(consumer, message, messageBytes);
default:
throw new IOException("Invalid JMS Message Type annotation value found in message: " + annotation);
}
@@ -108,12 +113,12 @@ public class AmqpJmsMessageBuilder {
return null;
}
- private static JmsMessage createWithoutAnnotation(AmqpConsumer consumer, Message message) {
+ private static JmsMessage createWithoutAnnotation(AmqpConsumer consumer, Message message, ByteBuf messageBytes) {
Section body = message.getBody();
if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
- return createObjectMessage(consumer, message);
+ return createObjectMessage(consumer, message, messageBytes);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
return createBytesMessage(consumer, message);
} else {
@@ -128,7 +133,7 @@ public class AmqpJmsMessageBuilder {
if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
return createBytesMessage(consumer, message);
} else if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
- return createObjectMessage(consumer, message);
+ return createObjectMessage(consumer, message, messageBytes);
} else {
Charset charset = getCharsetForTextualContent(message.getContentType());
if (charset != null) {
@@ -145,17 +150,17 @@ public class AmqpJmsMessageBuilder {
} else if (value instanceof Binary) {
return createBytesMessage(consumer, message);
} else {
- return createObjectMessage(consumer, message);
+ return createObjectMessage(consumer, message, messageBytes);
}
} else if (body instanceof AmqpSequence) {
- return createObjectMessage(consumer, message);
+ return createObjectMessage(consumer, message, messageBytes);
}
return null;
}
- private static JmsObjectMessage createObjectMessage(AmqpConsumer consumer, Message message) {
- return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message));
+ private static JmsObjectMessage createObjectMessage(AmqpConsumer consumer, Message message, ByteBuf messageBytes) {
+ return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message, messageBytes.copy()));
}
private static JmsStreamMessage createStreamMessage(AmqpConsumer consumer, Message message) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index 6935ea9..4db872a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -30,6 +30,8 @@ import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.message.Message;
+import io.netty.buffer.ByteBuf;
+
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
* type.
@@ -50,7 +52,7 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
super(connection);
setMessageAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE);
- initDelegate(isAmqpTypeEncoded);
+ initDelegate(isAmqpTypeEncoded, null);
}
/**
@@ -61,12 +63,14 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
* the consumer that received this message.
* @param message
* the incoming Message instance that is being wrapped.
+ * @param messageBytes
+ * a copy of the raw bytes of the incoming message.
*/
- public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message) {
+ public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message, ByteBuf messageBytes) {
super(consumer, message);
boolean javaSerialized = AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(message.getContentType());
- initDelegate(!javaSerialized);
+ initDelegate(!javaSerialized, messageBytes);
}
/**
@@ -126,9 +130,9 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
AmqpObjectTypeDelegate newDelegate = null;
if (useAmqpTypedEncoding) {
- newDelegate = new AmqpTypedObjectDelegate(message);
+ newDelegate = new AmqpTypedObjectDelegate(message, null);
} else {
- newDelegate = new AmqpSerializedObjectDelegate(message);
+ newDelegate = new AmqpSerializedObjectDelegate(message, null);
}
newDelegate.setObject(existingObject);
@@ -140,11 +144,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
}
}
- private void initDelegate(boolean useAmqpTypes) {
+ private void initDelegate(boolean useAmqpTypes, ByteBuf messageBytes) {
if (!useAmqpTypes) {
- delegate = new AmqpSerializedObjectDelegate(getAmqpMessage());
+ delegate = new AmqpSerializedObjectDelegate(getAmqpMessage(), messageBytes);
} else {
- delegate = new AmqpTypedObjectDelegate(getAmqpMessage());
+ delegate = new AmqpTypedObjectDelegate(getAmqpMessage(), messageBytes);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 09c9160..4d93167 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -21,6 +21,9 @@ import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.message.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
/**
* Support class containing constant values and static methods that are
* used to map to / from AMQP Message types being sent or received.
@@ -148,4 +151,45 @@ public final class AmqpMessageSupport {
return contentType.equals(message.getContentType());
}
}
+
+ /**
+ * Given a byte buffer that represents an encoded AMQP Message instance,
+ * decode and return the Message.
+ *
+ * @param encodedBytes
+ * the bytes that represent an encoded AMQP Message.
+ *
+ * @return a new Message instance with the decoded data.
+ */
+ public static Message decodeMessage(ByteBuf encodedBytes) {
+ // For now we must fully decode the message to get at the annotations.
+ Message protonMessage = Message.Factory.create();
+ protonMessage.decode(encodedBytes.array(), 0, encodedBytes.readableBytes());
+ return protonMessage;
+ }
+
+ /**
+ * Given a Message instance, encode the Message to the wire level representation
+ * of that Message.
+ *
+ * @param message
+ * the Message that is to be encoded into the wire level representation.
+ *
+ * @return a buffer containing the wire level representation of the input Message.
+ */
+ public static ByteBuf encodeMessage(Message message) {
+ final int BUFFER_SIZE = 4096;
+ byte[] encodedMessage = new byte[BUFFER_SIZE];
+ int encodedSize = 0;
+ while (true) {
+ try {
+ encodedSize = message.encode(encodedMessage, 0, encodedMessage.length);
+ break;
+ } catch (java.nio.BufferOverflowException e) {
+ encodedMessage = new byte[encodedMessage.length * 2];
+ }
+ }
+
+ return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
index d928e58..546060b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -17,12 +17,14 @@
package org.apache.qpid.jms.provider.amqp.message;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream;
import org.apache.qpid.proton.amqp.Binary;
@@ -30,6 +32,8 @@ import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
+import io.netty.buffer.ByteBuf;
+
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
* type.
@@ -50,21 +54,33 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate {
}
private final Message message;
+ private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>();
+ private ByteBuf messageBytes;
/**
* Create a new delegate that uses Java serialization to store the message content.
*
* @param message
* the AMQP message instance where the object is to be stored / read.
+ * @param messageBytes
+ * the raw bytes that comprise the message when it was received.
*/
- public AmqpSerializedObjectDelegate(Message message) {
+ public AmqpSerializedObjectDelegate(Message message, ByteBuf messageBytes) {
this.message = message;
this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ this.messageBytes = messageBytes;
+
+ // We will decode the body on each access, so clear the current value
+ // so we don't carry along unneeded bloat.
+ if (messageBytes != null) {
+ cachedReceivedBody.set(message.getBody());
+ }
}
private static byte[] getSerializedBytes(Serializable value) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+
oos.writeObject(value);
oos.flush();
oos.close();
@@ -77,7 +93,15 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate {
public Serializable getObject() throws IOException, ClassNotFoundException {
Binary bin = null;
- Section body = message.getBody();
+ Section body = cachedReceivedBody.getAndSet(null);
+ if (body == null) {
+ if (messageBytes != null) {
+ body = decodeMessage(messageBytes).getBody();
+ } else {
+ body = message.getBody();
+ }
+ }
+
if (body == null || body == NULL_OBJECT_BODY) {
return null;
} else if (body instanceof Data) {
@@ -103,18 +127,22 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate {
@Override
public void setObject(Serializable value) throws IOException {
- if(value == null) {
+ cachedReceivedBody.set(null);
+
+ if (value == null) {
message.setBody(NULL_OBJECT_BODY);
} else {
byte[] bytes = getSerializedBytes(value);
message.setBody(new Data(new Binary(bytes)));
}
+
+ messageBytes = null;
}
@Override
public void onSend() {
- this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- if(message.getBody() == null) {
+ message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ if (message.getBody() == null) {
message.setBody(NULL_OBJECT_BODY);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
index 483771e..99ab86b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
@@ -16,10 +16,14 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -27,6 +31,8 @@ import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
+import io.netty.buffer.ByteBuf;
+
/**
* Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
* type.
@@ -36,24 +42,41 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
static final AmqpValue NULL_OBJECT_BODY = new AmqpValue(null);
private final Message message;
+ private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>();
+ private ByteBuf messageBytes;
/**
* Create a new delegate that uses Java serialization to store the message content.
*
* @param message
* the AMQP message instance where the object is to be stored / read.
+ * @param messageBytes
+ * the raw bytes that comprise the AMQP message that was received.
*/
- public AmqpTypedObjectDelegate(Message message) {
+ public AmqpTypedObjectDelegate(Message message, ByteBuf messageBytes) {
this.message = message;
this.message.setContentType(null);
+ this.messageBytes = messageBytes;
+
+ // We will decode the body on each access, so clear the current value
+ // so we don't carry along unneeded bloat.
+ if (messageBytes != null) {
+ cachedReceivedBody.set(message.getBody());
+ }
}
@Override
public Serializable getObject() throws IOException, ClassNotFoundException {
- // TODO: this should actually return a snapshot of the object, so we
- // need to save the bytes so we can return an equal/unmodified object later
+ Section body = cachedReceivedBody.getAndSet(null);
+
+ if (body == null) {
+ if (messageBytes != null) {
+ body = decodeMessage(messageBytes).getBody();
+ } else {
+ body = message.getBody();
+ }
+ }
- Section body = message.getBody();
if (body == null) {
return null;
} else if (body instanceof AmqpValue) {
@@ -76,19 +99,28 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
@Override
public void setObject(Serializable value) throws IOException {
+ cachedReceivedBody.set(null);
+
if (value == null) {
message.setBody(NULL_OBJECT_BODY);
+ messageBytes = null;
} else if (isSupportedAmqpValueObjectType(value)) {
- // TODO: This is a temporary hack, we actually need to take a snapshot of the object
- // at this point in time, not simply set the object itself into the Proton message.
- // We will need to encode it now, first to save the snapshot to send, and also to
- // verify up front that we can actually send it later.
-
- // Even if we do that we would currently then need to decode it later to set the
- // body to send, unless we augment Proton to allow setting the bytes directly.
- // We will always need to decode bytes to return a snapshot from getObject(). We
- // will need to save the bytes somehow to support that on received messages.
- message.setBody(new AmqpValue(value));
+ Message transfer = Message.Factory.create();
+
+ // Exchange the incoming body value for one that is created from encoding
+ // and decoding the value.
+ transfer.setBody(new AmqpValue(value));
+ messageBytes = encodeMessage(transfer);
+ transfer = decodeMessage(messageBytes);
+ messageBytes = null;
+
+ // This step requires a heavy-weight operation of both encoding and decoding the
+ // incoming body value in order to create a copy such that changes to the original
+ // do not affect the stored value. In the future it makes sense to try to enhance
+ // proton such that we can encode the body and use those bytes directly on the
+ // message as it is being sent.
+
+ message.setBody(transfer.getBody());
} else {
// TODO: Data and AmqpSequence?
throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java
index 2ebff2b..a534de0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -32,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.jms.message.JmsBytesMessage;
import org.apache.qpid.jms.message.JmsMessage;
@@ -88,7 +90,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
- AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
}
/**
@@ -108,7 +110,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsMessage.class, jmsMessage.getClass());
@@ -134,7 +136,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -160,7 +162,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass());
@@ -209,7 +211,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
}
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
@@ -234,6 +236,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
*/
@Test
public void testCreateStreamMessageFromMessageTypeAnnotation() throws Exception {
+
Message message = Proton.message();
Map<Symbol, Object> map = new HashMap<Symbol, Object>();
@@ -242,7 +245,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
MessageAnnotations messageAnnotations = new MessageAnnotations(map);
message.setMessageAnnotations(messageAnnotations);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsStreamMessage.class, jmsMessage.getClass());
@@ -268,7 +271,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Message message = Proton.message();
message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -289,7 +292,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
assertNull(message.getContentType());
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -310,7 +313,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Message message = Proton.message();
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
@@ -327,7 +330,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Message message = Proton.message();
message.setContentType("text/plain");
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass());
@@ -347,7 +350,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Message message = Proton.message();
message.setContentType("unknown-content-type");
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsMessage.class, jmsMessage.getClass());
@@ -372,7 +375,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
message.setBody(new Data(binary));
message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -394,7 +397,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
message.setBody(new Data(binary));
message.setContentType("unknown-content-type");
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -418,7 +421,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
assertNull(message.getContentType());
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -441,7 +444,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
message.setBody(new Data(binary));
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
@@ -554,7 +557,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
message.setBody(new Data(binary));
message.setContentType(contentType);
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass());
@@ -579,7 +582,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Message message = Proton.message();
message.setBody(new AmqpValue("content"));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass());
@@ -599,7 +602,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Message message = Proton.message();
message.setBody(new AmqpValue(null));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass());
@@ -620,7 +623,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Map<String, String> map = new HashMap<String,String>();
message.setBody(new AmqpValue(map));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
@@ -644,7 +647,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
List<String> list = new ArrayList<String>();
message.setBody(new AmqpValue(list));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
@@ -668,7 +671,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
Binary binary = new Binary(new byte[0]);
message.setBody(new AmqpValue(binary));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass());
@@ -686,9 +689,9 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
@Test
public void testCreateObjectMessageFromAmqpValueWithUncategorisedContent() throws Exception {
Message message = Proton.message();
- message.setBody(new AmqpValue(new Object()));// This obviously shouldn't happen in practice
+ message.setBody(new AmqpValue(UUID.randomUUID()));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
@@ -715,7 +718,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase {
List<String> list = new ArrayList<String>();
message.setBody(new AmqpSequence(list));
- JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message);
+ JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message));
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
index c49e97d..3775269 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage;
+
import java.nio.charset.StandardCharsets;
import org.apache.qpid.jms.JmsDestination;
@@ -85,7 +87,7 @@ public class AmqpJmsMessageTypesTestCase extends QpidJmsTestCase {
}
protected AmqpJmsObjectMessageFacade createReceivedObjectMessageFacade(AmqpConsumer amqpConsumer, Message message) {
- return new AmqpJmsObjectMessageFacade(amqpConsumer, message);
+ return new AmqpJmsObjectMessageFacade(amqpConsumer, message, encodeMessage(message));
}
protected AmqpConsumer createMockAmqpConsumer() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java
index bd0c24e..f95c685 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java
@@ -285,4 +285,73 @@ public class AmqpJmsObjectMessageFacadeTest extends AmqpJmsMessageTypesTestCase
// expected
}
}
+
+ /**
+ * Test that setting an object on a received message and later getting the value, returns an
+ * equal but different object that does not pick up intermediate changes to the set object.
+ *
+ * @throws Exception if an error occurs during the test.
+ */
+ @Test
+ public void testSetThenGetObjectOnSerializedReceivedMessageNoContentTypeReturnsSnapshot() throws Exception {
+ doTestSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot(false);
+ }
+
+ @Test
+ public void testSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot() throws Exception {
+ doTestSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot(true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void doTestSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot(boolean contentType) throws Exception {
+
+ HashMap<String, String> origMap = new HashMap<String, String>();
+ origMap.put("key1", "value1");
+
+ Message message = Message.Factory.create();
+ if (contentType) {
+ message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ message.setBody(new Data(new Binary(getSerializedBytes(origMap))));
+ } else {
+ message.setBody(new AmqpValue(origMap));
+ }
+ AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createReceivedObjectMessageFacade(createMockAmqpConsumer(), message);
+
+ // verify we get a different-but-equal object back
+ Serializable serialized = amqpObjectMessageFacade.getObject();
+ assertTrue("Unexpected object type returned", serialized instanceof Map<?, ?>);
+ Map<String, String> returnedObject1 = (Map<String, String>) serialized;
+ if (contentType) {
+ assertNotSame("Expected different objects, due to snapshot being taken", origMap, returnedObject1);
+ } else {
+ assertSame("Expected same objects, due to initial snapshot of delivered value", origMap, returnedObject1);
+ }
+ assertEquals("Expected equal objects, due to snapshot being taken", origMap, returnedObject1);
+
+ // verify we get a different-but-equal object back when compared to the previously retrieved object
+ Serializable serialized2 = amqpObjectMessageFacade.getObject();
+ assertTrue("Unexpected object type returned", serialized2 instanceof Map<?, ?>);
+ Map<String, String> returnedObject2 = (Map<String, String>) serialized2;
+ assertNotSame("Expected different objects, due to snapshot being taken", returnedObject1, returnedObject2);
+ assertEquals("Expected equal objects, due to snapshot being taken", returnedObject1, returnedObject2);
+
+ // mutate the first returned object
+ returnedObject1.put("key2", "value2");
+
+ // verify the mutated map is a different and not equal object
+ assertNotSame("Expected different objects, due to snapshot being taken", returnedObject1, returnedObject2);
+ assertNotEquals("Expected objects to differ, due to snapshot being taken", returnedObject1, returnedObject2);
+ }
+
+ private static byte[] getSerializedBytes(Serializable value) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+
+ oos.writeObject(value);
+ oos.flush();
+ oos.close();
+
+ return baos.toByteArray();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org