You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/07/29 23:40:29 UTC
svn commit: r799090 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
ActiveMQPrefetchPolicy.java broker/region/AbstractSubscription.java
broker/region/QueueRegion.java broker/region/policy/PolicyEntry.java
Author: rajdavies
Date: Wed Jul 29 21:40:29 2009
New Revision: 799090
URL: http://svn.apache.org/viewvc?rev=799090&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1846
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Wed Jul 29 21:40:29 2009
@@ -26,9 +26,17 @@
* @org.apache.xbean.XBean element="prefetchPolicy"
* @version $Revision: 1.3 $
*/
-public class ActiveMQPrefetchPolicy implements Serializable {
+public class ActiveMQPrefetchPolicy extends Object implements Serializable {
+ public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
+ public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+ public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
+ public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
+ public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
+ public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
+ public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
+
private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
- private static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
+
private int queuePrefetch;
private int queueBrowserPrefetch;
private int topicPrefetch;
@@ -41,12 +49,12 @@
* Initialize default prefetch policies
*/
public ActiveMQPrefetchPolicy() {
- this.queuePrefetch = 1000;
- this.queueBrowserPrefetch = 500;
- this.topicPrefetch = MAX_PREFETCH_SIZE;
- this.durableTopicPrefetch = 100;
- this.optimizeDurableTopicPrefetch = 1000;
- this.inputStreamPrefetch = 100;
+ this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+ this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+ this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+ this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+ this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
+ this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
}
/**
@@ -156,5 +164,18 @@
public void setInputStreamPrefetch(int inputStreamPrefetch) {
this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
}
+ /** Compares the six prefetch settings field by field. NOTE(review): no hashCode() accompanies this equals() in the hunk - confirm one exists or add it. */
+ public boolean equals(Object object){
+ if (object instanceof ActiveMQPrefetchPolicy){
+ ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object;
+ return this.queuePrefetch == other.queuePrefetch &&
+ this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
+ this.topicPrefetch == other.topicPrefetch &&
+ this.durableTopicPrefetch == other.durableTopicPrefetch &&
+ this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch &&
+ this.inputStreamPrefetch == other.inputStreamPrefetch;
+ }
+ return false;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed Jul 29 21:40:29 2009
@@ -154,6 +154,9 @@
public int getPrefetchSize() {
return info.getPrefetchSize();
}
+ public void setPrefetchSize(int newSize) {
+ info.setPrefetchSize(newSize); // writes through to the same info object that getPrefetchSize() reads
+ }
public boolean isRecoveryRequired() {
return true;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Wed Jul 29 21:40:29 2009
@@ -22,6 +22,7 @@
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatchNotification;
@@ -47,11 +48,24 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws JMSException {
-
+ ActiveMQDestination destination = info.getDestination();
+ PolicyEntry entry = null;
+ if (destination != null && broker.getDestinationPolicy() != null) {
+ entry = broker.getDestinationPolicy().getEntryFor(destination);
+
+ }
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker,usageManager, context, info);
+ QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info);
+ if (entry != null) {
+ entry.configure(broker, usageManager, sub);
+ }
+ return sub;
} else {
- return new QueueSubscription(broker, usageManager,context, info);
+ QueueSubscription sub = new QueueSubscription(broker, usageManager,context, info);
+ if (entry != null) {
+ entry.configure(broker, usageManager, sub);
+ }
+ return sub;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jul 29 21:40:29 2009
@@ -16,11 +16,14 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueBrowserSubscription;
+import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -75,6 +78,11 @@
private boolean advisoryForConsumed;
private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
+ private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
+ private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
+ private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
+ private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
+
public void configure(Broker broker,Queue queue) {
baseConfiguration(queue);
@@ -155,6 +163,11 @@
}
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
+ //override prefetch size if the Consumer left it at the default (an explicit request for the default value is overridden too)
+ int prefetch=subscription.getConsumerInfo().getPrefetchSize();
+ if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
+ subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
+ }
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
}
@@ -164,6 +177,11 @@
String clientId = sub.getSubscriptionKey().getClientId();
String subName = sub.getSubscriptionKey().getSubscriptionName();
int prefetch = sub.getPrefetchSize();
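+ //override prefetch size if the Consumer left it at the default (an explicit request for the default value is overridden too)
+ //NOTE(review): 'prefetch' was read before this override, so the cursor created below still receives the old value - confirm intended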
+ if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
+ sub.setPrefetchSize(getDurableTopicPrefetch());
+ }
if (pendingDurableSubscriberPolicy != null) {
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
cursor.setSystemUsage(memoryManager);
@@ -172,6 +190,26 @@
sub.setMaxAuditDepth(getMaxAuditDepth());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
}
+ /** Applies queueBrowserPrefetch to a browser subscription whose prefetch equals the ActiveMQPrefetchPolicy default; broker and memoryManager are not used here. */
+ public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
+
+ int prefetch = sub.getPrefetchSize();
+ //override only when the value equals the default, i.e. the Consumer did not pick a different size
+
+ if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
+ sub.setPrefetchSize(getQueueBrowserPrefetch());
+ }
+ }
+ /** Applies queuePrefetch to a queue subscription whose prefetch equals the ActiveMQPrefetchPolicy default; broker and memoryManager are not used here. */
+ public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
+
+ int prefetch = sub.getPrefetchSize();
+ //override only when the value equals the default, i.e. the Consumer did not pick a different size
+
+ if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
+ sub.setPrefetchSize(getQueuePrefetch());
+ }
+ }
// Properties
// -------------------------------------------------------------------------
@@ -559,5 +597,70 @@
return expireMessagesPeriod;
}
+ /**
+ * Get the prefetch size that configure(...) applies to a QueueSubscription still at DEFAULT_QUEUE_PREFETCH
+ * @return the queuePrefetch; initialised to ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH
+ */
+ public int getQueuePrefetch() {
+ return this.queuePrefetch;
+ }
+
+ /**
+ * Set the prefetch size used for queue subscriptions; the value is stored as given, with no validation
+ * @param queuePrefetch the queuePrefetch to set
+ */
+ public void setQueuePrefetch(int queuePrefetch) {
+ this.queuePrefetch = queuePrefetch;
+ }
+
+ /**
+ * Get the prefetch size that configure(...) applies to a QueueBrowserSubscription still at DEFAULT_QUEUE_BROWSER_PREFETCH
+ * @return the queueBrowserPrefetch; initialised to ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH
+ */
+ public int getQueueBrowserPrefetch() {
+ return this.queueBrowserPrefetch;
+ }
+
+ /**
+ * Set the prefetch size used for queue browser subscriptions; the value is stored as given, with no validation
+ * @param queueBrowserPrefetch the queueBrowserPrefetch to set
+ */
+ public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+ this.queueBrowserPrefetch = queueBrowserPrefetch;
+ }
+
+ /**
+ * Get the prefetch size that configure(...) writes to a subscription's ConsumerInfo still at DEFAULT_TOPIC_PREFETCH
+ * @return the topicPrefetch; initialised to ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH
+ */
+ public int getTopicPrefetch() {
+ return this.topicPrefetch;
+ }
+
+ /**
+ * Set the prefetch size used for topic subscriptions; the value is stored as given, with no validation
+ * @param topicPrefetch the topicPrefetch to set
+ */
+ public void setTopicPrefetch(int topicPrefetch) {
+ this.topicPrefetch = topicPrefetch;
+ }
+
+ /**
+ * Get the prefetch size that configure(...) applies to a subscription still at DEFAULT_DURABLE_TOPIC_PREFETCH
+ * @return the durableTopicPrefetch; initialised to ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH
+ */
+ public int getDurableTopicPrefetch() {
+ return this.durableTopicPrefetch;
+ }
+
+ /**
+ * Set the prefetch size used for durable topic subscriptions; the value is stored as given, with no validation
+ * @param durableTopicPrefetch the durableTopicPrefetch to set
+ */
+ public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+ this.durableTopicPrefetch = durableTopicPrefetch;
+ }
+
+
}