You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/12/08 03:27:30 UTC

svn commit: r888227 [1/2] - in /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker: jmx/ region/ region/policy/

Author: cmacnaug
Date: Tue Dec  8 02:27:30 2009
New Revision: 888227

URL: http://svn.apache.org/viewvc?rev=888227&view=rev
Log:
Backport from trunk revision 883458: Adding a blockedProducerWarningInterval attribute to destinations to control the rate at which warnings about blocked producers are generated (otherwise the warnings can flood the log). 

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Tue Dec  8 02:27:30 2009
@@ -82,11 +82,11 @@
     public long getDispatchCount() {
         return destination.getDestinationStatistics().getDispatched().getCount();
     }
-    
+
     public long getInFlightCount() {
         return destination.getDestinationStatistics().getInflight().getCount();
     }
-    
+
     public long getExpiredCount() {
         return destination.getDestinationStatistics().getExpired().getCount();
     }
@@ -220,7 +220,7 @@
         OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
         Message[] messages = destination.browse();
         CompositeType ct = factory.getCompositeType();
-        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
+        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
         TabularDataSupport rc = new TabularDataSupport(tt);
 
         MessageEvaluationContext ctx = new MessageEvaluationContext();
@@ -248,16 +248,16 @@
     public String sendTextMessage(String body) throws Exception {
         return sendTextMessage(Collections.EMPTY_MAP, body);
     }
-    
+
     public String sendTextMessage(Map headers, String body) throws Exception {
-        return sendTextMessage(headers,body,null,null);
+        return sendTextMessage(headers, body, null, null);
     }
 
     public String sendTextMessage(String body, String user, String password) throws Exception {
-        return sendTextMessage(Collections.EMPTY_MAP,body,user,password);
+        return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
     }
 
-    public String sendTextMessage(Map headers, String body,String userName,String password) throws Exception {
+    public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
 
         String brokerUrl = "vm://" + broker.getBrokerName();
         ActiveMQDestination dest = destination.getActiveMQDestination();
@@ -266,14 +266,14 @@
         Connection connection = null;
         try {
 
-            connection = cf.createConnection(userName,password);
+            connection = cf.createConnection(userName, password);
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer producer = session.createProducer(dest);
-            ActiveMQTextMessage msg = (ActiveMQTextMessage)session.createTextMessage(body);
+            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
 
             for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
-                Map.Entry entry = (Map.Entry)iter.next();
-                msg.setObjectProperty((String)entry.getKey(), entry.getValue());
+                Map.Entry entry = (Map.Entry) iter.next();
+                msg.setObjectProperty((String) entry.getKey(), entry.getValue());
             }
 
             producer.setDeliveryMode(msg.getJMSDeliveryMode());
@@ -292,30 +292,28 @@
 
     public int getMaxAuditDepth() {
         return destination.getMaxAuditDepth();
-     }
+    }
+
+    public int getMaxProducersToAudit() {
+        return destination.getMaxProducersToAudit();
+    }
+
+    public boolean isEnableAudit() {
+        return destination.isEnableAudit();
+    }
+
+    public void setEnableAudit(boolean enableAudit) {
+        destination.setEnableAudit(enableAudit);
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        destination.setMaxAuditDepth(maxAuditDepth);
+    }
 
-     public int getMaxProducersToAudit() {
-         return destination.getMaxProducersToAudit();
-     }
-
-     public boolean isEnableAudit() {
-         return destination.isEnableAudit();
-     }
-
-     
-     public void setEnableAudit(boolean enableAudit) {
-         destination.setEnableAudit(enableAudit);
-     }
-
-     public void setMaxAuditDepth(int maxAuditDepth) {
-         destination.setMaxAuditDepth(maxAuditDepth);
-     }
- 
-     public void setMaxProducersToAudit(int maxProducersToAudit) {
-         destination.setMaxProducersToAudit(maxProducersToAudit);
-     }
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        destination.setMaxProducersToAudit(maxProducersToAudit);
+    }
 
