You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/01/05 16:58:54 UTC

svn commit: r366208 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Queue.java policy/PolicyEntry.java

Author: jstrachan
Date: Thu Jan  5 07:58:49 2006
New Revision: 366208

URL: http://svn.apache.org/viewcvs?rev=366208&view=rev
Log:
enabled the hash bucket based implementation of MessageGroupMap by default and made the bucketCount configurable in the destination policy map

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=366208&r1=366207&r2=366208&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jan  5 07:58:49 2006
@@ -19,9 +19,9 @@
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupSet;
-import org.apache.activemq.broker.region.group.SimpleMessageGroupMap;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
@@ -45,7 +45,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -65,7 +64,8 @@
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
 
     private Subscription exclusiveOwner;
-    private final MessageGroupMap messageGroupOwners = new SimpleMessageGroupMap();
+    private MessageGroupMap messageGroupOwners;
+    private int messageGroupHashBucketCount = 1024;
 
     protected long garbageSize = 0;
     protected long garbageSizeBeforeCollection = 1000;
@@ -186,7 +186,7 @@
             }
 
             ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
-            MessageGroupSet ownedGroups = messageGroupOwners.removeConsumer(consumerId);
+            MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
 
             synchronized (messages) {
                 if (!sub.getConsumerInfo().isBrowser()) {
@@ -323,6 +323,9 @@
     }
 
     public MessageGroupMap getMessageGroupOwners() {
+        if (messageGroupOwners == null) {
+            messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount );
+        }
         return messageGroupOwners;
     }
 
@@ -341,6 +344,15 @@
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
     }
+
+    public int getMessageGroupHashBucketCount() {
+        return messageGroupHashBucketCount;
+    }
+
+    public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
+        this.messageGroupHashBucketCount = messageGroupHashBucketCount;
+    }
+    
 
     // Implementation methods
     // -------------------------------------------------------------------------

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=366208&r1=366207&r2=366208&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Jan  5 07:58:49 2006
@@ -34,6 +34,7 @@
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy;
+    private int messageGroupHashBucketCount = 1024;
 
     public void configure(Queue queue) {
         if (dispatchPolicy != null) {
@@ -42,6 +43,7 @@
         if (deadLetterStrategy != null) {
             queue.setDeadLetterStrategy(deadLetterStrategy);
         }
+        queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
     }
 
     public void configure(Topic topic) {
@@ -96,6 +98,20 @@
      */
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
+    }
+
+    public int getMessageGroupHashBucketCount() {
+        return messageGroupHashBucketCount;
+    }
+
+    /**
+     * Sets the number of hash buckets to use for the message group functionality. 
+     * This is only applicable to using message groups to parallelize processing of a queue
+     * while preserving order across an individual JMSXGroupID header value.
+     * This value sets the number of hash buckets that will be used (i.e. the maximum possible concurrency).
+     */
+    public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
+        this.messageGroupHashBucketCount = messageGroupHashBucketCount;
     }