You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/17 21:22:10 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4920
Updated Branches:
refs/heads/trunk 7c01c9b58 -> c387e842e
https://issues.apache.org/jira/browse/AMQ-4920
Add code to prevent concurrent writes to a message when it is dispatched to
multiple Topic consumers.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c387e842
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c387e842
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c387e842
Branch: refs/heads/trunk
Commit: c387e842ee16dcfc6d4de7bd3acf9f0f5595775d
Parents: 7c01c9b
Author: Timothy Bish <ta...@gmai.com>
Authored: Tue Dec 17 15:22:08 2013 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Tue Dec 17 15:22:08 2013 -0500
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 23 ++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c387e842/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 893fa1b..ed5343c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -129,6 +129,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
updateTracer();
}
+ @Override
public void updateTracer() {
if (amqpTransport.isTrace()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -849,13 +850,27 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
+
+ ActiveMQMessage temp = null;
if (md.getMessage() != null) {
- org.apache.activemq.command.Message message = md.getMessage();
- if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
- message.setProperty(MESSAGE_FORMAT_KEY, 0);
+
+ // Topics can dispatch the same Message to more than one consumer
+ // so we must copy to prevent concurrent read / write to the same
+ // message object.
+ if (md.getDestination().isTopic()) {
+ synchronized (md.getMessage()) {
+ temp = (ActiveMQMessage) md.getMessage().copy();
+ }
+ } else {
+ temp = (ActiveMQMessage) md.getMessage();
+ }
+
+ if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+ temp.setProperty(MESSAGE_FORMAT_KEY, 0);
}
}
- final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+
+ final ActiveMQMessage jms = temp;
if (jms == null) {
// It's the end of browse signal.
endOfBrowse = true;