-    
     public float getMemoryUsagePortion() {
         return destination.getMemoryUsage().getUsagePortion();
     }
@@ -325,31 +323,52 @@
     }
 
     public boolean isProducerFlowControl() {
-       return destination.isProducerFlowControl();
+        return destination.isProducerFlowControl();
     }
-    
+
     public void setMemoryUsagePortion(float value) {
         destination.getMemoryUsage().setUsagePortion(value);
     }
 
     public void setProducerFlowControl(boolean producerFlowControl) {
-      destination.setProducerFlowControl(producerFlowControl);      
+        destination.setProducerFlowControl(producerFlowControl);
+    }
+
+    /**
+     * Set's the interval at which warnings about producers being blocked by
+     * resource usage will be triggered. Values of 0 or less will disable
+     * warnings
+     * 
+     * @param blockedProducerWarningInterval the interval at which warning about
+     *            blocked producers will be triggered.
+     */
+    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+        destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
+    }
+
+    /**
+     * 
+     * @return the interval at which warning about blocked producers will be
+     *         triggered.
+     */
+    public long getBlockedProducerWarningInterval() {
+        return destination.getBlockedProducerWarningInterval();
     }
 
     public int getMaxPageSize() {
         return destination.getMaxPageSize();
     }
-    
+
     public void setMaxPageSize(int pageSize) {
         destination.setMaxPageSize(pageSize);
     }
-    
+
     public boolean isUseCache() {
         return destination.isUseCache();
     }
 
     public void setUseCache(boolean value) {
-        destination.setUseCache(value);    
+        destination.setUseCache(value);
     }
 
     public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Tue Dec  8 02:27:30 2009
@@ -248,12 +248,31 @@
      */
     @MBeanInfo("Producers are flow controlled")
     boolean isProducerFlowControl();
+    
     /**
      * @param producerFlowControl the producerFlowControl to set
      */
     public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl);
     
     /**
+     * Set's the interval at which warnings about producers being blocked by
+     * resource usage will be triggered. Values of 0 or less will disable
+     * warnings
+     * 
+     * @param blockedProducerWarningInterval the interval at which warning about
+     *            blocked producers will be triggered.
+     */
+    public void setBlockedProducerWarningInterval(@MBeanInfo("blockedProducerWarningInterval")  long blockedProducerWarningInterval);
+
+    /**
+     * 
+     * @return the interval at which warning about blocked producers will be
+     *         triggered.
+     */
+    @MBeanInfo("Blocked Producer Warning Interval")
+    public long getBlockedProducerWarningInterval();
+    
+    /**
      * @return the maxProducersToAudit
      */
     @MBeanInfo("Maximum number of producers to audit") 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Dec  8 02:27:30 2009
