You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/25 12:28:19 UTC
svn commit: r957881 [1/2] - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./
broker/jmx/ broker/region/ broker/region/cursors/ broker/region/policy/
broker/util/ command/ memory/list/ plugin/
Author: rajdavies
Date: Fri Jun 25 10:28:17 2010
New Revision: 957881
URL: http://svn.apache.org/viewvc?rev=957881&view=rev
Log:
changes for https://issues.apache.org/activemq/browse/AMQ-2791
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java Fri Jun 25 10:28:17 2010
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.activemq.command.MessageDispatch;
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
- private static Integer MAX_PRIORITY = 10;
+ private static final Integer MAX_PRIORITY = 10;
private final Object mutex = new Object();
private final LinkedList<MessageDispatch>[] lists;
private boolean closed;
@@ -234,7 +234,7 @@ public class SimplePriorityMessageDispat
}
protected int getPriority(MessageDispatch message) {
- int priority = Message.DEFAULT_PRIORITY;
+ int priority = javax.jms.Message.DEFAULT_PRIORITY;
if (message.getMessage() != null) {
Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Fri Jun 25 10:28:17 2010
@@ -16,19 +16,6 @@
*/
package org.apache.activemq.broker.jmx;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,6 +35,19 @@ import javax.management.openmbean.OpenDa
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class DestinationView implements DestinationViewMBean {
private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class);
@@ -126,6 +126,10 @@ public class DestinationView implements
public long getMinEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMinTime();
}
+
+ public boolean isPrioritizedMessages() {
+ return destination.isPrioritizedMessages();
+ }
public CompositeData[] browse() throws OpenDataException {
try {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Fri Jun 25 10:28:17 2010
@@ -16,16 +16,15 @@
*/
package org.apache.activemq.broker.jmx;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.io.IOException;
-
import javax.jms.InvalidSelectorException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
-import javax.management.ObjectName;
-import javax.management.MalformedObjectNameException;
public interface DestinationViewMBean {
@@ -314,6 +313,12 @@ public interface DestinationViewMBean {
public boolean isUseCache();
/**
+ * @return true if prioritized messages are enabled for the destination
+ */
+ @MBeanInfo("Prioritized messages is enabled")
+ public boolean isPrioritizedMessages();
+
+ /**
* @param value
* enable/disable caching on the destination
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Jun 25 10:28:17 2010
@@ -81,6 +81,7 @@ public abstract class BaseDestination im
protected int cursorMemoryHighWaterMark = 70;
protected int storeUsageHighWaterMark = 100;
private SlowConsumerStrategy slowConsumerStrategy;
+ private boolean prioritizedMessages;
/**
* @param broker
@@ -580,5 +581,14 @@ public abstract class BaseDestination im
public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
this.slowConsumerStrategy = slowConsumerStrategy;
}
+
+
+ public boolean isPrioritizedMessages() {
+ return this.prioritizedMessages;
+ }
+
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ this.prioritizedMessages = prioritizedMessages;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Jun 25 10:28:17 2010
@@ -18,14 +18,12 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.List;
-
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
@@ -215,4 +213,6 @@ public interface Destination extends Ser
* @throws Exception
*/
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
+
+ boolean isPrioritizedMessages();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Jun 25 10:28:17 2010
@@ -19,7 +19,6 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.List;
import java.util.Set;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -39,7 +38,7 @@ import org.apache.activemq.usage.Usage;
*/
public class DestinationFilter implements Destination {
- private Destination next;
+ private final Destination next;
public DestinationFilter(Destination next) {
this.next = next;
@@ -270,4 +269,8 @@ public class DestinationFilter implement
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
+
+ public boolean isPrioritizedMessages() {
+ return next.isPrioritizedMessages();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Fri Jun 25 10:28:17 2010
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.broker.region;
-import java.io.IOException;
-
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@@ -33,7 +31,7 @@ public interface MessageReference {
MessageId getMessageId();
Message getMessageHardRef();
- Message getMessage() throws IOException;
+ Message getMessage();
boolean isPersistent();
Destination getRegionDestination();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Fri Jun 25 10:28:17 2010
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.broker.region;
-import java.io.IOException;
-
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
@@ -28,7 +26,7 @@ import org.apache.activemq.command.Messa
*/
final class NullMessageReference implements QueueMessageReference {
- private ActiveMQMessage message = new ActiveMQMessage();
+ private final ActiveMQMessage message = new ActiveMQMessage();
private volatile int references;
public void drop() {
@@ -75,7 +73,7 @@ final class NullMessageReference impleme
return 0;
}
- public Message getMessage() throws IOException {
+ public Message getMessage() {
return message;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jun 25 10:28:17 2010
@@ -78,7 +78,7 @@ public abstract class PrefetchSubscripti
}
public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- this(broker,usageManager,context, info, new VMPendingMessageCursor());
+ this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jun 25 10:28:17 2010
@@ -236,7 +236,7 @@ public class Queue extends BaseDestinati
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
- this.messages = new VMPendingMessageCursor();
+ this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
} else {
this.messages = new StoreQueueCursor(broker, this);
}
@@ -951,38 +951,30 @@ public class Queue extends BaseDestinati
public Message getMessage(String id) {
MessageId msgId = new MessageId(id);
- try {
- synchronized (pagedInMessages) {
- QueueMessageReference r = this.pagedInMessages.get(msgId);
- if (r != null) {
- return r.getMessage();
- }
+ synchronized (pagedInMessages) {
+ QueueMessageReference r = this.pagedInMessages.get(msgId);
+ if (r != null) {
+ return r.getMessage();
}
- synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext()) {
- try {
- MessageReference r = messages.next();
- r.decrementReferenceCount();
- messages.rollback(r.getMessageId());
- if (msgId.equals(r.getMessageId())) {
- Message m = r.getMessage();
- if (m != null) {
- return m;
- }
- break;
- }
- } catch (IOException e) {
- LOG.error("got an exception retrieving message " + id);
+ }
+ synchronized (messages) {
+ try {
+ messages.reset();
+ while (messages.hasNext()) {
+ MessageReference r = messages.next();
+ r.decrementReferenceCount();
+ messages.rollback(r.getMessageId());
+ if (msgId.equals(r.getMessageId())) {
+ Message m = r.getMessage();
+ if (m != null) {
+ return m;
}
+ break;
}
- } finally {
- messages.release();
}
+ } finally {
+ messages.release();
}
- } catch (IOException e) {
- LOG.error("got an exception retrieving message " + id);
}
return null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Fri Jun 25 10:28:17 2010
@@ -56,7 +56,7 @@ public class TempQueue extends Queue{
@Override
public void initialize() throws Exception {
- this.messages=new VMPendingMessageCursor();
+ this.messages=new VMPendingMessageCursor(false);
this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jun 25 10:28:17 2010
@@ -72,9 +72,9 @@ public class TopicSubscription extends A
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
- this.matched = new VMPendingMessageCursor();
+ this.matched = new VMPendingMessageCursor(false);
} else {
- this.matched = new FilePendingMessageCursor(broker,matchedName);
+ this.matched = new FilePendingMessageCursor(broker,matchedName,false);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Jun 25 10:28:17 2010
@@ -19,11 +19,14 @@ package org.apache.activemq.broker.regio
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
@@ -44,6 +47,11 @@ public abstract class AbstractPendingMes
protected boolean useCache=true;
private boolean started=false;
protected MessageReference last = null;
+ protected final boolean prioritizedMessages;
+
+ public AbstractPendingMessageCursor(boolean prioritizedMessages) {
+ this.prioritizedMessages=prioritizedMessages;
+ }
public synchronized void start() throws Exception {
@@ -304,4 +312,19 @@ public abstract class AbstractPendingMes
protected synchronized boolean isStarted() {
return started;
}
+
+ public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
+ boolean result = false;
+ Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
+ if (destinations != null) {
+ for (Destination dest:destinations) {
+ if (dest.isPrioritizedMessages()) {
+ result = true;
+ break;
+ }
+ }
+ }
+ return result;
+
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Fri Jun 25 10:28:17 2010
@@ -17,8 +17,6 @@
package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map.Entry;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@@ -34,8 +32,8 @@ import org.apache.commons.logging.LogFac
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected final Destination regionDestination;
- private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
- private Iterator<Entry<MessageId, Message>> iterator = null;
+ private final PendingList batchList;
+ private Iterator<MessageReference> iterator = null;
private boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
@@ -43,10 +41,16 @@ public abstract class AbstractStoreCurso
private MessageId lastCachedId;
protected AbstractStoreCursor(Destination destination) {
+ super((destination != null ? destination.isPrioritizedMessages():false));
this.regionDestination=destination;
+ if (this.prioritizedMessages) {
+ this.batchList= new PrioritizedPendingList();
+ }else {
+ this.batchList = new OrderedPendingList();
+ }
}
- @Override
+
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
@@ -60,7 +64,7 @@ public abstract class AbstractStoreCurso
}
}
- @Override
+
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();
@@ -82,7 +86,7 @@ public abstract class AbstractStoreCurso
}
}
message.incrementReferenceCount();
- batchList.put(message.getMessageId(), message);
+ batchList.addMessageLast(message);
clearIterator(true);
recovered = true;
} else {
@@ -99,7 +103,7 @@ public abstract class AbstractStoreCurso
return recovered;
}
- @Override
+
public final void reset() {
if (batchList.isEmpty()) {
try {
@@ -113,7 +117,7 @@ public abstract class AbstractStoreCurso
size();
}
- @Override
+
public synchronized void release() {
clearIterator(false);
}
@@ -129,7 +133,7 @@ public abstract class AbstractStoreCurso
private synchronized void ensureIterator() {
if(this.iterator==null) {
- this.iterator=this.batchList.entrySet().iterator();
+ this.iterator=this.batchList.iterator();
}
}
@@ -137,7 +141,7 @@ public abstract class AbstractStoreCurso
public final void finished() {
}
- @Override
+
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
@@ -151,11 +155,11 @@ public abstract class AbstractStoreCurso
return this.iterator.hasNext();
}
- @Override
+
public final synchronized MessageReference next() {
MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
- result = this.iterator.next().getValue();
+ result = this.iterator.next();
}
last = result;
if (result != null) {
@@ -164,7 +168,7 @@ public abstract class AbstractStoreCurso
return result;
}
- @Override
+
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && hasSpace()) {
recoverMessage(node.getMessage(),true);
@@ -189,13 +193,13 @@ public abstract class AbstractStoreCurso
protected void setBatch(MessageId messageId) throws Exception {
}
- @Override
+
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
cacheEnabled=false;
size++;
}
- @Override
+
public final synchronized void remove() {
size--;
if (iterator!=null) {
@@ -212,21 +216,22 @@ public abstract class AbstractStoreCurso
}
}
- @Override
+
public final synchronized void remove(MessageReference node) {
size--;
cacheEnabled=false;
- batchList.remove(node.getMessageId());
+ batchList.remove(node);
}
- @Override
+
public final synchronized void clear() {
gc();
}
- @Override
+
public final synchronized void gc() {
- for (Message msg : batchList.values()) {
+ for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
+ MessageReference msg = i.next();
rollback(msg.getMessageId());
msg.decrementReferenceCount();
}
@@ -241,7 +246,7 @@ public abstract class AbstractStoreCurso
}
}
- @Override
+
protected final synchronized void fillBatch() {
if (batchResetNeeded) {
resetBatch();
@@ -261,18 +266,18 @@ public abstract class AbstractStoreCurso
}
}
- @Override
+
public final synchronized boolean isEmpty() {
// negative means more messages added to store through queue.send since last reset
return size == 0;
}
- @Override
+
public final synchronized boolean hasMessagesBufferedToDeliver() {
return !batchList.isEmpty();
}
- @Override
+
public final synchronized int size() {
if (size < 0) {
this.size = getStoreSize();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Jun 25 10:28:17 2010
@@ -63,9 +63,11 @@ public class FilePendingMessageCursor ex
/**
* @param broker
* @param name
+ * @param prioritizedMessages
* @param store
*/
- public FilePendingMessageCursor(Broker broker, String name) {
+ public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
+ super(prioritizedMessages);
this.useCache = false;
this.broker = broker;
// the store can be null if the BrokerService has persistence
@@ -190,6 +192,7 @@ public class FilePendingMessageCursor ex
tryAddMessageLast(node, 0);
}
+ @Override
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
if (!node.isExpired()) {
try {
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+
+public class OrderedPendingList implements PendingList {
+ PendingNode root = null;
+ PendingNode tail = null;
+ final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+
+ public PendingNode addMessageFirst(MessageReference message) {
+ PendingNode node = new PendingNode(this, message);
+ if (root == null) {
+ root = node;
+ tail = node;
+ } else {
+ root.linkBefore(node);
+ }
+ this.map.put(message.getMessageId(), node);
+ return node;
+ }
+
+ public PendingNode addMessageLast(MessageReference message) {
+ PendingNode node = new PendingNode(this, message);
+ if (root == null) {
+ root = node;
+ } else {
+ tail.linkAfter(node);
+ }
+ tail = node;
+ this.map.put(message.getMessageId(), node);
+ return node;
+ }
+
+ public void clear() {
+ this.root = null;
+ this.tail = null;
+ this.map.clear();
+ }
+
+ public boolean isEmpty() {
+ return this.map.isEmpty();
+ }
+
+ public Iterator<MessageReference> iterator() {
+ return new Iterator<MessageReference>() {
+ private PendingNode current = null;
+ private PendingNode next = root;
+
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ public MessageReference next() {
+ MessageReference result = null;
+ this.current = this.next;
+ result = this.current.getMessage();
+ this.next = (PendingNode) this.next.getNext();
+ return result;
+ }
+
+ public void remove() {
+ if (this.current != null && this.current.getMessage() != null) {
+ map.remove(this.current.getMessage().getMessageId());
+ }
+ removeNode(this.current);
+ }
+ };
+ }
+
+ public void remove(MessageReference message) {
+ if (message != null) {
+ PendingNode node = this.map.remove(message.getMessageId());
+ removeNode(node);
+ }
+ }
+
+ public int size() {
+ return this.map.size();
+ }
+
+ void removeNode(PendingNode node) {
+ if (node != null) {
+ map.remove(node.getMessage().getMessageId());
+ if (root == node) {
+ root = (PendingNode) node.getNext();
+ }
+ if (tail == node) {
+ tail = (PendingNode) node.getPrevious();
+ }
+ node.unlink();
+ }
+ }
+
+ List<PendingNode> getAsList() {
+ List<PendingNode> result = new ArrayList<PendingNode>(size());
+ PendingNode node = root;
+ while (node != null) {
+ result.add(node);
+ node = (PendingNode) node.getNext();
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "OrderedPendingList(" + System.identityHashCode(this) + ")";
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Iterator;
+import org.apache.activemq.broker.region.MessageReference;
+
+public interface PendingList {
+
+ public boolean isEmpty();
+ public void clear();
+ public PendingNode addMessageFirst(MessageReference message);
+ public PendingNode addMessageLast(MessageReference message);
+ public void remove(MessageReference message);
+ public int size();
+ public Iterator<MessageReference> iterator();
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.util.LinkedNode;
+
+public class PendingNode extends LinkedNode {
+ private final MessageReference message;
+ private final OrderedPendingList list;
+ public PendingNode(OrderedPendingList list,MessageReference message) {
+ this.list = list;
+ this.message = message;
+ }
+
+ MessageReference getMessage() {
+ return this.message;
+ }
+
+ OrderedPendingList getList() {
+ return this.list;
+ }
+
+ @Override
+ public String toString() {
+ PendingNode n = (PendingNode) getNext();
+ String str = "PendingNode(";
+ str += System.identityHashCode(this) + "),root="+isHeadNode()+",next="+(n != null ?System.identityHashCode(n):"NULL");
+ return str;
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+
+public class PrioritizedPendingList implements PendingList {
+ static final Integer MAX_PRIORITY = 10;
+ private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
+ final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+
+ public PrioritizedPendingList() {
+ for (int i = 0; i < MAX_PRIORITY; i++) {
+ this.lists[i] = new OrderedPendingList();
+ }
+ }
+ public PendingNode addMessageFirst(MessageReference message) {
+ PendingNode node = getList(message).addMessageFirst(message);
+ this.map.put(message.getMessageId(), node);
+ return node;
+ }
+
+ public PendingNode addMessageLast(MessageReference message) {
+ PendingNode node = getList(message).addMessageLast(message);
+ this.map.put(message.getMessageId(), node);
+ return node;
+ }
+
+ public void clear() {
+ for (int i = 0; i < MAX_PRIORITY; i++) {
+ this.lists[i].clear();
+ }
+ this.map.clear();
+ }
+
+ public boolean isEmpty() {
+ return this.map.isEmpty();
+ }
+
+ public Iterator<MessageReference> iterator() {
+ return new PrioritizedPendingListIterator();
+ }
+
+ public void remove(MessageReference message) {
+ if (message != null) {
+ PendingNode node = this.map.remove(message.getMessageId());
+ if (node != null) {
+ node.getList().removeNode(node);
+ }
+ }
+ }
+
+ public int size() {
+ return this.map.size();
+ }
+
+ @Override
+ public String toString() {
+ return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
+ }
+
+ protected int getPriority(MessageReference message) {
+ int priority = javax.jms.Message.DEFAULT_PRIORITY;
+ if (message.getMessageId() != null) {
+ Math.max(message.getMessage().getPriority(), 0);
+ priority = Math.min(priority, 9);
+ }
+ return priority;
+ }
+
+ protected OrderedPendingList getList(MessageReference msg) {
+ return lists[getPriority(msg)];
+ }
+
+ private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
+ private int index = 0;
+ private int currentIndex = 0;
+ List<PendingNode> list = new ArrayList<PendingNode>(size());
+
+ PrioritizedPendingListIterator() {
+ for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+ OrderedPendingList orderedPendingList = lists[i];
+ if (!orderedPendingList.isEmpty()) {
+ list.addAll(orderedPendingList.getAsList());
+ }
+ }
+ }
+ public boolean hasNext() {
+ return list.size() > index;
+ }
+
+ public MessageReference next() {
+ PendingNode node = list.get(this.index);
+ this.currentIndex = this.index;
+ this.index++;
+ return node.getMessage();
+ }
+
+ public void remove() {
+ PendingNode node = list.get(this.currentIndex);
+ if (node != null) {
+ map.remove(node.getMessage().getMessageId());
+ node.getList().removeNode(node);
+ }
+
+ }
+
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Jun 25 10:28:17 2010
@@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
-
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -58,13 +57,14 @@ public class StoreDurableSubscriberCurso
* @param subscription subscription for this cursor
*/
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
+ super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
this.subscription=subscription;
this.clientId = clientId;
this.subscriberName = subscriberName;
if (broker.getBrokerService().isPersistent()) {
- this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
+ this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
}else {
- this.nonPersistent = new VMPendingMessageCursor();
+ this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
this.nonPersistent.setMaxBatchSize(maxBatchSize);
@@ -72,6 +72,7 @@ public class StoreDurableSubscriberCurso
this.storePrefetches.add(this.nonPersistent);
}
+ @Override
public synchronized void start() throws Exception {
if (!isStarted()) {
super.start();
@@ -82,6 +83,7 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public synchronized void stop() throws Exception {
if (isStarted()) {
super.stop();
@@ -98,6 +100,7 @@ public class StoreDurableSubscriberCurso
* @param destination
* @throws Exception
*/
+ @Override
public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
@@ -122,6 +125,7 @@ public class StoreDurableSubscriberCurso
* @param destination
* @throws Exception
*/
+ @Override
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
PendingMessageCursor tsp = topics.remove(destination);
if (tsp != null) {
@@ -133,6 +137,7 @@ public class StoreDurableSubscriberCurso
/**
* @return true if there are no pending messages
*/
+ @Override
public synchronized boolean isEmpty() {
for (PendingMessageCursor tsp : storePrefetches) {
if( !tsp.isEmpty() )
@@ -141,6 +146,7 @@ public class StoreDurableSubscriberCurso
return true;
}
+ @Override
public synchronized boolean isEmpty(Destination destination) {
boolean result = true;
TopicStorePrefetch tsp = topics.get(destination);
@@ -157,10 +163,12 @@ public class StoreDurableSubscriberCurso
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
* @return true if recovery required
*/
+ @Override
public boolean isRecoveryRequired() {
return false;
}
+ @Override
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
@@ -179,16 +187,19 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
nonPersistent.addMessageLast(node);
}
+ @Override
public synchronized void clear() {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.clear();
}
}
+ @Override
public synchronized boolean hasNext() {
boolean result = true;
if (result) {
@@ -203,35 +214,41 @@ public class StoreDurableSubscriberCurso
return result;
}
+ @Override
public synchronized MessageReference next() {
MessageReference result = currentCursor != null ? currentCursor.next() : null;
return result;
}
+ @Override
public synchronized void remove() {
if (currentCursor != null) {
currentCursor.remove();
}
}
+ @Override
public synchronized void remove(MessageReference node) {
if (currentCursor != null) {
currentCursor.remove(node);
}
}
+ @Override
public synchronized void reset() {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.reset();
}
}
+ @Override
public synchronized void release() {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.release();
}
}
+ @Override
public synchronized int size() {
int pendingCount=0;
for (PendingMessageCursor tsp : storePrefetches) {
@@ -240,6 +257,7 @@ public class StoreDurableSubscriberCurso
return pendingCount;
}
+ @Override
public void setMaxBatchSize(int maxBatchSize) {
for (PendingMessageCursor storePrefetch : storePrefetches) {
storePrefetch.setMaxBatchSize(maxBatchSize);
@@ -247,12 +265,14 @@ public class StoreDurableSubscriberCurso
super.setMaxBatchSize(maxBatchSize);
}
+ @Override
public synchronized void gc() {
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
}
}
+ @Override
public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
for (PendingMessageCursor tsp : storePrefetches) {
@@ -260,6 +280,7 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
for (PendingMessageCursor cursor : storePrefetches) {
@@ -267,6 +288,7 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
super.setMaxProducersToAudit(maxProducersToAudit);
for (PendingMessageCursor cursor : storePrefetches) {
@@ -274,6 +296,7 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public void setMaxAuditDepth(int maxAuditDepth) {
super.setMaxAuditDepth(maxAuditDepth);
for (PendingMessageCursor cursor : storePrefetches) {
@@ -281,6 +304,7 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public void setEnableAudit(boolean enableAudit) {
super.setEnableAudit(enableAudit);
for (PendingMessageCursor cursor : storePrefetches) {
@@ -288,6 +312,7 @@ public class StoreDurableSubscriberCurso
}
}
+ @Override
public void setUseCache(boolean useCache) {
super.setUseCache(useCache);
for (PendingMessageCursor cursor : storePrefetches) {
@@ -313,6 +338,7 @@ public class StoreDurableSubscriberCurso
return currentCursor;
}
+ @Override
public String toString() {
return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Jun 25 10:28:17 2010
@@ -32,21 +32,21 @@ import org.apache.commons.logging.LogFac
public class StoreQueueCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
- private Broker broker;
+ private final Broker broker;
private int pendingCount;
- private Queue queue;
+ private final Queue queue;
private PendingMessageCursor nonPersistent;
- private QueueStorePrefetch persistent;
+ private final QueueStorePrefetch persistent;
private boolean started;
private PendingMessageCursor currentCursor;
/**
* Construct
- *
+ * @param broker
* @param queue
- * @param tmpStore
*/
public StoreQueueCursor(Broker broker,Queue queue) {
+ super((queue != null ? queue.isPrioritizedMessages():false));
this.broker=broker;
this.queue = queue;
this.persistent = new QueueStorePrefetch(queue);
@@ -58,9 +58,9 @@ public class StoreQueueCursor extends Ab
super.start();
if (nonPersistent == null) {
if (broker.getBrokerService().isPersistent()) {
- nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
+ nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
}else {
- nonPersistent = new VMPendingMessageCursor();
+ nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
nonPersistent.setMaxBatchSize(getMaxBatchSize());
nonPersistent.setSystemUsage(systemUsage);
@@ -101,7 +101,7 @@ public class StoreQueueCursor extends Ab
}
}
}
-
+
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
@@ -240,6 +240,7 @@ public class StoreQueueCursor extends Ab
}
}
+ @Override
public void setUseCache(boolean useCache) {
super.setUseCache(useCache);
if (persistent != null) {
@@ -250,6 +251,7 @@ public class StoreQueueCursor extends Ab
}
}
+ @Override
public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
if (persistent != null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Fri Jun 25 10:28:17 2010
@@ -32,13 +32,20 @@ import org.apache.activemq.broker.region
* @version $Revision$
*/
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
- private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
+ private final PendingList list;
private Iterator<MessageReference> iter;
- public VMPendingMessageCursor() {
+
+ public VMPendingMessageCursor(boolean prioritizedMessages) {
+ super(prioritizedMessages);
this.useCache = false;
+ if (this.prioritizedMessages) {
+ this.list= new PrioritizedPendingList();
+ }else {
+ this.list = new OrderedPendingList();
+ }
}
- @Override
+
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
@@ -56,7 +63,7 @@ public class VMPendingMessageCursor exte
/**
* @return true if there are no pending messages
*/
- @Override
+
public synchronized boolean isEmpty() {
if (list.isEmpty()) {
return true;
@@ -79,9 +86,9 @@ public class VMPendingMessageCursor exte
/**
* reset the cursor
*/
- @Override
+
public synchronized void reset() {
- iter = list.listIterator();
+ iter = list.iterator();
last = null;
}
@@ -90,10 +97,10 @@ public class VMPendingMessageCursor exte
*
* @param node
*/
- @Override
+
public synchronized void addMessageLast(MessageReference node) {
node.incrementReferenceCount();
- list.addLast(node);
+ list.addMessageLast(node);
}
/**
@@ -102,16 +109,16 @@ public class VMPendingMessageCursor exte
* @param position
* @param node
*/
- @Override
+
public synchronized void addMessageFirst(MessageReference node) {
node.incrementReferenceCount();
- list.addFirst(node);
+ list.addMessageFirst(node);
}
/**
* @return true if there pending messages to dispatch
*/
- @Override
+
public synchronized boolean hasNext() {
return iter.hasNext();
}
@@ -119,7 +126,7 @@ public class VMPendingMessageCursor exte
/**
* @return the next pending message
*/
- @Override
+
public synchronized MessageReference next() {
last = iter.next();
if (last != null) {
@@ -131,7 +138,7 @@ public class VMPendingMessageCursor exte
/**
* remove the message at the cursor position
*/
- @Override
+
public synchronized void remove() {
if (last != null) {
last.decrementReferenceCount();
@@ -142,7 +149,7 @@ public class VMPendingMessageCursor exte
/**
* @return the number of pending messages
*/
- @Override
+
public synchronized int size() {
return list.size();
}
@@ -150,7 +157,7 @@ public class VMPendingMessageCursor exte
/**
* clear all pending messages
*/
- @Override
+
public synchronized void clear() {
for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
MessageReference ref = i.next();
@@ -159,16 +166,10 @@ public class VMPendingMessageCursor exte
list.clear();
}
- @Override
+
public synchronized void remove(MessageReference node) {
- for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
- MessageReference ref = i.next();
- if (node.getMessageId().equals(ref.getMessageId())) {
- ref.decrementReferenceCount();
- i.remove();
- break;
- }
- }
+ list.remove(node);
+ node.decrementReferenceCount();
}
/**
@@ -177,10 +178,11 @@ public class VMPendingMessageCursor exte
* @param maxItems
* @return a list of paged in messages
*/
- @Override
+
public LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new LinkedList<MessageReference>();
- for (MessageReference ref: list) {
+ for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
+ MessageReference ref = i.next();
ref.incrementReferenceCount();
result.add(ref);
if (result.size() >= maxItems) {
@@ -190,12 +192,12 @@ public class VMPendingMessageCursor exte
return result;
}
- @Override
+
public boolean isTransient() {
return true;
}
- @Override
+
public void destroy() throws Exception {
super.destroy();
clear();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -42,6 +43,6 @@ public class FilePendingDurableSubscribe
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
- return new FilePendingMessageCursor(broker,name);
+ return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -39,7 +39,7 @@ public class FilePendingQueueMessageStor
* org.apache.activemq.kaha.Store)
*/
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
- return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName());
+ return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName(),queue.isPrioritizedMessages());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -17,6 +17,8 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -31,14 +33,16 @@ import org.apache.activemq.broker.region
public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
- * @param broker
+ * @param broker
* @param name
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
- return new FilePendingMessageCursor(broker,"PendingCursor:" + name);
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker, String name, int maxBatchSize,
+ Subscription subs) {
+ return new FilePendingMessageCursor(broker, "PendingCursor:" + name, AbstractPendingMessageCursor
+ .isPrioritizedMessageSubscriber(broker, subs));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
/**
@@ -35,5 +36,5 @@ public interface PendingSubscriberMessag
* @param maxBatchSize
* @return the Pending Message cursor
*/
- PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize);
+ PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jun 25 10:28:17 2010
@@ -87,6 +87,7 @@ public class PolicyEntry extends Destina
private int cursorMemoryHighWaterMark = 70;
private int storeUsageHighWaterMark = 100;
private SlowConsumerStrategy slowConsumerStrategy;
+ private boolean prioritizedMessages;
public void configure(Broker broker,Queue queue) {
@@ -155,6 +156,7 @@ public class PolicyEntry extends Destina
scs.setScheduler(broker.getScheduler());
}
destination.setSlowConsumerStrategy(scs);
+ destination.setPrioritizedMessages(isPrioritizedMessages());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -184,7 +186,7 @@ public class PolicyEntry extends Destina
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
- subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
+ subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
}
if (enableAudit) {
subscription.setEnableAudit(enableAudit);
@@ -739,5 +741,14 @@ public class PolicyEntry extends Destina
public SlowConsumerStrategy getSlowConsumerStrategy() {
return this.slowConsumerStrategy;
}
+
+
+ public boolean isPrioritizedMessages() {
+ return this.prioritizedMessages;
+ }
+
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ this.prioritizedMessages = prioritizedMessages;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -40,6 +41,6 @@ public class VMPendingDurableSubscriberM
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
- return new VMPendingMessageCursor();
+ return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -37,6 +37,6 @@ public class VMPendingQueueMessageStorag
* @return the cursor
*/
public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
- return new VMPendingMessageCursor();
+ return new VMPendingMessageCursor(queue.isPrioritizedMessages());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -17,6 +17,8 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -38,7 +40,7 @@ public class VMPendingSubscriberMessageS
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
- return new VMPendingMessageCursor();
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs) {
+ return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subs));
}
}