You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/03 22:45:14 UTC
svn commit: r1356927 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ test/java/org/apache/activemq/
Author: tabish
Date: Tue Jul 3 20:45:12 2012
New Revision: 1356927
URL: http://svn.apache.org/viewvc?rev=1356927&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3224
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jul 3 20:45:12 2012
@@ -55,6 +55,7 @@ import javax.jms.XAConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
@@ -131,7 +132,7 @@ public class ActiveMQConnection implemen
// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private BlobTransferPolicy blobTransferPolicy;
- private RedeliveryPolicy redeliveryPolicy;
+ private RedeliveryPolicyMap redeliveryPolicyMap;
private MessageTransformer transformer;
private boolean disableTimeStampsByDefault;
@@ -1644,14 +1645,14 @@ public class ActiveMQConnection implemen
* @throws JMSException
*/
public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
- return redeliveryPolicy;
+ return redeliveryPolicyMap.getDefaultEntry();
}
/**
* Sets the redelivery policy to be used when messages are rolled back
*/
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
- this.redeliveryPolicy = redeliveryPolicy;
+ this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
}
public BlobTransferPolicy getBlobTransferPolicy() {
@@ -2549,4 +2550,22 @@ public class ActiveMQConnection implemen
}
}
}
+
+ /**
+ * Sets the Connection-wide RedeliveryPolicyMap for handling messages that are being rolled back.
+ * @param redeliveryPolicyMap the redeliveryPolicyMap to set
+ */
+ public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+ this.redeliveryPolicyMap = redeliveryPolicyMap;
+ }
+
+ /**
+ * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
+ * Consumers when dealing with transaction messages that have been rolled back.
+ *
+ * @return the redeliveryPolicyMap
+ */
+ public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+ return redeliveryPolicyMap;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Jul 3 20:45:12 2012
@@ -36,6 +36,7 @@ import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.jndi.JNDIBaseStorable;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable;
@@ -90,7 +91,10 @@ public class ActiveMQConnectionFactory e
// client policies
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
- private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+ {
+ redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
+ }
private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
private MessageTransformer transformer;
@@ -317,7 +321,7 @@ public class ActiveMQConnectionFactory e
connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setExclusiveConsumer(isExclusiveConsumer());
- connection.setRedeliveryPolicy(getRedeliveryPolicy());
+ connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
connection.setTransformer(getTransformer());
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
@@ -577,15 +581,27 @@ public class ActiveMQConnectionFactory e
}
public RedeliveryPolicy getRedeliveryPolicy() {
- return redeliveryPolicy;
+ return redeliveryPolicyMap.getDefaultEntry();
}
/**
- * Sets the global redelivery policy to be used when a message is delivered
+ * Sets the global default redelivery policy to be used when a message is delivered
* but the session is rolled back
*/
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
- this.redeliveryPolicy = redeliveryPolicy;
+ this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
+ }
+
+ public RedeliveryPolicyMap getRedeliveryPolicyMap() {
+ return this.redeliveryPolicyMap;
+ }
+
+ /**
+ * Sets the global redelivery policy mapping to be used when a message is delivered
+ * but the session is rolled back
+ */
+ public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
+ this.redeliveryPolicyMap = redeliveryPolicyMap;
}
public MessageTransformer getTransformer() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jul 3 20:45:12 2012
@@ -208,7 +208,7 @@ public class ActiveMQMessageConsumer imp
}
this.session = session;
- this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
+ this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
setTransformer(session.getTransformer());
this.info = new ConsumerInfo(consumerId);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?rev=1356927&r1=1356926&r2=1356927&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Tue Jul 3 20:45:12 2012
@@ -23,7 +23,9 @@ import javax.jms.TextMessage;
import junit.framework.Test;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
/**
* Test cases used to test the JMS message exclusive consumers.
@@ -479,4 +481,112 @@ public class RedeliveryPolicyTest extend
assertEquals("2nd", m.getText());
session.commit();
}
+
+ public void testRedeliveryPolicyPerDestination() throws Exception {
+
+ RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
+ queuePolicy.setInitialRedeliveryDelay(0);
+ queuePolicy.setRedeliveryDelay(1000);
+ queuePolicy.setUseExponentialBackOff(false);
+ queuePolicy.setMaximumRedeliveries(2);
+
+ RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
+ topicPolicy.setInitialRedeliveryDelay(0);
+ topicPolicy.setRedeliveryDelay(1000);
+ topicPolicy.setUseExponentialBackOff(false);
+ topicPolicy.setMaximumRedeliveries(3);
+
+ // Register per-destination redelivery policies using the ">" wildcard: one for all topics, one for all queues
+ RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
+ map.put(new ActiveMQTopic(">"), topicPolicy);
+ map.put(new ActiveMQQueue(">"), queuePolicy);
+
+ connection.start();
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue queue = new ActiveMQQueue("TEST");
+ ActiveMQTopic topic = new ActiveMQTopic("TEST");
+
+ MessageProducer producer = session.createProducer(null);
+
+ MessageConsumer queueConsumer = session.createConsumer(queue);
+ MessageConsumer topicConsumer = session.createConsumer(topic);
+
+ // Send the messages
+ producer.send(queue, session.createTextMessage("1st"));
+ producer.send(queue, session.createTextMessage("2nd"));
+ producer.send(topic, session.createTextMessage("1st"));
+ producer.send(topic, session.createTextMessage("2nd"));
+
+ session.commit();
+
+ TextMessage m;
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.rollback();
+
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNotNull("first immediate redelivery", m);
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNotNull("first immediate redelivery", m);
+ session.rollback();
+
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNull("second delivery delayed: " + m, m);
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNull("second delivery delayed: " + m, m);
+
+ m = (TextMessage)queueConsumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ m = (TextMessage)topicConsumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.rollback();
+
+ m = (TextMessage)queueConsumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ m = (TextMessage)topicConsumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.rollback();
+
+ // No third attempt for the Queue consumer
+ m = (TextMessage)queueConsumer.receive(2000);
+ assertNull(m);
+ m = (TextMessage)topicConsumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+
+ m = (TextMessage)queueConsumer.receive(100);
+ assertNull(m);
+ m = (TextMessage)topicConsumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.commit();
+ }
}