@@ -40,18 +40,21 @@
  */
 public abstract class BaseDestination implements Destination {
     /**
-     * The maximum number of messages to page in to the destination from persistent storage
+     * The maximum number of messages to page in to the destination from
+     * persistent storage
      */
     public static final int MAX_PAGE_SIZE = 200;
     public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
-    public static final long EXPIRE_MESSAGE_PERIOD = 30*1000;
+    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
     protected SystemUsage systemUsage;
     protected MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
-    protected boolean warnOnProducerFlowControl = true; 
+    protected boolean warnOnProducerFlowControl = true;
+    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
+
     private int maxProducersToAudit = 1024;
     private int maxAuditDepth = 2048;
     private boolean enableAudit = true;
@@ -82,8 +85,7 @@
      * @param parentStats
      * @throws Exception
      */
-    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination,
-            DestinationStatistics parentStats) throws Exception {
+    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
         this.brokerService = brokerService;
         this.broker = brokerService.getBroker();
         this.store = store;
@@ -118,14 +120,34 @@
     }
 
     /**
-     * @param producerFlowControl
-     *            the producerFlowControl to set
+     * @param producerFlowControl the producerFlowControl to set
      */
     public void setProducerFlowControl(boolean producerFlowControl) {
         this.producerFlowControl = producerFlowControl;
     }
 
     /**
+     * Set's the interval at which warnings about producers being blocked by
+     * resource usage will be triggered. Values of 0 or less will disable
+     * warnings
+     * 
+     * @param blockedProducerWarningInterval the interval at which warning about
+     *            blocked producers will be triggered.
+     */
+    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
+    }
+
+    /**
+     * 
+     * @return the interval at which warning about blocked producers will be
+     *         triggered.
+     */
+    public long getBlockedProducerWarningInterval() {
+        return blockedProducerWarningInterval;
+    }
+
+    /**
      * @return the maxProducersToAudit
      */
     public int getMaxProducersToAudit() {
@@ -133,8 +155,7 @@
     }
 
     /**
-     * @param maxProducersToAudit
-     *            the maxProducersToAudit to set
+     * @param maxProducersToAudit the maxProducersToAudit to set
      */
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         this.maxProducersToAudit = maxProducersToAudit;
@@ -148,8 +169,7 @@
     }
 
     /**
-     * @param maxAuditDepth
-     *            the maxAuditDepth to set
+     * @param maxAuditDepth the maxAuditDepth to set
      */
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
@@ -163,8 +183,7 @@
     }
 
     /**
-     * @param enableAudit
-     *            the enableAudit to set
+     * @param enableAudit the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
@@ -199,8 +218,7 @@
     }
 
     public final boolean isActive() {
-        return destinationStatistics.getConsumers().getCount() != 0
-                || destinationStatistics.getProducers().getCount() != 0;
+        return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
     }
 
     public int getMaxPageSize() {
@@ -218,13 +236,13 @@
     public void setMaxBrowsePageSize(int maxPageSize) {
         this.maxBrowsePageSize = maxPageSize;
     }
-    
+
     public int getMaxExpirePageSize() {
         return this.maxExpirePageSize;
     }
 
     public void setMaxExpirePageSize(int maxPageSize) {
-        this.maxExpirePageSize  = maxPageSize;
+        this.maxExpirePageSize = maxPageSize;
     }
 
     public void setExpireMessagesPeriod(long expireMessagesPeriod) {
@@ -234,7 +252,7 @@
     public long getExpireMessagesPeriod() {
         return expireMessagesPeriod;
     }
-    
+
     public boolean isUseCache() {
         return useCache;
     }
@@ -271,8 +289,7 @@
     }
 
     /**
-     * @param advisoryForSlowConsumers
-     *            the advisoryForSlowConsumers to set
+     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
      */
     public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
         this.advisoryForSlowConsumers = advisoryForSlowConsumers;
@@ -286,8 +303,8 @@
     }
 
     /**
-     * @param advisoryForDiscardingMessages
-     *            the advisoryForDiscardingMessages to set
+     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
+     *            set
      */
     public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
         this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
@@ -301,8 +318,7 @@
     }
 
     /**
-     * @param advisoryWhenFull
-     *            the advisoryWhenFull to set
+     * @param advisoryWhenFull the advisoryWhenFull to set
      */
     public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
         this.advisoryWhenFull = advisoryWhenFull;
@@ -316,8 +332,7 @@
     }
 
     /**
-     * @param advisoryForDelivery
-     *            the advisoryForDelivery to set
+     * @param advisoryForDelivery the advisoryForDelivery to set
      */
     public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
         this.advisoryForDelivery = advisoryForDelivery;
@@ -331,8 +346,7 @@
     }
 
     /**
-     * @param advisoryForConsumed
-     *            the advisoryForConsumed to set
+     * @param advisoryForConsumed the advisoryForConsumed to set
      */
     public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
         this.advisoryForConsumed = advisoryForConsumed;
@@ -346,13 +360,12 @@
     }
 
     /**
-     * @param advisdoryForFastProducers
-     *            the advisdoryForFastProducers to set
+     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
      */
     public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
         this.advisdoryForFastProducers = advisdoryForFastProducers;
     }
-    
+
     public boolean isSendAdvisoryIfNoConsumers() {
         return sendAdvisoryIfNoConsumers;
     }
