You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/06 20:10:44 UTC

svn commit: r383627 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/

Author: jstrachan
Date: Mon Mar  6 11:10:32 2006
New Revision: 383627

URL: http://svn.apache.org/viewcvs?rev=383627&view=rev
Log:
added fix for AMQ-519 so that we explicitly force the redelivered message to be deserialized again for ObjectMessage instances to avoid mutable objects being changed, then rolled back

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Mar  6 11:10:32 2006
@@ -692,7 +692,7 @@
                 
                 for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
                     MessageDispatch md = (MessageDispatch) iter.next();
-                    md.getMessage().incrementRedeliveryCounter();
+                    md.getMessage().onMessageRolledBack();
                     unconsumedMessages.enqueueFirst(md);
                 }
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Mar  6 11:10:32 2006
@@ -677,7 +677,7 @@
                     getTransactionContext().addSynchronization(new Synchronization(){
                         public void afterRollback() throws Exception {
 
-                            md.getMessage().incrementRedeliveryCounter();
+                            md.getMessage().onMessageRolledBack();
                             
                             RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                             int redeliveryCounter = md.getMessage().getRedeliveryCounter();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Mon Mar  6 11:10:32 2006
@@ -29,18 +29,19 @@
 
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
-import java.lang.reflect.Proxy;
 
 /**
  * An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable object in the Java
@@ -172,6 +173,13 @@
             }
         }
         return this.object;
+    }
+
+    public void onMessageRolledBack() {
+        super.onMessageRolledBack();
+        
+        // lets force the object to be deserialized again - as we could have changed the object
+        object = null;
     }
 
     public String toString() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Mar  6 11:10:32 2006
@@ -611,5 +611,8 @@
     public void setRecievedByDFBridge(boolean recievedByDFBridge){
         this.recievedByDFBridge=recievedByDFBridge;
     }
-    
+
+    public void onMessageRolledBack() {
+        incrementRedeliveryCounter();
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java Mon Mar  6 11:10:32 2006
@@ -30,10 +30,12 @@
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.MessageListener;
 
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -432,9 +434,55 @@
     }
 
 
+    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
+        ArrayList list = new ArrayList();
+        list.add("First");
+        Message outbound = session.createObjectMessage(list);
+        outbound.setStringProperty("foo", "abc");
+
+        producer.send(outbound);
+        session.commit();
+
+        log.info("About to consume message 1");
+        Message message = consumer.receive(1000);
+
+        List body = assertReceivedObjectMessageWithListBody(message);
+
+        // now lets try mutate it
+        try {
+            message.setStringProperty("foo", "def");
+            fail("Cannot change properties of the object!");
+        }
+        catch (JMSException e) {
+            log.info("Caught expected exception: " + e, e);
+        }
+        body.clear();
+        body.add("This should never be seen!");
+        session.rollback();
+
+        message = consumer.receive(1000);
+        List secondBody = assertReceivedObjectMessageWithListBody(message);
+        assertNotSame("Second call should return a different body", secondBody, body);
+        session.commit();
+    }
+
+    protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
+        assertNotNull("Should have received a message!", message);
+        assertEquals("foo header", "abc", message.getStringProperty("foo"));
+
+        assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+        ObjectMessage objectMessage = (ObjectMessage) message;
+        List body = (List) objectMessage.getObject();
+        log.info("Received body: " + body);
+
+        assertEquals("Size of list should be 1", 1, body.size());
+        assertEquals("element 0 of list", "First", body.get(0));
+        return body;
+    }
+
     /**
      * Recreates the connection.
-     *
+     * 
      * @throws JMSException
      */
     protected void reconnect() throws JMSException {