You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/06 20:10:44 UTC
svn commit: r383627 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/command/
test/java/org/apache/activemq/
Author: jstrachan
Date: Mon Mar 6 11:10:32 2006
New Revision: 383627
URL: http://svn.apache.org/viewcvs?rev=383627&view=rev
Log:
Added fix for AMQ-519: when a message is rolled back, ObjectMessage instances now discard their cached body so that the redelivered message is deserialized again. This prevents changes made to a mutable body object before the rollback from being visible on redelivery.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Mar 6 11:10:32 2006
@@ -692,7 +692,7 @@
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch) iter.next();
- md.getMessage().incrementRedeliveryCounter();
+ md.getMessage().onMessageRolledBack();
unconsumedMessages.enqueueFirst(md);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Mar 6 11:10:32 2006
@@ -677,7 +677,7 @@
getTransactionContext().addSynchronization(new Synchronization(){
public void afterRollback() throws Exception {
- md.getMessage().incrementRedeliveryCounter();
+ md.getMessage().onMessageRolledBack();
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Mon Mar 6 11:10:32 2006
@@ -29,18 +29,19 @@
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
-import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
-import java.lang.reflect.Proxy;
/**
* An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable object in the Java
@@ -172,6 +173,13 @@
}
}
return this.object;
+ }
+
+ public void onMessageRolledBack() { // rollback hook: run the superclass handling, then drop the cached body object
+ super.onMessageRolledBack();
+
+ // Force the object to be deserialized again on the next read, as the previously returned instance could have been changed.
+ object = null;
}
public String toString() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Mar 6 11:10:32 2006
@@ -611,5 +611,8 @@
public void setRecievedByDFBridge(boolean recievedByDFBridge){
this.recievedByDFBridge=recievedByDFBridge;
}
-
+
+ public void onMessageRolledBack() { // rollback hook; this base implementation only increments the redelivery counter
+ incrementRedeliveryCounter();
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=383627&r1=383626&r2=383627&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java Mon Mar 6 11:10:32 2006
@@ -30,10 +30,12 @@
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.MessageListener;
+import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -432,9 +434,55 @@
}
+ public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { // AMQ-519: body changes made before a rollback must not show up on redelivery
+ ArrayList list = new ArrayList();
+ list.add("First");
+ Message outbound = session.createObjectMessage(list);
+ outbound.setStringProperty("foo", "abc");
+
+ producer.send(outbound);
+ session.commit();
+
+ log.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+
+ List body = assertReceivedObjectMessageWithListBody(message);
+
+ // now let's try to mutate the received message: first its properties, then its body
+ try {
+ message.setStringProperty("foo", "def");
+ fail("Cannot change properties of the object!"); // only reached if the setter did not throw
+ }
+ catch (JMSException e) {
+ log.info("Caught expected exception: " + e, e);
+ }
+ body.clear();
+ body.add("This should never be seen!");
+ session.rollback();
+
+ message = consumer.receive(1000); // receive again after the rollback
+ List secondBody = assertReceivedObjectMessageWithListBody(message); // helper re-checks the original "First" content
+ assertNotSame("Second call should return a different body", secondBody, body);
+ session.commit();
+ }
+
+ protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException { // asserts the expected property and one-element List body, then returns that body
+ assertNotNull("Should have received a message!", message);
+ assertEquals("foo header", "abc", message.getStringProperty("foo")); // property must still hold the value set by the sender
+
+ assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+ ObjectMessage objectMessage = (ObjectMessage) message;
+ List body = (List) objectMessage.getObject();
+ log.info("Received body: " + body);
+
+ assertEquals("Size of list should be 1", 1, body.size());
+ assertEquals("element 0 of list", "First", body.get(0));
+ return body; // returned so callers can mutate it or compare identity
+ }
+
/**
* Recreates the connection.
- *
+ *
* @throws JMSException
*/
protected void reconnect() throws JMSException {