You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/17 21:22:10 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4920

Updated Branches:
  refs/heads/trunk 7c01c9b58 -> c387e842e


https://issues.apache.org/jira/browse/AMQ-4920

And code to prevent concurrent writes to a message when dispatched to
multiple Topic consumers.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c387e842
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c387e842
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c387e842

Branch: refs/heads/trunk
Commit: c387e842ee16dcfc6d4de7bd3acf9f0f5595775d
Parents: 7c01c9b
Author: Timothy Bish <ta...@gmai.com>
Authored: Tue Dec 17 15:22:08 2013 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Tue Dec 17 15:22:08 2013 -0500

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c387e842/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 893fa1b..ed5343c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -129,6 +129,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         updateTracer();
     }
 
+    @Override
     public void updateTracer() {
         if (amqpTransport.isTrace()) {
             ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -849,13 +850,27 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
                 final MessageDispatch md = outbound.removeFirst();
                 try {
+
+                    ActiveMQMessage temp = null;
                     if (md.getMessage() != null) {
-                        org.apache.activemq.command.Message message = md.getMessage();
-                        if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
-                            message.setProperty(MESSAGE_FORMAT_KEY, 0);
+
+                        // Topics can dispatch the same Message to more than one consumer
+                        // so we must copy to prevent concurrent read / write to the same
+                        // message object.
+                        if (md.getDestination().isTopic()) {
+                            synchronized (md.getMessage()) {
+                                temp = (ActiveMQMessage) md.getMessage().copy();
+                            }
+                        } else {
+                            temp = (ActiveMQMessage) md.getMessage();
+                        }
+
+                        if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+                            temp.setProperty(MESSAGE_FORMAT_KEY, 0);
                         }
                     }
-                    final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+
+                    final ActiveMQMessage jms = temp;
                     if (jms == null) {
                         // It's the end of browse signal.
                         endOfBrowse = true;