You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/07/11 18:02:52 UTC

svn commit: r1145217 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/test/retroactive/ test...

Author: gtully
Date: Mon Jul 11 16:02:52 2011
New Revision: 1145217

URL: http://svn.apache.org/viewvc?rev=1145217&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2911: Option to make all consumers retroactive. Patch applied and with thanks and an additional test. new destination policy entry 'alwaysRetroactive'.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jul 11 16:02:52 2011
@@ -339,6 +339,14 @@ public class DestinationView implements 
     public void setProducerFlowControl(boolean producerFlowControl) {
         destination.setProducerFlowControl(producerFlowControl);
     }
+    
+    public boolean isAlwaysRetroactive() {
+    	return destination.isAlwaysRetroactive();
+    }
+    
+    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
+    	destination.setAlwaysRetroactive(alwaysRetroactive);
+    }
 
     /**
      * Set's the interval at which warnings about producers being blocked by

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Jul 11 16:02:52 2011
@@ -254,6 +254,17 @@ public interface DestinationViewMBean {
     public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl);
     
     /**
+     * @return if we treat consumers as alwaysRetroactive
+     */
+    @MBeanInfo("Always treat consumers as retroActive")
+    boolean isAlwaysRetroactive();
+    
+    /**
+     * @param alwaysRetroactive set as always retroActive
+     */
+    public void setAlwaysRetroactive(@MBeanInfo("alwaysRetroactive") boolean alwaysRetroactive);    
+    
+    /**
      * Set's the interval at which warnings about producers being blocked by
      * resource usage will be triggered. Values of 0 or less will disable
      * warnings

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Jul 11 16:02:52 2011
@@ -63,6 +63,7 @@ public abstract class BaseDestination im
     protected SystemUsage systemUsage;
     protected MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
+    private boolean alwaysRetroactive = false;
     protected boolean warnOnProducerFlowControl = true;
     protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
 
@@ -147,6 +148,14 @@ public abstract class BaseDestination im
     public void setProducerFlowControl(boolean producerFlowControl) {
         this.producerFlowControl = producerFlowControl;
     }
+    
+    public boolean isAlwaysRetroactive() {
+    	return alwaysRetroactive;
+    }
+    
+    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
+    	this.alwaysRetroactive = alwaysRetroactive;
+    }
 
     /**
      * Set's the interval at which warnings about producers being blocked by

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jul 11 16:02:52 2011
@@ -77,6 +77,10 @@ public interface Destination extends Ser
     boolean isProducerFlowControl();
 
     void setProducerFlowControl(boolean value);
+    
+    boolean isAlwaysRetroactive();
+    
+    void setAlwaysRetroactive(boolean value);
 
     /**
      * Set's the interval at which warnings about producers being blocked by

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Jul 11 16:02:52 2011
@@ -137,6 +137,14 @@ public class DestinationFilter implement
     public void setProducerFlowControl(boolean value) {
         next.setProducerFlowControl(value);
     }
+    
+    public boolean isAlwaysRetroactive() {
+    	return next.isAlwaysRetroactive();
+    }
+    
+    public void setAlwaysRetroactive(boolean value) {
+    	next.setAlwaysRetroactive(value);
+    }
 
     public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
         next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jul 11 16:02:52 2011
@@ -117,7 +117,7 @@ public class Topic extends BaseDestinati
         if (!sub.getConsumerInfo().isDurable()) {
 
             // Do a retroactive recovery if needed.
-            if (sub.getConsumerInfo().isRetroactive()) {
+            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
 
                 // synchronize with dispatch method so that no new messages are
                 // sent

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jul 11 16:02:52 2011
@@ -61,6 +61,7 @@ public class PolicyEntry extends Destina
     private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
+    private boolean alwaysRetroactive = false;
     private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
     private boolean optimizedDispatch=false;
     private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
@@ -140,6 +141,7 @@ public class PolicyEntry extends Destina
     
     public void baseConfiguration(Broker broker,BaseDestination destination) {
         destination.setProducerFlowControl(isProducerFlowControl());
+        destination.setAlwaysRetroactive(isAlwaysRetroactive());
         destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
         destination.setEnableAudit(isEnableAudit());
         destination.setMaxAuditDepth(getMaxQueueAuditDepth());
@@ -412,6 +414,21 @@ public class PolicyEntry extends Destina
     }
 
     /**
+     * @return true if topic is always retroactive
+     */
+    public boolean isAlwaysRetroactive() {
+        return alwaysRetroactive;
+    }
+
+    /**
+     * @param alwaysRetroactive
+     */
+    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
+        this.alwaysRetroactive = alwaysRetroactive;
+    }
+    
+    
+    /**
      * Set's the interval at which warnings about producers being blocked by
      * resource usage will be triggered. Values of 0 or less will disable
      * warnings

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java Mon Jul 11 16:02:52 2011
@@ -88,7 +88,7 @@ public class RetroactiveConsumerWithMess
 
     protected ConnectionFactory createConnectionFactory() throws Exception {
         ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress);
-        answer.setUseRetroactiveConsumer(true);
+        //answer.setUseRetroactiveConsumer(true);
         return answer;
     }
 

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml?rev=1145217&r1=1145216&r2=1145217&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/test/retroactive/activemq-message-query.xml Mon Jul 11 16:02:52 2011
@@ -32,7 +32,7 @@
     <destinationPolicy>
       <policyMap>
         <policyEntries>
-          <policyEntry topic="org.apache.activemq.test.>">
+          <policyEntry topic="org.apache.activemq.test.>" alwaysRetroactive="true" >
             <subscriptionRecoveryPolicy>
               <queryBasedSubscriptionRecoveryPolicy query="#myQuery" />
             </subscriptionRecoveryPolicy>