@@ -376,14 +389,14 @@
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
     }
-    
+
     public int getCursorMemoryHighWaterMark() {
-		return this.cursorMemoryHighWaterMark;
-	}
+        return this.cursorMemoryHighWaterMark;
+    }
 
-	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
-		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
-	}
+    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+    }
 
     /**
      * called when message is consumed
@@ -410,8 +423,8 @@
     }
 
     /**
-     * Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled -
-     * e.g. non durable topics
+     * Called when a message is discarded - e.g. running low on memory This will
+     * happen only if the policy is enabled - e.g. non durable topics
      * 
      * @param context
      * @param messageReference
@@ -460,19 +473,19 @@
 
     public void dispose(ConnectionContext context) throws IOException {
         if (this.store != null) {
-        	this.store.removeAllMessages(context);
+            this.store.removeAllMessages(context);
             this.store.dispose(context);
         }
         this.destinationStatistics.setParent(null);
         this.memoryUsage.stop();
     }
-    
+
     /**
      * Provides a hook to allow messages with no consumer to be processed in
      * some way - such as to send to a dead letter queue or something..
      */
     protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
-    	if (!msg.isPersistent()) {
+        if (!msg.isPersistent()) {
             if (isSendAdvisoryIfNoConsumers()) {
                 // allow messages with no consumers to be dispatched to a dead
                 // letter queue
@@ -489,12 +502,12 @@
                     if (message.getOriginalTransactionId() != null) {
                         message.setOriginalTransactionId(message.getTransactionId());
                     }
-                    
+
                     ActiveMQTopic advisoryTopic;
                     if (destination.isQueue()) {
-                    	advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
+                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
                     } else {
-                    	advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
+                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
                     }
                     message.setDestination(advisoryTopic);
                     message.setTransactionId(null);
@@ -517,8 +530,9 @@
             }
         }
     }
-    
-    public void processDispatchNotification(
-            MessageDispatchNotification messageDispatchNotification) throws Exception {
+
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
     }
