You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/12/08 03:27:30 UTC
svn commit: r888227 [1/2] - in
/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker:
jmx/ region/ region/policy/
Author: cmacnaug
Date: Tue Dec 8 02:27:30 2009
New Revision: 888227
URL: http://svn.apache.org/viewvc?rev=888227&view=rev
Log:
Backport from trunk revision 883458: Adding a blockedProducerWarningInterval attribute to destinations to control the rate at which warnings about blocked producers are generated (otherwise the warnings can flood the log).
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Tue Dec 8 02:27:30 2009
@@ -82,11 +82,11 @@
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
}
-
+
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
-
+
public long getExpiredCount() {
return destination.getDestinationStatistics().getExpired().getCount();
}
@@ -220,7 +220,7 @@
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
Message[] messages = destination.browse();
CompositeType ct = factory.getCompositeType();
- TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
+ TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
TabularDataSupport rc = new TabularDataSupport(tt);
MessageEvaluationContext ctx = new MessageEvaluationContext();
@@ -248,16 +248,16 @@
public String sendTextMessage(String body) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP, body);
}
-
+
public String sendTextMessage(Map headers, String body) throws Exception {
- return sendTextMessage(headers,body,null,null);
+ return sendTextMessage(headers, body, null, null);
}
public String sendTextMessage(String body, String user, String password) throws Exception {
- return sendTextMessage(Collections.EMPTY_MAP,body,user,password);
+ return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
}
- public String sendTextMessage(Map headers, String body,String userName,String password) throws Exception {
+ public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
String brokerUrl = "vm://" + broker.getBrokerName();
ActiveMQDestination dest = destination.getActiveMQDestination();
@@ -266,14 +266,14 @@
Connection connection = null;
try {
- connection = cf.createConnection(userName,password);
+ connection = cf.createConnection(userName, password);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
- ActiveMQTextMessage msg = (ActiveMQTextMessage)session.createTextMessage(body);
+ ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry)iter.next();
- msg.setObjectProperty((String)entry.getKey(), entry.getValue());
+ Map.Entry entry = (Map.Entry) iter.next();
+ msg.setObjectProperty((String) entry.getKey(), entry.getValue());
}
producer.setDeliveryMode(msg.getJMSDeliveryMode());
@@ -292,30 +292,28 @@
public int getMaxAuditDepth() {
return destination.getMaxAuditDepth();
- }
+ }
+
+ public int getMaxProducersToAudit() {
+ return destination.getMaxProducersToAudit();
+ }
+
+ public boolean isEnableAudit() {
+ return destination.isEnableAudit();
+ }
+
+ public void setEnableAudit(boolean enableAudit) {
+ destination.setEnableAudit(enableAudit);
+ }
+
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ destination.setMaxAuditDepth(maxAuditDepth);
+ }
- public int getMaxProducersToAudit() {
- return destination.getMaxProducersToAudit();
- }
-
- public boolean isEnableAudit() {
- return destination.isEnableAudit();
- }
-
-
- public void setEnableAudit(boolean enableAudit) {
- destination.setEnableAudit(enableAudit);
- }
-
- public void setMaxAuditDepth(int maxAuditDepth) {
- destination.setMaxAuditDepth(maxAuditDepth);
- }
-
- public void setMaxProducersToAudit(int maxProducersToAudit) {
- destination.setMaxProducersToAudit(maxProducersToAudit);
- }
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ destination.setMaxProducersToAudit(maxProducersToAudit);
+ }
-
public float getMemoryUsagePortion() {
return destination.getMemoryUsage().getUsagePortion();
}
@@ -325,31 +323,52 @@
}
public boolean isProducerFlowControl() {
- return destination.isProducerFlowControl();
+ return destination.isProducerFlowControl();
}
-
+
public void setMemoryUsagePortion(float value) {
destination.getMemoryUsage().setUsagePortion(value);
}
public void setProducerFlowControl(boolean producerFlowControl) {
- destination.setProducerFlowControl(producerFlowControl);
+ destination.setProducerFlowControl(producerFlowControl);
+ }
+
+ /**
+ * Set's the interval at which warnings about producers being blocked by
+ * resource usage will be triggered. Values of 0 or less will disable
+ * warnings
+ *
+ * @param blockedProducerWarningInterval the interval at which warning about
+ * blocked producers will be triggered.
+ */
+ public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+ destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
+ }
+
+ /**
+ *
+ * @return the interval at which warning about blocked producers will be
+ * triggered.
+ */
+ public long getBlockedProducerWarningInterval() {
+ return destination.getBlockedProducerWarningInterval();
}
public int getMaxPageSize() {
return destination.getMaxPageSize();
}
-
+
public void setMaxPageSize(int pageSize) {
destination.setMaxPageSize(pageSize);
}
-
+
public boolean isUseCache() {
return destination.isUseCache();
}
public void setUseCache(boolean value) {
- destination.setUseCache(value);
+ destination.setUseCache(value);
}
public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Tue Dec 8 02:27:30 2009
@@ -248,12 +248,31 @@
*/
@MBeanInfo("Producers are flow controlled")
boolean isProducerFlowControl();
+
/**
* @param producerFlowControl the producerFlowControl to set
*/
public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl);
/**
+ * Set's the interval at which warnings about producers being blocked by
+ * resource usage will be triggered. Values of 0 or less will disable
+ * warnings
+ *
+ * @param blockedProducerWarningInterval the interval at which warning about
+ * blocked producers will be triggered.
+ */
+ public void setBlockedProducerWarningInterval(@MBeanInfo("blockedProducerWarningInterval") long blockedProducerWarningInterval);
+
+ /**
+ *
+ * @return the interval at which warning about blocked producers will be
+ * triggered.
+ */
+ @MBeanInfo("Blocked Producer Warning Interval")
+ public long getBlockedProducerWarningInterval();
+
+ /**
* @return the maxProducersToAudit
*/
@MBeanInfo("Maximum number of producers to audit")
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Dec 8 02:27:30 2009
@@ -40,18 +40,21 @@
*/
public abstract class BaseDestination implements Destination {
/**
- * The maximum number of messages to page in to the destination from persistent storage
+ * The maximum number of messages to page in to the destination from
+ * persistent storage
*/
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
- public static final long EXPIRE_MESSAGE_PERIOD = 30*1000;
+ public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
- protected boolean warnOnProducerFlowControl = true;
+ protected boolean warnOnProducerFlowControl = true;
+ protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
+
private int maxProducersToAudit = 1024;
private int maxAuditDepth = 2048;
private boolean enableAudit = true;
@@ -82,8 +85,7 @@
* @param parentStats
* @throws Exception
*/
- public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination,
- DestinationStatistics parentStats) throws Exception {
+ public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
this.brokerService = brokerService;
this.broker = brokerService.getBroker();
this.store = store;
@@ -118,14 +120,34 @@
}
/**
- * @param producerFlowControl
- * the producerFlowControl to set
+ * @param producerFlowControl the producerFlowControl to set
*/
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
}
/**
+ * Set's the interval at which warnings about producers being blocked by
+ * resource usage will be triggered. Values of 0 or less will disable
+ * warnings
+ *
+ * @param blockedProducerWarningInterval the interval at which warning about
+ * blocked producers will be triggered.
+ */
+ public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+ this.blockedProducerWarningInterval = blockedProducerWarningInterval;
+ }
+
+ /**
+ *
+ * @return the interval at which warning about blocked producers will be
+ * triggered.
+ */
+ public long getBlockedProducerWarningInterval() {
+ return blockedProducerWarningInterval;
+ }
+
+ /**
* @return the maxProducersToAudit
*/
public int getMaxProducersToAudit() {
@@ -133,8 +155,7 @@
}
/**
- * @param maxProducersToAudit
- * the maxProducersToAudit to set
+ * @param maxProducersToAudit the maxProducersToAudit to set
*/
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
@@ -148,8 +169,7 @@
}
/**
- * @param maxAuditDepth
- * the maxAuditDepth to set
+ * @param maxAuditDepth the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
@@ -163,8 +183,7 @@
}
/**
- * @param enableAudit
- * the enableAudit to set
+ * @param enableAudit the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
@@ -199,8 +218,7 @@
}
public final boolean isActive() {
- return destinationStatistics.getConsumers().getCount() != 0
- || destinationStatistics.getProducers().getCount() != 0;
+ return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
}
public int getMaxPageSize() {
@@ -218,13 +236,13 @@
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
-
+
public int getMaxExpirePageSize() {
return this.maxExpirePageSize;
}
public void setMaxExpirePageSize(int maxPageSize) {
- this.maxExpirePageSize = maxPageSize;
+ this.maxExpirePageSize = maxPageSize;
}
public void setExpireMessagesPeriod(long expireMessagesPeriod) {
@@ -234,7 +252,7 @@
public long getExpireMessagesPeriod() {
return expireMessagesPeriod;
}
-
+
public boolean isUseCache() {
return useCache;
}
@@ -271,8 +289,7 @@
}
/**
- * @param advisoryForSlowConsumers
- * the advisoryForSlowConsumers to set
+ * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
*/
public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
this.advisoryForSlowConsumers = advisoryForSlowConsumers;
@@ -286,8 +303,8 @@
}
/**
- * @param advisoryForDiscardingMessages
- * the advisoryForDiscardingMessages to set
+ * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
+ * set
*/
public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
@@ -301,8 +318,7 @@
}
/**
- * @param advisoryWhenFull
- * the advisoryWhenFull to set
+ * @param advisoryWhenFull the advisoryWhenFull to set
*/
public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
this.advisoryWhenFull = advisoryWhenFull;
@@ -316,8 +332,7 @@
}
/**
- * @param advisoryForDelivery
- * the advisoryForDelivery to set
+ * @param advisoryForDelivery the advisoryForDelivery to set
*/
public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
this.advisoryForDelivery = advisoryForDelivery;
@@ -331,8 +346,7 @@
}
/**
- * @param advisoryForConsumed
- * the advisoryForConsumed to set
+ * @param advisoryForConsumed the advisoryForConsumed to set
*/
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
this.advisoryForConsumed = advisoryForConsumed;
@@ -346,13 +360,12 @@
}
/**
- * @param advisdoryForFastProducers
- * the advisdoryForFastProducers to set
+ * @param advisdoryForFastProducers the advisdoryForFastProducers to set
*/
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
-
+
public boolean isSendAdvisoryIfNoConsumers() {
return sendAdvisoryIfNoConsumers;
}
@@ -376,14 +389,14 @@
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
-
+
public int getCursorMemoryHighWaterMark() {
- return this.cursorMemoryHighWaterMark;
- }
+ return this.cursorMemoryHighWaterMark;
+ }
- public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
- this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
- }
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+ }
/**
* called when message is consumed
@@ -410,8 +423,8 @@
}
/**
- * Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled -
- * e.g. non durable topics
+ * Called when a message is discarded - e.g. running low on memory This will
+ * happen only if the policy is enabled - e.g. non durable topics
*
* @param context
* @param messageReference
@@ -460,19 +473,19 @@
public void dispose(ConnectionContext context) throws IOException {
if (this.store != null) {
- this.store.removeAllMessages(context);
+ this.store.removeAllMessages(context);
this.store.dispose(context);
}
this.destinationStatistics.setParent(null);
this.memoryUsage.stop();
}
-
+
/**
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
- if (!msg.isPersistent()) {
+ if (!msg.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
@@ -489,12 +502,12 @@
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
}
-
+
ActiveMQTopic advisoryTopic;
if (destination.isQueue()) {
- advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
+ advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
} else {
- advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
+ advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
}
message.setDestination(advisoryTopic);
message.setTransactionId(null);
@@ -517,8 +530,9 @@
}
}
}
-
- public void processDispatchNotification(
- MessageDispatchNotification messageDispatchNotification) throws Exception {
+
+ public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
}
+
+
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Tue Dec 8 02:27:30 2009
@@ -41,10 +41,12 @@
public interface Destination extends Service, Task {
public static final DeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new SharedDeadLetterStrategy();
+ public static final long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
+
void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception;
-
+
void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
@@ -70,122 +72,146 @@
String getName();
MessageStore getMessageStore();
-
+
boolean isProducerFlowControl();
-
+
void setProducerFlowControl(boolean value);
-
+
+ /**
+ * Set's the interval at which warnings about producers being blocked by
+ * resource usage will be triggered. Values of 0 or less will disable
+ * warnings
+ *
+ * @param blockedProducerWarningInterval the interval at which warning about
+ * blocked producers will be triggered.
+ */
+ public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
+
+ /**
+ *
+ * @return the interval at which warning about blocked producers will be
+ * triggered.
+ */
+ public long getBlockedProducerWarningInterval();
+
int getMaxProducersToAudit();
-
+
void setMaxProducersToAudit(int maxProducersToAudit);
-
+
int getMaxAuditDepth();
-
+
void setMaxAuditDepth(int maxAuditDepth);
-
+
boolean isEnableAudit();
-
+
void setEnableAudit(boolean enableAudit);
-
- boolean isActive();
-
+
+ boolean isActive();
+
int getMaxPageSize();
-
+
public void setMaxPageSize(int maxPageSize);
-
+
public int getMaxBrowsePageSize();
public void setMaxBrowsePageSize(int maxPageSize);
-
+
public boolean isUseCache();
-
+
public void setUseCache(boolean useCache);
-
+
public int getMinimumMessageSize();
public void setMinimumMessageSize(int minimumMessageSize);
-
+
public int getCursorMemoryHighWaterMark();
- public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
-
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
/**
- * optionally called by a Subscriber - to inform the Destination its
- * ready for more messages
+ * optionally called by a Subscriber - to inform the Destination its ready
+ * for more messages
*/
public void wakeup();
-
+
/**
* @return true if lazyDispatch is enabled
*/
public boolean isLazyDispatch();
-
-
+
/**
* set the lazy dispatch - default is false
+ *
* @param value
*/
public void setLazyDispatch(boolean value);
-
/**
* Inform the Destination a message has expired
+ *
* @param context
- * @param subs
+ * @param subs
* @param node
*/
- void messageExpired(ConnectionContext context, Subscription subs,MessageReference node);
+ void messageExpired(ConnectionContext context, Subscription subs, MessageReference node);
/**
* called when message is consumed
+ *
* @param context
* @param messageReference
*/
- void messageConsumed(ConnectionContext context, MessageReference messageReference);
-
+ void messageConsumed(ConnectionContext context, MessageReference messageReference);
+
/**
* Called when message is delivered to the broker
+ *
* @param context
* @param messageReference
*/
- void messageDelivered(ConnectionContext context, MessageReference messageReference);
-
+ void messageDelivered(ConnectionContext context, MessageReference messageReference);
+
/**
- * Called when a message is discarded - e.g. running low on memory
- * This will happen only if the policy is enabled - e.g. non durable topics
+ * Called when a message is discarded - e.g. running low on memory This will
+ * happen only if the policy is enabled - e.g. non durable topics
+ *
* @param context
* @param messageReference
*/
- void messageDiscarded(ConnectionContext context, MessageReference messageReference);
-
+ void messageDiscarded(ConnectionContext context, MessageReference messageReference);
+
/**
* Called when there is a slow consumer
+ *
* @param context
* @param subs
*/
- void slowConsumer(ConnectionContext context, Subscription subs);
-
+ void slowConsumer(ConnectionContext context, Subscription subs);
+
/**
* Called to notify a producer is too fast
+ *
* @param context
* @param producerInfo
*/
- void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
-
+ void fastProducer(ConnectionContext context, ProducerInfo producerInfo);
+
/**
* Called when a Usage reaches a limit
+ *
* @param context
* @param usage
*/
- void isFull(ConnectionContext context,Usage usage);
+ void isFull(ConnectionContext context, Usage usage);
List<Subscription> getConsumers();
/**
- * called on Queues in slave mode to allow dispatch to follow subscription choice of master
+ * called on Queues in slave mode to allow dispatch to follow subscription
+ * choice of master
+ *
* @param messageDispatchNotification
* @throws Exception
*/
- void processDispatchNotification(
- MessageDispatchNotification messageDispatchNotification) throws Exception;
+ void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=888227&r1=888226&r2=888227&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Tue Dec 8 02:27:30 2009
@@ -45,8 +45,7 @@
this.next = next;
}
- public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
- throws IOException {
+ public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
next.acknowledge(context, sub, ack, node);
}
@@ -108,13 +107,13 @@
/**
* Sends a message to the given destination which may be a wildcard
+ *
* @param context broker context
* @param message message to send
* @param destination possibly wildcard destination to send the message to
* @throws Exception on error
*/
- protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination)
- throws Exception {
+ protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getConnectionContext().getBroker();
Set<Destination> destinations = broker.getDestinations(destination);
@@ -130,24 +129,30 @@
public boolean isProducerFlowControl() {
return next.isProducerFlowControl();
}
-
- public void setProducerFlowControl(boolean value){
+
+ public void setProducerFlowControl(boolean value) {
next.setProducerFlowControl(value);
}
- public void addProducer(ConnectionContext context, ProducerInfo info)
- throws Exception {
+ public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+ next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
+ }
+
+ public long getBlockedProducerWarningInterval() {
+ return next.getBlockedProducerWarningInterval();
+ }
+
+ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info);
-
+
}
- public void removeProducer(ConnectionContext context, ProducerInfo info)
- throws Exception {
- next.removeProducer(context, info);
+ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
+ next.removeProducer(context, info);
}
public int getMaxAuditDepth() {
- return next.getMaxAuditDepth();
+ return next.getMaxAuditDepth();
}
public int getMaxProducersToAudit() {
@@ -157,20 +162,19 @@
public boolean isEnableAudit() {
return next.isEnableAudit();
}
-
+
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
- next.setMaxAuditDepth(maxAuditDepth);
+ next.setMaxAuditDepth(maxAuditDepth);
}
-
public void setMaxProducersToAudit(int maxProducersToAudit) {
- next.setMaxProducersToAudit(maxProducersToAudit);
+ next.setMaxProducersToAudit(maxProducersToAudit);
}
-
+
public boolean isActive() {
return next.isActive();
}
@@ -189,88 +193,81 @@
public void setUseCache(boolean useCache) {
next.setUseCache(useCache);
- }
-
+ }
+
public int getMinimumMessageSize() {
return next.getMinimumMessageSize();
}
public void setMinimumMessageSize(int minimumMessageSize) {
next.setMinimumMessageSize(minimumMessageSize);
- }
-
+ }
+
public void wakeup() {
next.wakeup();
}
public boolean isLazyDispatch() {
- return next.isLazyDispatch();
+ return next.isLazyDispatch();
}
public void setLazyDispatch(boolean value) {
- next.setLazyDispatch(value);
+ next.setLazyDispatch(value);
}
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
- next.messageExpired(context, prefetchSubscription, node);
+ next.messageExpired(context, prefetchSubscription, node);
}
- public boolean iterate() {
- return next.iterate();
- }
+ public boolean iterate() {
+ return next.iterate();
+ }
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
- next.fastProducer(context, producerInfo);
+ public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
+ next.fastProducer(context, producerInfo);
}
-
public void isFull(ConnectionContext context, Usage usage) {
- next.isFull(context, usage);
+ next.isFull(context, usage);
}
-
- public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
+ public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
next.messageConsumed(context, messageReference);
}
-
- public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
+ public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
next.messageDelivered(context, messageReference);
}
-
- public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
+ public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
next.messageDiscarded(context, messageReference);
}
-
public void slowConsumer(ConnectionContext context, Subscription subs) {
- next.slowConsumer(context, subs);
+ next.slowConsumer(context, subs);
}
-
- public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
- next.messageExpired(context,subs, node);
+ public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
+ next.messageExpired(context, subs, node);
}
public int getMaxBrowsePageSize() {
- return next.getMaxBrowsePageSize();
+ return next.getMaxBrowsePageSize();
}
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
- public void processDispatchNotification(
- MessageDispatchNotification messageDispatchNotification) throws Exception {
- next.processDispatchNotification(messageDispatchNotification);
+ public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
+ next.processDispatchNotification(messageDispatchNotification);
}
- public int getCursorMemoryHighWaterMark() {
- return next.getCursorMemoryHighWaterMark();
- }
-
- public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
- next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
- }
+ public int getCursorMemoryHighWaterMark() {
+ return next.getCursorMemoryHighWaterMark();
+ }
+
+ public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+ next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
+ }
}