You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2009/07/29 23:40:29 UTC

svn commit: r799090 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQPrefetchPolicy.java broker/region/AbstractSubscription.java broker/region/QueueRegion.java broker/region/policy/PolicyEntry.java

Author: rajdavies
Date: Wed Jul 29 21:40:29 2009
New Revision: 799090

URL: http://svn.apache.org/viewvc?rev=799090&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1846

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java Wed Jul 29 21:40:29 2009
@@ -26,9 +26,17 @@
  * @org.apache.xbean.XBean element="prefetchPolicy"
  * @version $Revision: 1.3 $
  */
-public class ActiveMQPrefetchPolicy implements Serializable {
+public class ActiveMQPrefetchPolicy extends Object implements Serializable {
+    public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
+    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+    public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
+    public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
+    public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
+    public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
+    public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
+    
     private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
-    private static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
+    
     private int queuePrefetch;
     private int queueBrowserPrefetch;
     private int topicPrefetch;
@@ -41,12 +49,12 @@
      * Initialize default prefetch policies
      */
     public ActiveMQPrefetchPolicy() {
-        this.queuePrefetch = 1000;
-        this.queueBrowserPrefetch = 500;
-        this.topicPrefetch = MAX_PREFETCH_SIZE;
-        this.durableTopicPrefetch = 100;
-        this.optimizeDurableTopicPrefetch = 1000;
-        this.inputStreamPrefetch = 100;
+        this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+        this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+        this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+        this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+        this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
+        this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
     }
 
     /**
@@ -156,5 +164,18 @@
     public void setInputStreamPrefetch(int inputStreamPrefetch) {
         this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
     }
+    
+    public boolean equals(Object object){ // value equality over the six prefetch settings compared below
+        if (object instanceof ActiveMQPrefetchPolicy){
+            ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object;
+            return this.queuePrefetch == other.queuePrefetch &&
+            this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
+            this.topicPrefetch == other.topicPrefetch &&
+            this.durableTopicPrefetch == other.durableTopicPrefetch &&
+            this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch &&
+            this.inputStreamPrefetch == other.inputStreamPrefetch;
+        }
+        return false; // null or another type. NOTE(review): no hashCode() override is added with this equals in the visible change - confirm one exists
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed Jul 29 21:40:29 2009
@@ -154,6 +154,9 @@
     public int getPrefetchSize() {
         return info.getPrefetchSize();
     }
+    public void setPrefetchSize(int newSize) {
+        info.setPrefetchSize(newSize); // written through to info, the same object getPrefetchSize() reads from
+    }
 
     public boolean isRecoveryRequired() {
         return true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Wed Jul 29 21:40:29 2009
@@ -22,6 +22,7 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageDispatchNotification;
@@ -47,11 +48,24 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
         throws JMSException {
-        
+        ActiveMQDestination destination = info.getDestination();
+        PolicyEntry entry = null;
+        if (destination != null && broker.getDestinationPolicy() != null) {
+            entry = broker.getDestinationPolicy().getEntryFor(destination);
+            // entry stays null when this branch is skipped; the null checks below cover that case
+        }
         if (info.isBrowser()) {
-            return new QueueBrowserSubscription(broker,usageManager, context, info);
+            QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info);
+            if (entry != null) {
+                entry.configure(broker, usageManager, sub); // PolicyEntry overload taking a QueueBrowserSubscription
+            }
+            return sub;
         } else {
-            return new QueueSubscription(broker, usageManager,context, info);
+            QueueSubscription sub =   new QueueSubscription(broker, usageManager,context, info);
+            if (entry != null) {
+                entry.configure(broker, usageManager, sub); // PolicyEntry overload taking a QueueSubscription
+            }
+            return sub;
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=799090&r1=799089&r2=799090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Jul 29 21:40:29 2009
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueBrowserSubscription;
+import org.apache.activemq.broker.region.QueueSubscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -75,6 +78,11 @@
     private boolean advisoryForConsumed;
     private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
     private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
+    private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
+    private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
+    private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
+    private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
+    
    
     public void configure(Broker broker,Queue queue) {
         baseConfiguration(queue);
@@ -155,6 +163,11 @@
         }
         if (pendingSubscriberPolicy != null) {
             String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
+            //override prefetch size if not set by the Consumer
+            int prefetch=subscription.getConsumerInfo().getPrefetchSize();
+            if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
+                subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
+            }
             int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
             subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
         }
@@ -164,6 +177,11 @@
         String clientId = sub.getSubscriptionKey().getClientId();
         String subName = sub.getSubscriptionKey().getSubscriptionName();
         int prefetch = sub.getPrefetchSize();
+        //override prefetch size if not set by the Consumer
+        
+        if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
+            sub.setPrefetchSize(getDurableTopicPrefetch());
+        }
         if (pendingDurableSubscriberPolicy != null) {
             PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
             cursor.setSystemUsage(memoryManager);
@@ -172,6 +190,26 @@
         sub.setMaxAuditDepth(getMaxAuditDepth());
         sub.setMaxProducersToAudit(getMaxProducersToAudit());
     }
+    
+    public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
+        // Applies this entry's queueBrowserPrefetch to a browser subscription; broker and memoryManager are not used here.
+        int prefetch = sub.getPrefetchSize();
+        // A prefetch equal to the client-side default is taken to mean "not set by the consumer",
+        // so a consumer that explicitly asked for exactly the default value is overridden as well.
+        if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
+            sub.setPrefetchSize(getQueueBrowserPrefetch());
+        }
+    }
+    
+    public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
+        // Applies this entry's queuePrefetch to a queue subscription; broker and memoryManager are not used here.
+        int prefetch = sub.getPrefetchSize();
+        // A prefetch equal to the client-side default is taken to mean "not set by the consumer",
+        // so a consumer that explicitly asked for exactly the default value is overridden as well.
+        if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
+            sub.setPrefetchSize(getQueuePrefetch());
+        }
+    }
 
     // Properties
     // -------------------------------------------------------------------------
@@ -559,5 +597,70 @@
         return expireMessagesPeriod;
     }
 
+    /**
+     * Returns the prefetch size this entry substitutes for a queue consumer's default prefetch.
+     * @return the configured queue prefetch (initially ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)
+     */
+    public int getQueuePrefetch() {
+        return this.queuePrefetch;
+    }
+
+    /**
+     * Sets the queue prefetch for this entry; the value is stored as given, with no range check.
+     * @param queuePrefetch the prefetch size to apply to queue subscriptions that kept the default
+     */
+    public void setQueuePrefetch(int queuePrefetch) {
+        this.queuePrefetch = queuePrefetch;
+    }
+
+    /**
+     * Returns the prefetch size this entry substitutes for a queue browser's default prefetch.
+     * @return the configured browser prefetch (initially ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)
+     */
+    public int getQueueBrowserPrefetch() {
+        return this.queueBrowserPrefetch;
+    }
+
+    /**
+     * Sets the queue browser prefetch for this entry; the value is stored as given, with no range check.
+     * @param queueBrowserPrefetch the prefetch size to apply to browser subscriptions that kept the default
+     */
+    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
+        this.queueBrowserPrefetch = queueBrowserPrefetch;
+    }
+
+    /**
+     * Returns the prefetch size substituted for a topic consumer's default (only when pendingSubscriberPolicy is set).
+     * @return the configured topic prefetch (initially ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH)
+     */
+    public int getTopicPrefetch() {
+        return this.topicPrefetch;
+    }
+
+    /**
+     * Sets the topic prefetch for this entry; the value is stored as given, with no range check.
+     * @param topicPrefetch the prefetch size to apply to topic subscriptions that kept the default
+     */
+    public void setTopicPrefetch(int topicPrefetch) {
+        this.topicPrefetch = topicPrefetch;
+    }
+
+    /**
+     * Returns the prefetch size this entry substitutes for a durable topic subscriber's default prefetch.
+     * @return the configured durable prefetch (initially ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH)
+     */
+    public int getDurableTopicPrefetch() {
+        return this.durableTopicPrefetch;
+    }
+
+    /**
+     * Sets the durable topic prefetch for this entry; the value is stored as given, with no range check.
+     * @param durableTopicPrefetch the prefetch size to apply to durable subscriptions that kept the default
+     */
+    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
+        this.durableTopicPrefetch = durableTopicPrefetch;
+    }
+
+
 
 }