+    
+    
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Tue Dec  8 02:27:30 2009
@@ -41,10 +41,12 @@
 public interface Destination extends Service, Task {
 
     public static final DeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new SharedDeadLetterStrategy();
+    public static final long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
+
     void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
 
     void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception;
-    
+
     void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
 
     void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
@@ -70,122 +72,146 @@
     String getName();
 
     MessageStore getMessageStore();
-    
+
     boolean isProducerFlowControl();
-    
+
     void setProducerFlowControl(boolean value);
-    
+
+    /**
+     * Set's the interval at which warnings about producers being blocked by
+     * resource usage will be triggered. Values of 0 or less will disable
+     * warnings
+     * 
+     * @param blockedProducerWarningInterval the interval at which warning about
+     *            blocked producers will be triggered.
+     */
+    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
+
+    /**
+     * 
+     * @return the interval at which warning about blocked producers will be
+     *         triggered.
+     */
+    public long getBlockedProducerWarningInterval();
+
     int getMaxProducersToAudit();
-    
+
     void setMaxProducersToAudit(int maxProducersToAudit);
-   
+
     int getMaxAuditDepth();
-   
+
     void setMaxAuditDepth(int maxAuditDepth);
-  
+
     boolean isEnableAudit();
-    
+
     void setEnableAudit(boolean enableAudit);
-    
-    boolean isActive();   
-    
+
+    boolean isActive();
+
     int getMaxPageSize();
-    
+
     public void setMaxPageSize(int maxPageSize);
-    
+
     public int getMaxBrowsePageSize();
 
     public void setMaxBrowsePageSize(int maxPageSize);
-    
+
     public boolean isUseCache();
-    
+
     public void setUseCache(boolean useCache);
-    
+
     public int getMinimumMessageSize();
 
     public void setMinimumMessageSize(int minimumMessageSize);
-    
+
     public int getCursorMemoryHighWaterMark();
 
-	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
-    
+    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
     /**
-     * optionally called by a Subscriber - to inform the Destination its
-     * ready for more messages
+     * optionally called by a Subscriber - to inform the Destination its ready
+     * for more messages
      */
     public void wakeup();
-    
+
     /**
      * @return true if lazyDispatch is enabled
      */
     public boolean isLazyDispatch();
-    
-    
+
     /**
      * set the lazy dispatch - default is false
+     * 
      * @param value
      */
     public void setLazyDispatch(boolean value);
 
-        
     /**
      * Inform the Destination a message has expired
+     * 
      * @param context
-     * @param subs 
+     * @param subs
      * @param node
      */
-    void messageExpired(ConnectionContext context, Subscription subs,MessageReference node);
+    void messageExpired(ConnectionContext context, Subscription subs, MessageReference node);
 
     /**
      * called when message is consumed
+     * 
      * @param context
      * @param messageReference
      */
-     void messageConsumed(ConnectionContext context, MessageReference messageReference);
-    
+    void messageConsumed(ConnectionContext context, MessageReference messageReference);
+
     /**
      * Called when message is delivered to the broker
+     * 
      * @param context
      * @param messageReference
      */
-     void messageDelivered(ConnectionContext context, MessageReference messageReference);
-    
+    void messageDelivered(ConnectionContext context, MessageReference messageReference);
+
     /**
-     * Called when a message is discarded - e.g. running low on memory
-     * This will happen only if the policy is enabled - e.g. non durable topics
+     * Called when a message is discarded - e.g. running low on memory This will
+     * happen only if the policy is enabled - e.g. non durable topics
+     * 
      * @param context
      * @param messageReference
      */
-     void messageDiscarded(ConnectionContext context, MessageReference messageReference);
-    
+    void messageDiscarded(ConnectionContext context, MessageReference messageReference);
+
     /**
      * Called when there is a slow consumer
+     * 
      * @param context
      * @param subs
      */
-     void slowConsumer(ConnectionContext context, Subscription subs);
-    
+    void slowConsumer(ConnectionContext context, Subscription subs);
+
     /**
      * Called to notify a producer is too fast
+     * 
      * @param context
      * @param producerInfo
      */
-     void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
-    
+    void fastProducer(ConnectionContext context, ProducerInfo producerInfo);
+
     /**
      * Called when a Usage reaches a limit
+     * 
      * @param context
      * @param usage
      */
-     void isFull(ConnectionContext context,Usage usage);
+    void isFull(ConnectionContext context, Usage usage);
 
     List<Subscription> getConsumers();
 
     /**
-     * called on Queues in slave mode to allow dispatch to follow subscription choice of master
+     * called on Queues in slave mode to allow dispatch to follow subscription
+     * choice of master
+     * 
      * @param messageDispatchNotification
      * @throws Exception
      */
-    void processDispatchNotification(
-            MessageDispatchNotification messageDispatchNotification) throws Exception;
+    void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Tue Dec  8 02:27:30 2009
@@ -45,8 +45,7 @@
         this.next = next;
     }
 
-    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
-        throws IOException {
+    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
         next.acknowledge(context, sub, ack, node);
     }
 
@@ -108,13 +107,13 @@
 
     /**
      * Sends a message to the given destination which may be a wildcard
+     * 
      * @param context broker context
      * @param message message to send
      * @param destination possibly wildcard destination to send the message to
      * @throws Exception on error
      */
