You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/07/11 18:02:52 UTC
svn commit: r1145217 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/
test/java/org/apache/activemq/test/retroactive/ test...
Author: gtully
Date: Mon Jul 11 16:02:52 2011
New Revision: 1145217
URL: http://svn.apache.org/viewvc?rev=1145217&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2911: Option to make all consumers retroactive. Patch applied with thanks, along with an additional test. Adds a new destination policy entry 'alwaysRetroactive'.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jul 11 16:02:52 2011
@@ -339,6 +339,14 @@ public class DestinationView implements
public void setProducerFlowControl(boolean producerFlowControl) {
destination.setProducerFlowControl(producerFlowControl);
}
+
+ public boolean isAlwaysRetroactive() {
+ return destination.isAlwaysRetroactive();
+ }
+
+ public void setAlwaysRetroactive(boolean alwaysRetroactive) {
+ destination.setAlwaysRetroactive(alwaysRetroactive);
+ }
/**
* Set's the interval at which warnings about producers being blocked by
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Jul 11 16:02:52 2011
@@ -254,6 +254,17 @@ public interface DestinationViewMBean {
public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl);
/**
+ * @return if we treat consumers as alwaysRetroactive
+ */
+ @MBeanInfo("Always treat consumers as retroActive")
+ boolean isAlwaysRetroactive();
+
+ /**
+ * @param alwaysRetroactive set as always retroActive
+ */
+ public void setAlwaysRetroactive(@MBeanInfo("alwaysRetroactive") boolean alwaysRetroactive);
+
+ /**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Jul 11 16:02:52 2011
@@ -63,6 +63,7 @@ public abstract class BaseDestination im
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
+ private boolean alwaysRetroactive = false;
protected boolean warnOnProducerFlowControl = true;
protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
@@ -147,6 +148,14 @@ public abstract class BaseDestination im
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
}
+
+ public boolean isAlwaysRetroactive() {
+ return alwaysRetroactive;
+ }
+
+ public void setAlwaysRetroactive(boolean alwaysRetroactive) {
+ this.alwaysRetroactive = alwaysRetroactive;
+ }
/**
* Set's the interval at which warnings about producers being blocked by
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jul 11 16:02:52 2011
@@ -77,6 +77,10 @@ public interface Destination extends Ser
boolean isProducerFlowControl();
void setProducerFlowControl(boolean value);
+
+ boolean isAlwaysRetroactive();
+
+ void setAlwaysRetroactive(boolean value);
/**
* Set's the interval at which warnings about producers being blocked by
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Jul 11 16:02:52 2011
@@ -137,6 +137,14 @@ public class DestinationFilter implement
public void setProducerFlowControl(boolean value) {
next.setProducerFlowControl(value);
}
+
+ public boolean isAlwaysRetroactive() {
+ return next.isAlwaysRetroactive();
+ }
+
+ public void setAlwaysRetroactive(boolean value) {
+ next.setAlwaysRetroactive(value);
+ }
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jul 11 16:02:52 2011
@@ -117,7 +117,7 @@ public class Topic extends BaseDestinati
if (!sub.getConsumerInfo().isDurable()) {
// Do a retroactive recovery if needed.
- if (sub.getConsumerInfo().isRetroactive()) {
+ if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
// synchronize with dispatch method so that no new messages are
// sent
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jul 11 16:02:52 2011
@@ -61,6 +61,7 @@ public class PolicyEntry extends Destina
private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
+ private boolean alwaysRetroactive = false;
private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
@@ -140,6 +141,7 @@ public class PolicyEntry extends Destina
public void baseConfiguration(Broker broker,BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl());
+ destination.setAlwaysRetroactive(isAlwaysRetroactive());
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
@@ -412,6 +414,21 @@ public class PolicyEntry extends Destina
}
/**
+ * @return true if topic is always retroactive
+ */
+ public boolean isAlwaysRetroactive() {
+ return alwaysRetroactive;
+ }
+
+ /**
+ * @param alwaysRetroactive
+ */
+ public void setAlwaysRetroactive(boolean alwaysRetroactive) {
+ this.alwaysRetroactive = alwaysRetroactive;
+ }
+
+
+ /**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java Mon Jul 11 16:02:52 2011
@@ -88,7 +88,7 @@ public class RetroactiveConsumerWithMess
protected ConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress);
- answer.setUseRetroactiveConsumer(true);
+ //answer.setUseRetroactiveConsumer(true);
return answer;
}
Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml Mon Jul 11 16:02:52 2011
@@ -32,7 +32,7 @@
<destinationPolicy>
<policyMap>
<policyEntries>
- <policyEntry topic="org.apache.activemq.test.>">
+ <policyEntry topic="org.apache.activemq.test.>" alwaysRetroactive="true" >
<subscriptionRecoveryPolicy>
<queryBasedSubscriptionRecoveryPolicy query="#myQuery" />
</subscriptionRecoveryPolicy>