You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/01/05 16:58:54 UTC
svn commit: r366208 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
Queue.java policy/PolicyEntry.java
Author: jstrachan
Date: Thu Jan 5 07:58:49 2006
New Revision: 366208
URL: http://svn.apache.org/viewcvs?rev=366208&view=rev
Log:
Enabled the hash-bucket-based implementation of MessageGroupMap by default and made the bucket count configurable in the destination policy map.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=366208&r1=366207&r2=366208&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jan 5 07:58:49 2006
@@ -19,9 +19,9 @@
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupSet;
-import org.apache.activemq.broker.region.group.SimpleMessageGroupMap;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
@@ -45,7 +45,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -65,7 +64,8 @@
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private Subscription exclusiveOwner;
- private final MessageGroupMap messageGroupOwners = new SimpleMessageGroupMap();
+ private MessageGroupMap messageGroupOwners;
+ private int messageGroupHashBucketCount = 1024;
protected long garbageSize = 0;
protected long garbageSizeBeforeCollection = 1000;
@@ -186,7 +186,7 @@
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups = messageGroupOwners.removeConsumer(consumerId);
+ MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
synchronized (messages) {
if (!sub.getConsumerInfo().isBrowser()) {
@@ -323,6 +323,9 @@
}
public MessageGroupMap getMessageGroupOwners() {
+ if (messageGroupOwners == null) {
+ messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount );
+ }
return messageGroupOwners;
}
@@ -341,6 +344,15 @@
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
+
+ public int getMessageGroupHashBucketCount() {
+ return messageGroupHashBucketCount;
+ }
+
+ public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
+ this.messageGroupHashBucketCount = messageGroupHashBucketCount;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=366208&r1=366207&r2=366208&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Jan 5 07:58:49 2006
@@ -34,6 +34,7 @@
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy;
+ private int messageGroupHashBucketCount = 1024;
public void configure(Queue queue) {
if (dispatchPolicy != null) {
@@ -42,6 +43,7 @@
if (deadLetterStrategy != null) {
queue.setDeadLetterStrategy(deadLetterStrategy);
}
+ queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
}
public void configure(Topic topic) {
@@ -96,6 +98,20 @@
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
+ }
+
+ public int getMessageGroupHashBucketCount() {
+ return messageGroupHashBucketCount;
+ }
+
+ /**
+ * Sets the number of hash buckets to use for the message group functionality.
+ * This is only applicable to using message groups to parallelize processing of a queue
+ * while preserving order across an individual JMSXGroupID header value.
+ * This value sets the number of hash buckets that will be used (i.e. the maximum possible concurrency).
+ */
+ public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
+ this.messageGroupHashBucketCount = messageGroupHashBucketCount;
}