-    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination)
-        throws Exception {
+    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
         Broker broker = context.getConnectionContext().getBroker();
         Set<Destination> destinations = broker.getDestinations(destination);
 
@@ -130,24 +129,30 @@
     public boolean isProducerFlowControl() {
         return next.isProducerFlowControl();
     }
-    
-    public void setProducerFlowControl(boolean value){
+
+    public void setProducerFlowControl(boolean value) {
         next.setProducerFlowControl(value);
     }
 
-    public void addProducer(ConnectionContext context, ProducerInfo info)
-            throws Exception {
+    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+        next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
+    }
+    
+    public long getBlockedProducerWarningInterval() {
+        return next.getBlockedProducerWarningInterval();
+    }
+
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         next.addProducer(context, info);
-        
+
     }
 
-    public void removeProducer(ConnectionContext context, ProducerInfo info)
-            throws Exception {
-       next.removeProducer(context, info);
+    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
+        next.removeProducer(context, info);
     }
 
     public int getMaxAuditDepth() {
-       return next.getMaxAuditDepth();
+        return next.getMaxAuditDepth();
     }
 
     public int getMaxProducersToAudit() {
@@ -157,20 +162,19 @@
     public boolean isEnableAudit() {
         return next.isEnableAudit();
     }
-    
+
     public void setEnableAudit(boolean enableAudit) {
         next.setEnableAudit(enableAudit);
     }
 
     public void setMaxAuditDepth(int maxAuditDepth) {
-       next.setMaxAuditDepth(maxAuditDepth);
+        next.setMaxAuditDepth(maxAuditDepth);
     }
 
-    
     public void setMaxProducersToAudit(int maxProducersToAudit) {
-       next.setMaxProducersToAudit(maxProducersToAudit);
+        next.setMaxProducersToAudit(maxProducersToAudit);
     }
-    
+
     public boolean isActive() {
         return next.isActive();
     }
@@ -189,88 +193,81 @@
 
     public void setUseCache(boolean useCache) {
         next.setUseCache(useCache);
-    }   
-    
+    }
+
     public int getMinimumMessageSize() {
         return next.getMinimumMessageSize();
     }
 
     public void setMinimumMessageSize(int minimumMessageSize) {
         next.setMinimumMessageSize(minimumMessageSize);
-    }   
-    
+    }
+
     public void wakeup() {
         next.wakeup();
     }
 
     public boolean isLazyDispatch() {
-       return next.isLazyDispatch();
+        return next.isLazyDispatch();
     }
 
     public void setLazyDispatch(boolean value) {
-      next.setLazyDispatch(value);        
+        next.setLazyDispatch(value);
     }
 
     public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
-        next.messageExpired(context, prefetchSubscription, node);        
+        next.messageExpired(context, prefetchSubscription, node);
     }
 
-	public boolean iterate() {
-		return next.iterate();
-	}
+    public boolean iterate() {
+        return next.iterate();
+    }
 
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
-       next.fastProducer(context, producerInfo);       
+    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
+        next.fastProducer(context, producerInfo);
     }
 
-   
     public void isFull(ConnectionContext context, Usage usage) {
-       next.isFull(context, usage);
+        next.isFull(context, usage);
     }
 
-   
-    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
         next.messageConsumed(context, messageReference);
     }
 
-    
-    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
         next.messageDelivered(context, messageReference);
     }
 
-    
-    public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+    public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
         next.messageDiscarded(context, messageReference);
     }
 
-    
     public void slowConsumer(ConnectionContext context, Subscription subs) {
-       next.slowConsumer(context, subs);
+        next.slowConsumer(context, subs);
     }
 
-   
-    public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
-       next.messageExpired(context,subs, node);    
+    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
+        next.messageExpired(context, subs, node);
     }
 
     public int getMaxBrowsePageSize() {
-       return next.getMaxBrowsePageSize();
+        return next.getMaxBrowsePageSize();
     }
 
     public void setMaxBrowsePageSize(int maxPageSize) {
         next.setMaxBrowsePageSize(maxPageSize);
     }
 
-    public void processDispatchNotification(
-            MessageDispatchNotification messageDispatchNotification) throws Exception {
-        next.processDispatchNotification(messageDispatchNotification);   
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
+        next.processDispatchNotification(messageDispatchNotification);
     }
 
-	public int getCursorMemoryHighWaterMark() {
-		return next.getCursorMemoryHighWaterMark();
-	}
-
-	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
-		next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
-	}
+    public int getCursorMemoryHighWaterMark() {
+        return next.getCursorMemoryHighWaterMark();
+    }
+
+    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+        next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
+    }
 }