You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/18 10:38:17 UTC
svn commit: r628667 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
./ policy/
Author: rajdavies
Date: Mon Feb 18 01:38:10 2008
New Revision: 628667
URL: http://svn.apache.org/viewvc?rev=628667&view=rev
Log:
Change Queue dispatch model to reduce contention for lots of
consumers
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Mon Feb 18 01:38:10 2008
@@ -179,6 +179,17 @@
public ActiveMQDestination getActiveMQDestination() {
return info != null ? info.getDestination() : null;
}
+
+ public boolean isBrowser() {
+ return info != null && info.isBrowser();
+ }
+
+ public int getInFlightUsage() {
+ if (info.getPrefetchSize() > 0) {
+ return (getInFlightSize() * 100)/info.getPrefetchSize();
+ }
+ return Integer.MAX_VALUE;
+ }
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
add(message);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Feb 18 01:38:10 2008
@@ -28,7 +28,6 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.12 $
@@ -44,8 +43,6 @@
void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
-
- boolean lock(MessageReference node, LockOwner lockOwner);
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Feb 18 01:38:10 2008
@@ -85,10 +85,6 @@
return next.getMemoryUsage();
}
- public boolean lock(MessageReference node, LockOwner lockOwner) {
- return next.lock(node, lockOwner);
- }
-
public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
next.removeSubscription(context, sub);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Mon Feb 18 01:38:10 2008
@@ -36,6 +36,7 @@
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
+ protected CountStatisticImpl inflight;
protected TimeStatisticImpl processTime;
public DestinationStatistics() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Feb 18 01:38:10 2008
@@ -72,7 +72,7 @@
return active;
}
- protected boolean isFull() {
+ public boolean isFull() {
return !active || super.isFull();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Mon Feb 18 01:38:10 2008
@@ -140,9 +140,6 @@
}
public boolean lock(LockOwner subscription) {
- if (!regionDestination.lock(this, subscription)) {
- return false;
- }
synchronized (this) {
if (dropped || (lockOwner != null && lockOwner != subscription)) {
return false;
@@ -152,8 +149,10 @@
}
}
- public synchronized void unlock() {
+ public synchronized boolean unlock() {
+ boolean result = lockOwner != null;
lockOwner = null;
+ return result;
}
public synchronized LockOwner getLockOwner() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Mon Feb 18 01:38:10 2008
@@ -44,7 +44,7 @@
}
public boolean isDropped() {
- throw new RuntimeException("not implemented");
+ return false;
}
public boolean lock(LockOwner subscription) {
@@ -55,7 +55,8 @@
throw new RuntimeException("not implemented");
}
- public void unlock() {
+ public boolean unlock() {
+ return true;
}
public int decrementReferenceCount() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Feb 18 01:38:10 2008
@@ -360,13 +360,17 @@
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
broker.sendToDeadLetterQueue(context, node);
}
-
+
+ public int getInFlightSize() {
+ return dispatched.size();
+ }
+
/**
* Used to determine if the broker can dispatch to the consumer.
*
* @return
*/
- protected boolean isFull() {
+ public boolean isFull() {
return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
}
@@ -603,6 +607,16 @@
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
+ }
+
+
+ public List<MessageReference> getInFlightMessages(){
+ List<MessageReference> result = new ArrayList<MessageReference>();
+ synchronized(pendingLock) {
+ result.addAll(dispatched);
+ result.addAll(pending.pageInList(1000));
+ }
+ return result;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Feb 18 01:38:10 2008
@@ -22,6 +22,9 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
@@ -55,6 +58,7 @@
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -75,23 +79,23 @@
private final List<Subscription> consumers = new ArrayList<Subscription>(50);
private PendingMessageCursor messages;
private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>();
- private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
- private final Object exclusiveLockMutex = new Object();
private final Object sendLock = new Object();
+ private final ExecutorService executor;
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final ReentrantLock dispatchLock = new ReentrantLock();
+ private QueueDispatchSelector dispatchSelector;
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
wakeup();
}
};
-
- public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+
+ public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
@@ -100,8 +104,31 @@
} else {
this.messages = new StoreQueueCursor(broker,this);
}
- this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
+
+ this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "QueueThread:"+destination);
+ thread.setDaemon(true);
+ thread.setPriority(Thread.NORM_PRIORITY);
+ return thread;
+ }
+ });
+
+ this.taskRunner = new DeterministicTaskRunner(this.executor,this);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
+ this.dispatchSelector=new QueueDispatchSelector(destination);
+
+ }
+
+ /**
+ * @param queue
+ * @param string
+ * @param b
+ * @return
+ */
+ private TaskRunner DedicatedTaskRunner(Queue queue, String string, boolean b) {
+ // TODO Auto-generated method stub
+ return null;
}
public void initialize() throws Exception {
@@ -153,26 +180,7 @@
}
}
- /**
- * Lock a node
- *
- * @param node
- * @param lockOwner
- * @return true if can be locked
- * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
- * org.apache.activemq.broker.region.LockOwner)
- */
- public boolean lock(MessageReference node, LockOwner lockOwner) {
- synchronized (exclusiveLockMutex) {
- if (exclusiveOwner == lockOwner) {
- return true;
- }
- if (exclusiveOwner != null) {
- return false;
- }
- }
- return true;
- }
+
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
dispatchLock.lock();
@@ -185,54 +193,41 @@
synchronized (consumers) {
consumers.add(sub);
if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner) sub;
- if (exclusiveOwner == null) {
- exclusiveOwner = owner;
- } else {
- // switch the owner if the priority is higher.
- if (owner.getLockPriority() > exclusiveOwner
- .getLockPriority()) {
- exclusiveOwner = owner;
- }
+ Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
+ if(exclusiveConsumer==null) {
+ exclusiveConsumer=sub;
+ }else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){
+ exclusiveConsumer=sub;
}
+ dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
-
- // we hold the lock on the dispatchValue - so lets build the paged
- // in
- // list directly;
- doPageIn(false);
-
// synchronize with dispatch method so that no new messages are sent
// while
// setting up a subscription. avoid out of order messages,
// duplicates
// etc.
-
+ doPageIn(false);
msgContext.setDestination(destination);
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
+
for (Iterator<MessageReference> i = pagedInMessages.values()
.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
- if (node.isDropped()
- || (!sub.getConsumerInfo().isBrowser() && node
- .getLockOwner() != null)) {
- continue;
- }
- try {
+ if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) {
sub.add(node);
}
- } catch (IOException e) {
- log.warn("Could not load message: " + e, e);
}
}
+
}
- } finally {
+ wakeup();
+ }finally {
dispatchLock.unlock();
}
}
@@ -240,79 +235,60 @@
public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception {
destinationStatistics.getConsumers().decrement();
- // synchronize with dispatch method so that no new messages are sent
- // while
- // removing up a subscription.
- synchronized (consumers) {
- consumers.remove(sub);
- if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner) sub;
- // Did we loose the exclusive owner??
- if (exclusiveOwner == owner) {
- // Find the exclusive consumer with the higest Lock
- // Priority.
- exclusiveOwner = null;
- for (Iterator<Subscription> iter = consumers.iterator(); iter
- .hasNext();) {
- Subscription s = iter.next();
- LockOwner so = (LockOwner) s;
- if (s.getConsumerInfo().isExclusive()
- && (exclusiveOwner == null || so
- .getLockPriority() > exclusiveOwner
- .getLockPriority())) {
- exclusiveOwner = so;
+ dispatchLock.lock();
+ try {
+ // synchronize with dispatch method so that no new messages are sent
+ // while
+ // removing up a subscription.
+ synchronized (consumers) {
+ consumers.remove(sub);
+ if (sub.getConsumerInfo().isExclusive()) {
+ Subscription exclusiveConsumer = dispatchSelector
+ .getExclusiveConsumer();
+ if (exclusiveConsumer == sub) {
+ exclusiveConsumer = null;
+ for (Subscription s : consumers) {
+ if (s.getConsumerInfo().isExclusive()
+ && (exclusiveConsumer == null
+ || s.getConsumerInfo().getPriority() > exclusiveConsumer
+ .getConsumerInfo().getPriority())) {
+ exclusiveConsumer = s;
+
+ }
}
+ dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
- }
- if (consumers.isEmpty()) {
- messages.gc();
- }
- }
- sub.remove(context, this);
- boolean wasExclusiveOwner = false;
- if (exclusiveOwner == sub) {
- exclusiveOwner = null;
- wasExclusiveOwner = true;
- }
- ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
- consumerId);
- if (!sub.getConsumerInfo().isBrowser()) {
- MessageEvaluationContext msgContext = new MessageEvaluationContext();
+ ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
+ MessageGroupSet ownedGroups = getMessageGroupOwners()
+ .removeConsumer(consumerId);
+ // redeliver inflight messages
+ sub.remove(context, this);
- msgContext.setDestination(destination);
- // lets copy the messages to dispatch to avoid deadlock
- List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
- synchronized (pagedInMessages) {
- for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
- .hasNext();) {
+ List<MessageReference> list = new ArrayList<MessageReference>();
+ for (Iterator<MessageReference> i = pagedInMessages.values()
+ .iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
- if (node.isDropped()) {
- continue;
- }
- String groupID = node.getGroupID();
- // Re-deliver all messages that the sub locked
- if (node.getLockOwner() == sub
- || wasExclusiveOwner
- || (groupID != null && ownedGroups
- .contains(groupID))) {
- messagesToDispatch.add(node);
+ if (!node.isDropped() && !node.isAcked()
+ && node.getLockOwner() == sub) {
+ if (node.unlock()) {
+ node.incrementRedeliveryCounter();
+ list.add(node);
+ }
}
}
- }
- // now lets dispatch from the copy of the collection to
- // avoid deadlocks
- for (Iterator<QueueMessageReference> iter = messagesToDispatch
- .iterator(); iter.hasNext();) {
- QueueMessageReference node = iter.next();
- node.incrementRedeliveryCounter();
- node.unlock();
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(node, msgContext, consumers);
+ if (list != null && !consumers.isEmpty()) {
+ doDispatch(list);
+ }
}
+ if (consumers.isEmpty()) {
+ messages.gc();
+ }
+ wakeup();
+ }finally {
+ dispatchLock.unlock();
}
}
@@ -523,6 +499,9 @@
if (taskRunner != null) {
taskRunner.shutdown();
}
+ if (this.executor != null) {
+ this.executor.shutdownNow();
+ }
if (messages != null) {
messages.stop();
}
@@ -677,11 +656,7 @@
for (MessageReference ref : list) {
try {
QueueMessageReference r = (QueueMessageReference) ref;
-
- // We should only delete messages that can be locked.
- if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
removeMessage(c,(IndirectMessageReference) r);
- }
} catch (IOException e) {
}
}
@@ -791,19 +766,16 @@
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
- // We should only copy messages that can be locked.
- if (lockMessage(r)) {
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
- return movedCounter;
- }
- } finally {
- r.decrementReferenceCount();
+ r.incrementReferenceCount();
+ try {
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ if (++movedCounter >= maximumMessages
+ && maximumMessages > 0) {
+ return movedCounter;
}
+ } finally {
+ r.decrementReferenceCount();
}
}
count++;
@@ -853,19 +825,17 @@
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
- if (lockMessage(r)) {
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- removeMessage(context, r);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
- return movedCounter;
- }
- } finally {
- r.decrementReferenceCount();
+ r.incrementReferenceCount();
+ try {
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ removeMessage(context, r);
+ if (++movedCounter >= maximumMessages
+ && maximumMessages > 0) {
+ return movedCounter;
}
+ } finally {
+ r.decrementReferenceCount();
}
}
count++;
@@ -885,7 +855,7 @@
}
if (result) {
try {
- pageInMessages(false);
+ pageInMessages(false);
} catch (Throwable e) {
log.error("Failed to page in more queue messages ", e);
@@ -895,7 +865,6 @@
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
- //must return false to prevent spinning
return false;
}
@@ -942,10 +911,7 @@
wakeup();
}
- protected boolean lockMessage(IndirectMessageReference r) {
- return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
- }
-
+
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext();
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
@@ -972,7 +938,8 @@
private List<MessageReference> doPageIn(boolean force) throws Exception {
List<MessageReference> result = null;
dispatchLock.lock();
- try {
+ try{
+
final int toPageIn = getMaxPageSize() - pagedInMessages.size();
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
@@ -1009,16 +976,48 @@
}
return result;
}
-
+
private void doDispatch(List<MessageReference> list) throws Exception {
-
- if (list != null && !list.isEmpty()) {
- MessageEvaluationContext msgContext = new MessageEvaluationContext();
- for (int i = 0; i < list.size(); i++) {
- MessageReference node = list.get(i);
- msgContext.setDestination(destination);
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(node, msgContext, consumers);
+
+ if (list != null) {
+ synchronized (consumers) {
+ for (MessageReference node : list) {
+ Subscription target = null;
+ List<Subscription> targets = null;
+ for (Subscription s : consumers) {
+ if (dispatchSelector.canSelect(s, node)) {
+ if (!s.isFull()) {
+ s.add(node);
+ target = s;
+ break;
+ } else {
+ if (targets == null) {
+ targets = new ArrayList<Subscription>();
+ }
+ targets.add(s);
+ }
+ }
+ }
+ if (targets != null) {
+ // pick the least loaded to add the messag too
+
+ for (Subscription s : targets) {
+ if (target == null
+ || target.getInFlightUsage() > s
+ .getInFlightUsage()) {
+ target = s;
+ }
+ }
+ if (target != null) {
+ target.add(node);
+ }
+ }
+ if (target != null && !dispatchSelector.isExclusiveConsumer(target)) {
+ consumers.remove(target);
+ consumers.add(target);
+ }
+
+ }
}
}
}
@@ -1030,7 +1029,4 @@
private void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force));
}
-
-
-
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,115 @@
+/**
+ *
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.region.group.MessageGroupMap;
+import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Queue dispatch policy that determines if a message can be sent to a subscription
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision$
+ */
+public class QueueDispatchSelector extends SimpleDispatchSelector {
+ private static final Log LOG = LogFactory.getLog(QueueDispatchSelector.class);
+ private Subscription exclusiveConsumer;
+
+
+ /**
+ * @param destination
+ */
+ public QueueDispatchSelector(ActiveMQDestination destination) {
+ super(destination);
+ }
+
+ public Subscription getExclusiveConsumer() {
+ return exclusiveConsumer;
+ }
+ public void setExclusiveConsumer(Subscription exclusiveConsumer) {
+ this.exclusiveConsumer = exclusiveConsumer;
+ }
+
+ public boolean isExclusiveConsumer(Subscription s) {
+ return s == this.exclusiveConsumer;
+ }
+
+
+ public boolean canSelect(Subscription subscription,
+ MessageReference m) throws Exception {
+ if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
+ return true;
+ }
+
+ boolean result = super.canDispatch(subscription, m) ;
+ if (result) {
+ result = exclusiveConsumer == null
+ || exclusiveConsumer == subscription;
+ if (result) {
+ QueueMessageReference node = (QueueMessageReference) m;
+ // Keep message groups together.
+ String groupId = node.getGroupID();
+ int sequence = node.getGroupSequence();
+ if (groupId != null) {
+ MessageGroupMap messageGroupOwners = ((Queue) node
+ .getRegionDestination()).getMessageGroupOwners();
+
+ // If we can own the first, then no-one else should own the
+ // rest.
+ if (sequence == 1) {
+ assignGroup(subscription, messageGroupOwners, node,groupId);
+ }else {
+
+ // Make sure that the previous owner is still valid, we may
+ // need to become the new owner.
+ ConsumerId groupOwner;
+
+ groupOwner = messageGroupOwners.get(groupId);
+ if (groupOwner == null) {
+ assignGroup(subscription, messageGroupOwners, node,groupId);
+ } else {
+ if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
+ // A group sequence < 1 is an end of group signal.
+ if (sequence < 0) {
+ messageGroupOwners.removeGroup(groupId);
+ }
+ } else {
+ result = false;
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
+ messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
+ Message message = n.getMessage();
+ if (message instanceof ActiveMQMessage) {
+ ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+ try {
+ activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
+ } catch (JMSException e) {
+ LOG.warn("Failed to set boolean header: " + e, e);
+ }
+ }
+ }
+
+
+
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java Mon Feb 18 01:38:10 2008
@@ -36,7 +36,7 @@
boolean lock(LockOwner subscription);
- void unlock();
+ boolean unlock();
LockOwner getLockOwner();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Mon Feb 18 01:38:10 2008
@@ -17,13 +17,14 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -67,54 +68,13 @@
}
protected boolean canDispatch(MessageReference n) throws IOException {
+ boolean result = true;
QueueMessageReference node = (QueueMessageReference)n;
- if (node.isAcked()) {
- return false;
- }
- // Keep message groups together.
- String groupId = node.getGroupID();
- int sequence = node.getGroupSequence();
- if (groupId != null) {
- MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
-
- // If we can own the first, then no-one else should own the rest.
- if (sequence == 1) {
- if (node.lock(this)) {
- assignGroupToMe(messageGroupOwners, n, groupId);
- return true;
- } else {
- return false;
- }
- }
-
- // Make sure that the previous owner is still valid, we may
- // need to become the new owner.
- ConsumerId groupOwner;
- synchronized (node) {
- groupOwner = messageGroupOwners.get(groupId);
- if (groupOwner == null) {
- if (node.lock(this)) {
- assignGroupToMe(messageGroupOwners, n, groupId);
- return true;
- } else {
- return false;
- }
- }
- }
-
- if (groupOwner.equals(info.getConsumerId())) {
- // A group sequence < 1 is an end of group signal.
- if (sequence < 0) {
- messageGroupOwners.removeGroup(groupId);
- }
- return true;
- }
-
- return false;
-
- } else {
- return node.lock(this);
+ if (node.isAcked() || node.isDropped()) {
+ result = false;
}
+ result = result && (isBrowser() || node.lock(this));
+ return result;
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Feb 18 01:38:10 2008
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
@@ -38,6 +39,7 @@
/**
* Used to add messages that match the subscription.
* @param node
+ * @throws Exception
* @throws InterruptedException
* @throws IOException
*/
@@ -169,6 +171,11 @@
boolean isHighWaterMark();
/**
+ * @return true if there is no space to dispatch messages
+ */
+ boolean isFull();
+
+ /**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
*/
@@ -186,11 +193,33 @@
int getPrefetchSize();
/**
+ * @return the number of messages awaiting acknowledgement
+ */
+ int getInFlightSize();
+
+ /**
+ * @return the in flight messages as a percentage of the prefetch size
+ */
+ int getInFlightUsage();
+
+ /**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
boolean isRecoveryRequired();
+
+
+ /**
+ * @return true if a browser
+ */
+ boolean isBrowser();
+
+ /**
+ * Get the list of in flight messages
+ * @return list
+ */
+ List<MessageReference> getInFlightMessages();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Feb 18 01:38:10 2008
@@ -33,6 +33,7 @@
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@@ -555,8 +556,7 @@
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
- dispatchValve.increment();
- MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
+ dispatchValve.increment();
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
@@ -567,7 +567,7 @@
return;
}
}
-
+ MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
@@ -575,7 +575,6 @@
onMessageWithNoConsumers(context, message);
}
} finally {
- msgContext.clear();
dispatchValve.decrement();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Feb 18 01:38:10 2008
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@@ -37,7 +39,6 @@
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
-import org.apache.activemq.kaha.Store;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
@@ -51,8 +52,7 @@
protected PendingMessageCursor matched;
protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
- protected AtomicLong prefetchExtension = new AtomicLong();
-
+
boolean singleDestination = true;
Destination destination;
@@ -83,8 +83,7 @@
public void add(MessageReference node) throws Exception {
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
- if (!isFull() && !isSlave()) {
- optimizePrefetch();
+ if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages
// which
// have not been dispatched (i.e. we allow the prefetch buffer to be
@@ -128,6 +127,7 @@
}
}
}
+ dispatchMatched();
}
}
}
@@ -177,20 +177,18 @@
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
- boolean wasFull = isFull();
if (ack.isStandardAck() || ack.isPoisonAck()) {
if (context.isInTransaction()) {
- prefetchExtension.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
- synchronized (TopicSubscription.this) {
+ synchronized (TopicSubscription.this) {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
- prefetchExtension.addAndGet(ack.getMessageCount());
+ dispatchMatched();
}
});
} else {
@@ -198,19 +196,14 @@
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
- prefetchExtension.addAndGet(ack.getMessageCount());
- }
- if (wasFull && !isFull()) {
- dispatchMatched();
}
+ dispatchMatched();
return;
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
- prefetchExtension.addAndGet(ack.getMessageCount());
- if (wasFull && !isFull()) {
- dispatchMatched();
- }
+ dequeueCounter.addAndGet(ack.getMessageCount());
+ dispatchMatched();
return;
}
throw new JMSException("Invalid acknowledgment: " + ack);
@@ -287,22 +280,27 @@
// Implementation methods
// -------------------------------------------------------------------------
- private boolean isFull() {
- return getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
+ public boolean isFull() {
+ return getDispatchedQueueSize() >= info.getPrefetchSize();
}
-
+
+ public int getInFlightSize() {
+ return getDispatchedQueueSize();
+ }
+
+
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark() {
- return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
+ return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark() {
- return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
+ return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
}
/**
@@ -354,42 +352,30 @@
}
}
- /**
- * optimize message consumer prefetch if the consumer supports it
- */
- public void optimizePrefetch() {
- /*
- * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
- * &&context.getConnection().isManageable()){
- * if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
- * isLowWaterMark()){
- * info.setCurrentPrefetchSize(info.getPrefetchSize());
- * updateConsumerPrefetch(info.getPrefetchSize()); }else
- * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() &&
- * isHighWaterMark()){ // want to purge any outstanding acks held by the
- * consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
- */
- }
-
- private void dispatchMatched() throws IOException {
+ private void dispatchMatched() throws IOException {
synchronized (matchedListMutex) {
- try {
- matched.reset();
- while (matched.hasNext() && !isFull()) {
- MessageReference message = (MessageReference)matched.next();
- matched.remove();
- // Message may have been sitting in the matched list a while
- // waiting for the consumer to ak the message.
- if (broker.isExpired(message)) {
- message.decrementReferenceCount();
- broker.messageExpired(getContext(), message);
- dequeueCounter.incrementAndGet();
- continue; // just drop it.
+ if (!matched.isEmpty() && !isFull()) {
+ try {
+ matched.reset();
+
+ while (matched.hasNext() && !isFull()) {
+ MessageReference message = (MessageReference) matched
+ .next();
+ matched.remove();
+ // Message may have been sitting in the matched list a
+ // while
+ // waiting for the consumer to ak the message.
+ if (broker.isExpired(message)) {
+ message.decrementReferenceCount();
+ broker.messageExpired(getContext(), message);
+ dequeueCounter.incrementAndGet();
+ continue; // just drop it.
+ }
+ dispatch(message);
}
- dispatch(message);
+ } finally {
+ matched.release();
}
- } finally {
- matched.release();
}
}
}
@@ -456,7 +442,15 @@
}
public int getPrefetchSize() {
- return (int)(info.getPrefetchSize() + prefetchExtension.get());
+ return (int)info.getPrefetchSize();
+ }
+
+ /**
+ * Get the list of inflight messages
+ * @return the list
+ */
+ public synchronized List<MessageReference> getInFlightMessages(){
+ return matched.pageInList(1000);
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+
+/**
+ * Determines if a subscription can dispatch a message reference
+ *
+ */
+public interface DispatchSelector {
+
+
+ /**
+ * return true if a subscription can dispatch a message reference
+ * @param subscription
+ * @param node
+ * @return true if can dispatch
+ * @throws Exception
+ */
+
+ boolean canDispatch(Subscription subscription, MessageReference node) throws Exception;
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,34 @@
+/**
+ *
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * Simple dispatch policy that determines if a message can be sent to a subscription
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision$
+ */
+public class SimpleDispatchSelector implements DispatchSelector {
+
+ private final ActiveMQDestination destination;
+
+ /**
+ * @param destination
+ */
+ public SimpleDispatchSelector(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception {
+ MessageEvaluationContext msgContext = new MessageEvaluationContext();
+ msgContext.setDestination(this.destination);
+ msgContext.setMessageReference(node);
+ return subscription.matches(node, msgContext);
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
------------------------------------------------------------------------------
svn:eol-style = native
Re: svn commit: r628667 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
./ policy/
Posted by David Sitsky <si...@nuix.com>.
Hi Rob,
I will for sure - I'll do some benchmarking in the next few days and get
back to you.
Cheers,
David
Rob Davies wrote:
> Hi David,
>
> please let us know if these changes helps/hinders your app!
>
> cheers,
>
> Rob
> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>
>>>> If what I said above is true, then the immediately above if
>>>> statement needs to be moved outside its enclosing if - otherwise it
>>>> only gets executed when targets != null. We'd want this to execute
>>>> if we found a matching target wouldn't we?
>>> Don't think so? We only want the message going to one subscription?
>>> I may have misunderstood what you mean!
>> Yes - ignore what I said, I had my wires crossed.
>>
>> Cheers,
>> David
>>
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
David Sitsky wrote:
> With this change in effect - I see slightly faster or almost the same
> times with the previous benchmark. However memory usage on the broker
> is far better, as the pending queues for each consumer is either 0 or
> very small.
In case this wasn't clear - I meant with the change in effect, the
performance was similar, or slightly better than the old activemq based
on large pending queues, but the broker uses less memory / CPU.
Cheers,
David
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
James Strachan wrote:
> On 06/03/2008, David Sitsky <si...@nuix.com> wrote:
>> I am sure it will be application-dependent, so making it a policy makes
>> a lot of sense. For my application, I only have a pending size of 1
>> since each work item's processing requirements can vary tremendously.
>
> I wonder could the same code be smart enough to work in the 2
> different modes based on the prefetch size?
>
> i.e. use the default if the consumers's prefetch size is > 100 or
> something or use David's approach if its smaller
>
> If not then using destination policies sounds fine to me; just
> wondered if we could be smart enough to use the right policy based on
> the consumer configuration?
I think it is very much application-dependent - and it is based on more
than just the prefetch size.
In my situation, I may have 500,000 messages in my system that need to
be delivered, but I don't want them to be delivered to pending queues
unnecessarily, since it may be some time before the consumers have a
chance to eat them up. I also need a large queue page size since I
can't do a commit() after each message received. So I also have a lot
of uncommitted messages floating about the system - maybe 24,000 at a
given time.
I really need the requirement for only putting a message to a consumer's
pending queue when it can process it. Otherwise I found the pending
queues for each consumer would grow to be extremely large, consuming
unnecessary CPU and memory resources. With my changes, the broker's
usage was kept nice and small.
A lot of this may only occur for applications that have a large queue
page size.
It feels right for this to be a policy option... I know how complex
different messaging application performance requirements can be!
Cheers,
David
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by James Strachan <ja...@gmail.com>.
On 06/03/2008, David Sitsky <si...@nuix.com> wrote:
> I am sure it will be application-dependent, so making it a policy makes
> a lot of sense. For my application, I only have a pending size of 1
> since each work item's processing requirements can vary tremendously.
I wonder could the same code be smart enough to work in the 2
different modes based on the prefetch size?
i.e. use the default if the consumers's prefetch size is > 100 or
something or use David's approach if its smaller
If not then using destination policies sounds fine to me; just
wondered if we could be smart enough to use the right policy based on
the consumer configuration?
--
James
-------
http://macstrac.blogspot.com/
Open Source Integration
http://open.iona.com
Re: Queue performance from recent changes
Posted by Rob Davies <ra...@gmail.com>.
Hi Dave,
there will still be some contention there - but as this has been
greatly reduced since the beginning of the year - I am hoping that it
will be more than countered by not having to go through the extra
thread for dispatching? Be interested in seeing if its better or worse
for your case.
cheers,
Rob
On 10 Mar 2008, at 06:07, David Sitsky wrote:
> Hi Rob,
>
> I'll give it a go in the next day or two when I get some spare time.
> From what I can see, we call iterate() inline rather than delegating
> it to the dedicated "wakeup" thread when optimizeDispatch is set.
>
> From memory, this caused contention with synchronisation blocks last
> time - as many consumers would end up calling iterate(). Would this
> still not be an issue? I must admit I'd need to check the code
> closely.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> David,
>> you might like to try enabling the optimizeDispatch property on the
>> Destination policy map - see http://activemq.apache.org/configure-version-5-brokers.html
>> from trunk, if you are using non-persistent messages
>> cheers,
>> Rob
>> On 6 Mar 2008, at 22:48, David Sitsky wrote:
>>> I am sure it will be application-dependent, so making it a policy
>>> makes a lot of sense. For my application, I only have a pending
>>> size of 1 since each work item's processing requirements can vary
>>> tremendously.
>>>
>>> Just curious - what kind of benchmarks did you run this against?
>>> I'm curious to know what kind of performance degregation you saw..
>>> it would be interesting to understand why. I am using non-
>>> persistent messaging, so perhaps that could make a difference,
>>> since I am only paging a small number of messages in at a time.
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> Yes - actually - I tried it a few days ago. I haven't committed
>>>> it because message throughput is generally lower. I will look at
>>>> making it optional via a destination policy
>>>> cheers,
>>>> Rob
>>>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I know its been a couple of weeks. I've been using my changes
>>>>> for a while and I see nice CPU and memory usage on the broker,
>>>>> and good messaging performance for my application. Have you had
>>>>> a chance to try it out?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> thanks for the great feedback - will try your patch and see how
>>>>>> it works!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>>>> Hi Rob,
>>>>>>>
>>>>>>> I like the new changes, but with the changes as they are, for
>>>>>>> my application for one of my benchmarks, it takes twice as
>>>>>>> long to complete.
>>>>>>>
>>>>>>> I believe the culprit for this is that when the new code can't
>>>>>>> find a consumer which is not full, the broker chooses the
>>>>>>> consumer with the lowest dispatch queue size.
>>>>>>>
>>>>>>> In my application, since I have a prefetch size of 1, and have
>>>>>>> longish-running transactions, the dispatch queue size is not
>>>>>>> indicative of the current load for that consumer. As a
>>>>>>> result, I think this is what is responsible for poor load-
>>>>>>> balancing in my case.
>>>>>>>
>>>>>>> For applications which commit() after each processed message,
>>>>>>> I am sure this wouldn't be the case. In some ways, reverting
>>>>>>> to the old behaviour of adding the pending message to all
>>>>>>> consumers might lead to better load balancing with this code.
>>>>>>>
>>>>>>> However - I think it is better if the consumers can decide
>>>>>>> when they want more messages rather than the broker pushing
>>>>>>> messages at them? I've attached a patch which demonstrates
>>>>>>> this. When LAZY_DISPATCH is set to true (set via a system
>>>>>>> property for now for testing purposes) this changes the
>>>>>>> behaviour slightly.
>>>>>>>
>>>>>>> The basic idea is pageInMessages() only pages in the minimum
>>>>>>> number of messages that can be dispatched immediately to non-
>>>>>>> full consumers. Whenever a consumer acks a message, which
>>>>>>> updates its prefetch size, we make sure Queue.wakeup() is
>>>>>>> called so that the consumer will receive new messages.
>>>>>>>
>>>>>>> With this change in effect - I see slightly faster or almost
>>>>>>> the same times with the previous benchmark. However memory
>>>>>>> usage on the broker is far better, as the pending queues for
>>>>>>> each consumer is either 0 or very small.
>>>>>>>
>>>>>>> What do you think? I guess there are better ways of doing this.
>>>>>>>
>>>>>>> I am doing a large overnight run with 16 consumers, so we'll
>>>>>>> see how the performance goes.
>>>>>>>
>>>>>>> You'll also notice in the patch, that in
>>>>>>> Queue.addSubscriber(), I thought there didn't seem to be any
>>>>>>> need for adding a message to a new consumer if the message has
>>>>>>> already been locked by another consumer?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>>> Rob Davies wrote:
>>>>>>>> Hi David,
>>>>>>>> please let us know if these changes helps/hinders your app!
>>>>>>>> cheers,
>>>>>>>> Rob
>>>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>>>> If what I said above is true, then the immediately above
>>>>>>>>>>> if statement needs to be moved outside its enclosing if -
>>>>>>>>>>> otherwise it only gets executed when targets != null.
>>>>>>>>>>> We'd want this to execute if we found a matching target
>>>>>>>>>>> wouldn't we?
>>>>>>>>>> Don't think so? We only want the message going to one
>>>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> David
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>>> Nuix Pty Ltd
>>>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>>>>>> 9280 0699
>>>>>>> Web: http://www.nuix.com Fax: +61 2
>>>>>>> 9212 6902
>>>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>>>>>> region/PrefetchSubscription.java
>>>>>>> =
>>>>>>> =
>>>>>>> =
>>>>>>> ================================================================
>>>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/
>>>>>>> region/PrefetchSubscription.java (revision 628917)
>>>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/
>>>>>>> region/PrefetchSubscription.java (working copy)
>>>>>>> @@ -160,6 +160,8 @@
>>>>>>> public void acknowledge(final ConnectionContext
>>>>>>> context,final MessageAck ack) throws Exception {
>>>>>>> // Handle the standard acknowledgment case.
>>>>>>> boolean callDispatchMatched = false;
>>>>>>> + Queue queue = null;
>>>>>>> + synchronized(dispatchLock) {
>>>>>>> if (ack.isStandardAck()) {
>>>>>>> // Acknowledge all dispatched messages up till
>>>>>>> the message id of
>>>>>>> @@ -223,8 +225,12 @@
>>>>>>> prefetchExtension = Math.max(0,
>>>>>>> prefetchExtension -
>>>>>>> (index + 1));
>>>>>>> }
>>>>>>> + if (queue == null)
>>>>>>> + {
>>>>>>> + queue = (Queue)node.getRegionDestination();
>>>>>>> + }
>>>>>>> callDispatchMatched = true;
>>>>>>> - break;
>>>>>>> + break;
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>> @@ -253,6 +259,10 @@
>>>>>>> if
>>>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>>> prefetchExtension =
>>>>>>> Math.max(prefetchExtension,
>>>>>>> index + 1);
>>>>>>> + if (queue == null)
>>>>>>> + {
>>>>>>> + queue =
>>>>>>> (Queue)node.getRegionDestination();
>>>>>>> + }
>>>>>>> callDispatchMatched = true;
>>>>>>> break;
>>>>>>> }
>>>>>>> @@ -279,6 +289,10 @@
>>>>>>> if (inAckRange) {
>>>>>>> node.incrementRedeliveryCounter();
>>>>>>> if
>>>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>>>> + if (queue == null)
>>>>>>> + {
>>>>>>> + queue = (Queue)node.getRegionDestination();
>>>>>>> + }
>>>>>>> callDispatchMatched = true;
>>>>>>> break;
>>>>>>> }
>>>>>>> @@ -320,6 +334,10 @@
>>>>>>> if
>>>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>>>> prefetchExtension = Math.max(0,
>>>>>>> prefetchExtension
>>>>>>> - (index + 1));
>>>>>>> + if (queue == null)
>>>>>>> + {
>>>>>>> + queue = (Queue)node.getRegionDestination();
>>>>>>> + }
>>>>>>> callDispatchMatched = true;
>>>>>>> break;
>>>>>>> }
>>>>>>> @@ -336,6 +354,9 @@
>>>>>>> }
>>>>>>> }
>>>>>>> if (callDispatchMatched) {
>>>>>>> + if (Queue.LAZY_DISPATCH) {
>>>>>>> + queue.wakeup();
>>>>>>> + }
>>>>>>> dispatchPending();
>>>>>>> } else {
>>>>>>> if (isSlave()) {
>>>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>>>>>> region/Queue.java
>>>>>>> =
>>>>>>> =
>>>>>>> =
>>>>>>> ================================================================
>>>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/
>>>>>>> region/Queue.java (revision 628917)
>>>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/
>>>>>>> region/Queue.java (working copy)
>>>>>>> @@ -75,6 +75,8 @@
>>>>>>> * @version $Revision: 1.28 $
>>>>>>> */
>>>>>>> public class Queue extends BaseDestination implements Task {
>>>>>>> + public static final boolean LAZY_DISPATCH =
>>>>>>> +
>>>>>>> Boolean
>>>>>>> .parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>>>>>> "true"));
>>>>>>> private final Log log;
>>>>>>> private final List<Subscription> consumers = new
>>>>>>> ArrayList<Subscription>(50);
>>>>>>> private PendingMessageCursor messages;
>>>>>>> @@ -212,12 +214,12 @@
>>>>>>> synchronized (pagedInMessages) {
>>>>>>> // Add all the matching messages in the queue to
>>>>>>> the
>>>>>>> // subscription.
>>>>>>> -
>>>>>>> for (Iterator<MessageReference> i =
>>>>>>> pagedInMessages.values()
>>>>>>> .iterator(); i.hasNext();) {
>>>>>>> QueueMessageReference node =
>>>>>>> (QueueMessageReference) i
>>>>>>> .next();
>>>>>>> - if (!node.isDropped() && !node.isAcked()
>>>>>>> && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>>>> + if ((!node.isDropped() ||
>>>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>>>> + node.getLockOwner() == null) {
>>>>>>> msgContext.setMessageReference(node);
>>>>>>> if (sub.matches(node, msgContext)) {
>>>>>>> sub.add(node);
>>>>>>> @@ -940,7 +945,11 @@
>>>>>>> dispatchLock.lock();
>>>>>>> try{
>>>>>>>
>>>>>>> - final int toPageIn = getMaxPageSize() -
>>>>>>> pagedInMessages.size();
>>>>>>> + int toPageIn = getMaxPageSize() -
>>>>>>> pagedInMessages.size();
>>>>>>> + if (LAZY_DISPATCH) {
>>>>>>> + // Only page in the minimum number of messages which
>>>>>>> can be dispatched immediately.
>>>>>>> + toPageIn =
>>>>>>> Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>>>> + }
>>>>>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>>> messages.setMaxBatchSize(toPageIn);
>>>>>>> int count = 0;
>>>>>>> @@ -976,12 +985,25 @@
>>>>>>> }
>>>>>>> return result;
>>>>>>> }
>>>>>>> +
>>>>>>> + private int getConsumerMessageCountBeforeFull() throws
>>>>>>> Exception {
>>>>>>> + int total = 0;
>>>>>>> + synchronized (consumers) {
>>>>>>> + for (Subscription s : consumers) {
>>>>>>> + if (s instanceof PrefetchSubscription) {
>>>>>>> + total +=
>>>>>>> ((PrefetchSubscription)s).countBeforeFull();
>>>>>>> + }
>>>>>>> + }
>>>>>>> + }
>>>>>>> + return total;
>>>>>>> + }
>>>>>>>
>>>>>>> private void doDispatch(List<MessageReference> list) throws
>>>>>>> Exception {
>>>>>>>
>>>>>>> if (list != null) {
>>>>>>> synchronized (consumers) {
>>>>>>> for (MessageReference node : list) {
>>>>>>> +
>>>>>>> Subscription target = null;
>>>>>>> List<Subscription> targets = null;
>>>>>>> for (Subscription s : consumers) {
>>>>>
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>>>> 9280 0699
>>>>> Web: http://www.nuix.com Fax: +61 2
>>>>> 9212 6902
>>>
>>>
>>> --
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>> 9280 0699
>>> Web: http://www.nuix.com Fax: +61 2
>>> 9212 6902
>
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
> 0699
> Web: http://www.nuix.com Fax: +61 2 9212
> 6902
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
Hi Rob,
I'll give it a go in the next day or two when I get some spare time.
From what I can see, we call iterate() inline rather than delegating it
to the dedicated "wakeup" thread when optimizeDispatch is set.
From memory, this caused contention with synchronisation blocks last
time - as many consumers would end up calling iterate(). Would this
still not be an issue? I must admit I'd need to check the code closely.
Cheers,
David
Rob Davies wrote:
> David,
>
> you might like to try enabling the optimizeDispatch property on the
> Destination policy map - see
> http://activemq.apache.org/configure-version-5-brokers.html from trunk,
> if you are using non-persistent messages
>
> cheers,
>
> Rob
> On 6 Mar 2008, at 22:48, David Sitsky wrote:
>
>> I am sure it will be application-dependent, so making it a policy
>> makes a lot of sense. For my application, I only have a pending size
>> of 1 since each work item's processing requirements can vary
>> tremendously.
>>
>> Just curious - what kind of benchmarks did you run this against? I'm
>> curious to know what kind of performance degregation you saw.. it
>> would be interesting to understand why. I am using non-persistent
>> messaging, so perhaps that could make a difference, since I am only
>> paging a small number of messages in at a time.
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> Yes - actually - I tried it a few days ago. I haven't committed it
>>> because message throughput is generally lower. I will look at making
>>> it optional via a destination policy
>>> cheers,
>>> Rob
>>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I know its been a couple of weeks. I've been using my changes for a
>>>> while and I see nice CPU and memory usage on the broker, and good
>>>> messaging performance for my application. Have you had a chance to
>>>> try it out?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> thanks for the great feedback - will try your patch and see how it
>>>>> works!
>>>>> cheers,
>>>>> Rob
>>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>>> Hi Rob,
>>>>>>
>>>>>> I like the new changes, but with the changes as they are, for my
>>>>>> application for one of my benchmarks, it takes twice as long to
>>>>>> complete.
>>>>>>
>>>>>> I believe the culprit for this is that when the new code can't
>>>>>> find a consumer which is not full, the broker chooses the consumer
>>>>>> with the lowest dispatch queue size.
>>>>>>
>>>>>> In my application, since I have a prefetch size of 1, and have
>>>>>> longish-running transactions, the dispatch queue size is not
>>>>>> indicative of the current load for that consumer. As a result, I
>>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>>
>>>>>> For applications which commit() after each processed message, I am
>>>>>> sure this wouldn't be the case. In some ways, reverting to the
>>>>>> old behaviour of adding the pending message to all consumers might
>>>>>> lead to better load balancing with this code.
>>>>>>
>>>>>> However - I think it is better if the consumers can decide when
>>>>>> they want more messages rather than the broker pushing messages at
>>>>>> them? I've attached a patch which demonstrates this. When
>>>>>> LAZY_DISPATCH is set to true (set via a system property for now
>>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>>
>>>>>> The basic idea is pageInMessages() only pages in the minimum
>>>>>> number of messages that can be dispatched immediately to non-full
>>>>>> consumers. Whenever a consumer acks a message, which updates its
>>>>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>>>>> consumer will receive new messages.
>>>>>>
>>>>>> With this change in effect - I see slightly faster or almost the
>>>>>> same times with the previous benchmark. However memory usage on
>>>>>> the broker is far better, as the pending queues for each consumer
>>>>>> is either 0 or very small.
>>>>>>
>>>>>> What do you think? I guess there are better ways of doing this.
>>>>>>
>>>>>> I am doing a large overnight run with 16 consumers, so we'll see
>>>>>> how the performance goes.
>>>>>>
>>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>>>>> thought there didn't seem to be any need for adding a message to a
>>>>>> new consumer if the message has already been locked by another
>>>>>> consumer?
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Rob Davies wrote:
>>>>>>> Hi David,
>>>>>>> please let us know if these changes helps/hinders your app!
>>>>>>> cheers,
>>>>>>> Rob
>>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>>>>> otherwise it only gets executed when targets != null. We'd
>>>>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>>>>> Don't think so? We only want the message going to one
>>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> David
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>>> Nuix Pty Ltd
>>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>>>>> 9280 0699
>>>>>> Web: http://www.nuix.com Fax: +61 2
>>>>>> 9212 6902
>>>>>> Index:
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>>>
>>>>>> ===================================================================
>>>>>> ---
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>>> (revision 628917)
>>>>>> +++
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>>> (working copy)
>>>>>> @@ -160,6 +160,8 @@
>>>>>> public void acknowledge(final ConnectionContext context,final
>>>>>> MessageAck ack) throws Exception {
>>>>>> // Handle the standard acknowledgment case.
>>>>>> boolean callDispatchMatched = false;
>>>>>> + Queue queue = null;
>>>>>> + synchronized(dispatchLock) {
>>>>>> if (ack.isStandardAck()) {
>>>>>> // Acknowledge all dispatched messages up till the
>>>>>> message id of
>>>>>> @@ -223,8 +225,12 @@
>>>>>> prefetchExtension = Math.max(0,
>>>>>> prefetchExtension - (index +
>>>>>> 1));
>>>>>> }
>>>>>> + if (queue == null)
>>>>>> + {
>>>>>> + queue = (Queue)node.getRegionDestination();
>>>>>> + }
>>>>>> callDispatchMatched = true;
>>>>>> - break;
>>>>>> + break;
>>>>>> }
>>>>>> }
>>>>>> }
>>>>>> @@ -253,6 +259,10 @@
>>>>>> if
>>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>>> prefetchExtension = Math.max(prefetchExtension,
>>>>>> index + 1);
>>>>>> + if (queue == null)
>>>>>> + {
>>>>>> + queue =
>>>>>> (Queue)node.getRegionDestination();
>>>>>> + }
>>>>>> callDispatchMatched = true;
>>>>>> break;
>>>>>> }
>>>>>> @@ -279,6 +289,10 @@
>>>>>> if (inAckRange) {
>>>>>> node.incrementRedeliveryCounter();
>>>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>>>> + if (queue == null)
>>>>>> + {
>>>>>> + queue = (Queue)node.getRegionDestination();
>>>>>> + }
>>>>>> callDispatchMatched = true;
>>>>>> break;
>>>>>> }
>>>>>> @@ -320,6 +334,10 @@
>>>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>>>> prefetchExtension = Math.max(0,
>>>>>> prefetchExtension
>>>>>> - (index + 1));
>>>>>> + if (queue == null)
>>>>>> + {
>>>>>> + queue = (Queue)node.getRegionDestination();
>>>>>> + }
>>>>>> callDispatchMatched = true;
>>>>>> break;
>>>>>> }
>>>>>> @@ -336,6 +354,9 @@
>>>>>> }
>>>>>> }
>>>>>> if (callDispatchMatched) {
>>>>>> + if (Queue.LAZY_DISPATCH) {
>>>>>> + queue.wakeup();
>>>>>> + }
>>>>>> dispatchPending();
>>>>>> } else {
>>>>>> if (isSlave()) {
>>>>>> Index:
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>>>
>>>>>> ===================================================================
>>>>>> ---
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>>> (revision 628917)
>>>>>> +++
>>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>>> (working copy)
>>>>>> @@ -75,6 +75,8 @@
>>>>>> * @version $Revision: 1.28 $
>>>>>> */
>>>>>> public class Queue extends BaseDestination implements Task {
>>>>>> + public static final boolean LAZY_DISPATCH =
>>>>>> +
>>>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>>>>> "true"));
>>>>>> private final Log log;
>>>>>> private final List<Subscription> consumers = new
>>>>>> ArrayList<Subscription>(50);
>>>>>> private PendingMessageCursor messages;
>>>>>> @@ -212,12 +214,12 @@
>>>>>> synchronized (pagedInMessages) {
>>>>>> // Add all the matching messages in the queue to the
>>>>>> // subscription.
>>>>>> -
>>>>>> for (Iterator<MessageReference> i =
>>>>>> pagedInMessages.values()
>>>>>> .iterator(); i.hasNext();) {
>>>>>> QueueMessageReference node =
>>>>>> (QueueMessageReference) i
>>>>>> .next();
>>>>>> - if (!node.isDropped() && !node.isAcked() &&
>>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>>> + if ((!node.isDropped() ||
>>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>>> + node.getLockOwner() == null) {
>>>>>> msgContext.setMessageReference(node);
>>>>>> if (sub.matches(node, msgContext)) {
>>>>>> sub.add(node);
>>>>>> @@ -940,7 +945,11 @@
>>>>>> dispatchLock.lock();
>>>>>> try{
>>>>>>
>>>>>> - final int toPageIn = getMaxPageSize() -
>>>>>> pagedInMessages.size();
>>>>>> + int toPageIn = getMaxPageSize() -
>>>>>> pagedInMessages.size();
>>>>>> + if (LAZY_DISPATCH) {
>>>>>> + // Only page in the minimum number of messages which can
>>>>>> be dispatched immediately.
>>>>>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>>>>>> toPageIn);
>>>>>> + }
>>>>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>>> messages.setMaxBatchSize(toPageIn);
>>>>>> int count = 0;
>>>>>> @@ -976,12 +985,25 @@
>>>>>> }
>>>>>> return result;
>>>>>> }
>>>>>> +
>>>>>> + private int getConsumerMessageCountBeforeFull() throws
>>>>>> Exception {
>>>>>> + int total = 0;
>>>>>> + synchronized (consumers) {
>>>>>> + for (Subscription s : consumers) {
>>>>>> + if (s instanceof PrefetchSubscription) {
>>>>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>>>>> + }
>>>>>> + }
>>>>>> + }
>>>>>> + return total;
>>>>>> + }
>>>>>>
>>>>>> private void doDispatch(List<MessageReference> list) throws
>>>>>> Exception {
>>>>>>
>>>>>> if (list != null) {
>>>>>> synchronized (consumers) {
>>>>>> for (MessageReference node : list) {
>>>>>> +
>>>>>> Subscription target = null;
>>>>>> List<Subscription> targets = null;
>>>>>> for (Subscription s : consumers) {
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
>>>> 0699
>>>> Web: http://www.nuix.com Fax: +61 2 9212
>>>> 6902
>>
>>
>> --
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>> Web: http://www.nuix.com Fax: +61 2 9212 6902
>
>
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by Rob Davies <ra...@gmail.com>.
David,
you might like to try enabling the optimizeDispatch property on the
Destination policy map - see http://activemq.apache.org/configure-version-5-brokers.html
from trunk, if you are using non-persistent messages
cheers,
Rob
On 6 Mar 2008, at 22:48, David Sitsky wrote:
> I am sure it will be application-dependent, so making it a policy
> makes a lot of sense. For my application, I only have a pending
> size of 1 since each work item's processing requirements can vary
> tremendously.
>
> Just curious - what kind of benchmarks did you run this against?
> I'm curious to know what kind of performance degregation you saw..
> it would be interesting to understand why. I am using non-
> persistent messaging, so perhaps that could make a difference, since
> I am only paging a small number of messages in at a time.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> Yes - actually - I tried it a few days ago. I haven't committed it
>> because message throughput is generally lower. I will look at
>> making it optional via a destination policy
>> cheers,
>> Rob
>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I know its been a couple of weeks. I've been using my changes for
>>> a while and I see nice CPU and memory usage on the broker, and
>>> good messaging performance for my application. Have you had a
>>> chance to try it out?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> thanks for the great feedback - will try your patch and see how
>>>> it works!
>>>> cheers,
>>>> Rob
>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I like the new changes, but with the changes as they are, for my
>>>>> application for one of my benchmarks, it takes twice as long to
>>>>> complete.
>>>>>
>>>>> I believe the culprit for this is that when the new code can't
>>>>> find a consumer which is not full, the broker chooses the
>>>>> consumer with the lowest dispatch queue size.
>>>>>
>>>>> In my application, since I have a prefetch size of 1, and have
>>>>> longish-running transactions, the dispatch queue size is not
>>>>> indicative of the current load for that consumer. As a result,
>>>>> I think this is what is responsible for poor load-balancing in
>>>>> my case.
>>>>>
>>>>> For applications which commit() after each processed message, I
>>>>> am sure this wouldn't be the case. In some ways, reverting to
>>>>> the old behaviour of adding the pending message to all consumers
>>>>> might lead to better load balancing with this code.
>>>>>
>>>>> However - I think it is better if the consumers can decide when
>>>>> they want more messages rather than the broker pushing messages
>>>>> at them? I've attached a patch which demonstrates this. When
>>>>> LAZY_DISPATCH is set to true (set via a system property for now
>>>>> for testing purposes) this changes the behaviour slightly.
>>>>>
>>>>> The basic idea is pageInMessages() only pages in the minimum
>>>>> number of messages that can be dispatched immediately to non-
>>>>> full consumers. Whenever a consumer acks a message, which
>>>>> updates its prefetch size, we make sure Queue.wakeup() is called
>>>>> so that the consumer will receive new messages.
>>>>>
>>>>> With this change in effect - I see slightly faster or almost the
>>>>> same times with the previous benchmark. However memory usage on
>>>>> the broker is far better, as the pending queues for each
>>>>> consumer is either 0 or very small.
>>>>>
>>>>> What do you think? I guess there are better ways of doing this.
>>>>>
>>>>> I am doing a large overnight run with 16 consumers, so we'll see
>>>>> how the performance goes.
>>>>>
>>>>> You'll also notice in the patch, that in Queue.addSubscriber(),
>>>>> I thought there didn't seem to be any need for adding a message
>>>>> to a new consumer if the message has already been locked by
>>>>> another consumer?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> please let us know if these changes helps/hinders your app!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>>>> otherwise it only gets executed when targets != null. We'd
>>>>>>>>> want this to execute if we found a matching target wouldn't
>>>>>>>>> we?
>>>>>>>> Don't think so? We only want the message going to one
>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>>>> 9280 0699
>>>>> Web: http://www.nuix.com Fax: +61 2
>>>>> 9212 6902
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>>>> region/PrefetchSubscription.java
>>>>> =
>>>>> ==================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/
>>>>> region/PrefetchSubscription.java (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/
>>>>> region/PrefetchSubscription.java (working copy)
>>>>> @@ -160,6 +160,8 @@
>>>>> public void acknowledge(final ConnectionContext context,final
>>>>> MessageAck ack) throws Exception {
>>>>> // Handle the standard acknowledgment case.
>>>>> boolean callDispatchMatched = false;
>>>>> + Queue queue = null;
>>>>> + synchronized(dispatchLock) {
>>>>> if (ack.isStandardAck()) {
>>>>> // Acknowledge all dispatched messages up till the
>>>>> message id of
>>>>> @@ -223,8 +225,12 @@
>>>>> prefetchExtension = Math.max(0,
>>>>> prefetchExtension - (index
>>>>> + 1));
>>>>> }
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue = (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> - break;
>>>>> + break;
>>>>> }
>>>>> }
>>>>> }
>>>>> @@ -253,6 +259,10 @@
>>>>> if
>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>> prefetchExtension =
>>>>> Math.max(prefetchExtension,
>>>>> index + 1);
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue =
>>>>> (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> break;
>>>>> }
>>>>> @@ -279,6 +289,10 @@
>>>>> if (inAckRange) {
>>>>> node.incrementRedeliveryCounter();
>>>>> if
>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue = (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> break;
>>>>> }
>>>>> @@ -320,6 +334,10 @@
>>>>> if
>>>>> (ack.getLastMessageId().equals(messageId)) {
>>>>> prefetchExtension = Math.max(0,
>>>>> prefetchExtension
>>>>> - (index + 1));
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue = (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> break;
>>>>> }
>>>>> @@ -336,6 +354,9 @@
>>>>> }
>>>>> }
>>>>> if (callDispatchMatched) {
>>>>> + if (Queue.LAZY_DISPATCH) {
>>>>> + queue.wakeup();
>>>>> + }
>>>>> dispatchPending();
>>>>> } else {
>>>>> if (isSlave()) {
>>>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>>>> region/Queue.java
>>>>> =
>>>>> ==================================================================
>>>>> --- activemq-core/src/main/java/org/apache/activemq/broker/
>>>>> region/Queue.java (revision 628917)
>>>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/
>>>>> region/Queue.java (working copy)
>>>>> @@ -75,6 +75,8 @@
>>>>> * @version $Revision: 1.28 $
>>>>> */
>>>>> public class Queue extends BaseDestination implements Task {
>>>>> + public static final boolean LAZY_DISPATCH =
>>>>> +
>>>>> Boolean
>>>>> .parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>>>> "true"));
>>>>> private final Log log;
>>>>> private final List<Subscription> consumers = new
>>>>> ArrayList<Subscription>(50);
>>>>> private PendingMessageCursor messages;
>>>>> @@ -212,12 +214,12 @@
>>>>> synchronized (pagedInMessages) {
>>>>> // Add all the matching messages in the queue to the
>>>>> // subscription.
>>>>> -
>>>>> for (Iterator<MessageReference> i =
>>>>> pagedInMessages.values()
>>>>> .iterator(); i.hasNext();) {
>>>>> QueueMessageReference node =
>>>>> (QueueMessageReference) i
>>>>> .next();
>>>>> - if (!node.isDropped() && !node.isAcked() &&
>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>> + if ((!node.isDropped() ||
>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>> + node.getLockOwner() == null) {
>>>>> msgContext.setMessageReference(node);
>>>>> if (sub.matches(node, msgContext)) {
>>>>> sub.add(node);
>>>>> @@ -940,7 +945,11 @@
>>>>> dispatchLock.lock();
>>>>> try{
>>>>>
>>>>> - final int toPageIn = getMaxPageSize() -
>>>>> pagedInMessages.size();
>>>>> + int toPageIn = getMaxPageSize() -
>>>>> pagedInMessages.size();
>>>>> + if (LAZY_DISPATCH) {
>>>>> + // Only page in the minimum number of messages which
>>>>> can be dispatched immediately.
>>>>> + toPageIn =
>>>>> Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
>>>>> + }
>>>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>> messages.setMaxBatchSize(toPageIn);
>>>>> int count = 0;
>>>>> @@ -976,12 +985,25 @@
>>>>> }
>>>>> return result;
>>>>> }
>>>>> +
>>>>> + private int getConsumerMessageCountBeforeFull() throws
>>>>> Exception {
>>>>> + int total = 0;
>>>>> + synchronized (consumers) {
>>>>> + for (Subscription s : consumers) {
>>>>> + if (s instanceof PrefetchSubscription) {
>>>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>>>> + }
>>>>> + }
>>>>> + }
>>>>> + return total;
>>>>> + }
>>>>>
>>>>> private void doDispatch(List<MessageReference> list) throws
>>>>> Exception {
>>>>>
>>>>> if (list != null) {
>>>>> synchronized (consumers) {
>>>>> for (MessageReference node : list) {
>>>>> +
>>>>> Subscription target = null;
>>>>> List<Subscription> targets = null;
>>>>> for (Subscription s : consumers) {
>>>
>>>
>>> --
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>> 9280 0699
>>> Web: http://www.nuix.com Fax: +61 2
>>> 9212 6902
>
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
> 0699
> Web: http://www.nuix.com Fax: +61 2 9212
> 6902
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
I am sure it will be application-dependent, so making it a policy makes
a lot of sense. For my application, I only have a pending size of 1
since each work item's processing requirements can vary tremendously.
Just curious - what kind of benchmarks did you run this against? I'm
curious to know what kind of performance degregation you saw.. it would
be interesting to understand why. I am using non-persistent messaging,
so perhaps that could make a difference, since I am only paging a small
number of messages in at a time.
Cheers,
David
Rob Davies wrote:
> Hi David,
>
> Yes - actually - I tried it a few days ago. I haven't committed it
> because message throughput is generally lower. I will look at making it
> optional via a destination policy
>
> cheers,
>
> Rob
> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>
>> Hi Rob,
>>
>> I know its been a couple of weeks. I've been using my changes for a
>> while and I see nice CPU and memory usage on the broker, and good
>> messaging performance for my application. Have you had a chance to
>> try it out?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> thanks for the great feedback - will try your patch and see how it
>>> works!
>>> cheers,
>>> Rob
>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I like the new changes, but with the changes as they are, for my
>>>> application for one of my benchmarks, it takes twice as long to
>>>> complete.
>>>>
>>>> I believe the culprit for this is that when the new code can't find
>>>> a consumer which is not full, the broker chooses the consumer with
>>>> the lowest dispatch queue size.
>>>>
>>>> In my application, since I have a prefetch size of 1, and have
>>>> longish-running transactions, the dispatch queue size is not
>>>> indicative of the current load for that consumer. As a result, I
>>>> think this is what is responsible for poor load-balancing in my case.
>>>>
>>>> For applications which commit() after each processed message, I am
>>>> sure this wouldn't be the case. In some ways, reverting to the old
>>>> behaviour of adding the pending message to all consumers might lead
>>>> to better load balancing with this code.
>>>>
>>>> However - I think it is better if the consumers can decide when they
>>>> want more messages rather than the broker pushing messages at them?
>>>> I've attached a patch which demonstrates this. When LAZY_DISPATCH
>>>> is set to true (set via a system property for now for testing
>>>> purposes) this changes the behaviour slightly.
>>>>
>>>> The basic idea is pageInMessages() only pages in the minimum number
>>>> of messages that can be dispatched immediately to non-full
>>>> consumers. Whenever a consumer acks a message, which updates its
>>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>>> consumer will receive new messages.
>>>>
>>>> With this change in effect - I see slightly faster or almost the
>>>> same times with the previous benchmark. However memory usage on the
>>>> broker is far better, as the pending queues for each consumer is
>>>> either 0 or very small.
>>>>
>>>> What do you think? I guess there are better ways of doing this.
>>>>
>>>> I am doing a large overnight run with 16 consumers, so we'll see how
>>>> the performance goes.
>>>>
>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>>> thought there didn't seem to be any need for adding a message to a
>>>> new consumer if the message has already been locked by another
>>>> consumer?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> please let us know if these changes helps/hinders your app!
>>>>> cheers,
>>>>> Rob
>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>> statement needs to be moved outside its enclosing if - otherwise
>>>>>>>> it only gets executed when targets != null. We'd want this to
>>>>>>>> execute if we found a matching target wouldn't we?
>>>>>>> Don't think so? We only want the message going to one
>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
>>>> 0699
>>>> Web: http://www.nuix.com Fax: +61 2 9212
>>>> 6902
>>>> Index:
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>
>>>> ===================================================================
>>>> ---
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>> (revision 628917)
>>>> +++
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>> (working copy)
>>>> @@ -160,6 +160,8 @@
>>>> public void acknowledge(final ConnectionContext context,final
>>>> MessageAck ack) throws Exception {
>>>> // Handle the standard acknowledgment case.
>>>> boolean callDispatchMatched = false;
>>>> + Queue queue = null;
>>>> + synchronized(dispatchLock) {
>>>> if (ack.isStandardAck()) {
>>>> // Acknowledge all dispatched messages up till the
>>>> message id of
>>>> @@ -223,8 +225,12 @@
>>>> prefetchExtension = Math.max(0,
>>>> prefetchExtension - (index +
>>>> 1));
>>>> }
>>>> + if (queue == null)
>>>> + {
>>>> + queue = (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> - break;
>>>> + break;
>>>> }
>>>> }
>>>> }
>>>> @@ -253,6 +259,10 @@
>>>> if
>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>> prefetchExtension = Math.max(prefetchExtension,
>>>> index + 1);
>>>> + if (queue == null)
>>>> + {
>>>> + queue =
>>>> (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> break;
>>>> }
>>>> @@ -279,6 +289,10 @@
>>>> if (inAckRange) {
>>>> node.incrementRedeliveryCounter();
>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>> + if (queue == null)
>>>> + {
>>>> + queue = (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> break;
>>>> }
>>>> @@ -320,6 +334,10 @@
>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>> prefetchExtension = Math.max(0,
>>>> prefetchExtension
>>>> - (index + 1));
>>>> + if (queue == null)
>>>> + {
>>>> + queue = (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> break;
>>>> }
>>>> @@ -336,6 +354,9 @@
>>>> }
>>>> }
>>>> if (callDispatchMatched) {
>>>> + if (Queue.LAZY_DISPATCH) {
>>>> + queue.wakeup();
>>>> + }
>>>> dispatchPending();
>>>> } else {
>>>> if (isSlave()) {
>>>> Index:
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>
>>>> ===================================================================
>>>> ---
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>> (revision 628917)
>>>> +++
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>> (working copy)
>>>> @@ -75,6 +75,8 @@
>>>> * @version $Revision: 1.28 $
>>>> */
>>>> public class Queue extends BaseDestination implements Task {
>>>> + public static final boolean LAZY_DISPATCH =
>>>> +
>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>>> "true"));
>>>> private final Log log;
>>>> private final List<Subscription> consumers = new
>>>> ArrayList<Subscription>(50);
>>>> private PendingMessageCursor messages;
>>>> @@ -212,12 +214,12 @@
>>>> synchronized (pagedInMessages) {
>>>> // Add all the matching messages in the queue to the
>>>> // subscription.
>>>> -
>>>> for (Iterator<MessageReference> i =
>>>> pagedInMessages.values()
>>>> .iterator(); i.hasNext();) {
>>>> QueueMessageReference node =
>>>> (QueueMessageReference) i
>>>> .next();
>>>> - if (!node.isDropped() && !node.isAcked() &&
>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>> + if ((!node.isDropped() ||
>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>> + node.getLockOwner() == null) {
>>>> msgContext.setMessageReference(node);
>>>> if (sub.matches(node, msgContext)) {
>>>> sub.add(node);
>>>> @@ -940,7 +945,11 @@
>>>> dispatchLock.lock();
>>>> try{
>>>>
>>>> - final int toPageIn = getMaxPageSize() -
>>>> pagedInMessages.size();
>>>> + int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>> + if (LAZY_DISPATCH) {
>>>> + // Only page in the minimum number of messages which can be
>>>> dispatched immediately.
>>>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>>>> toPageIn);
>>>> + }
>>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>> messages.setMaxBatchSize(toPageIn);
>>>> int count = 0;
>>>> @@ -976,12 +985,25 @@
>>>> }
>>>> return result;
>>>> }
>>>> +
>>>> + private int getConsumerMessageCountBeforeFull() throws Exception {
>>>> + int total = 0;
>>>> + synchronized (consumers) {
>>>> + for (Subscription s : consumers) {
>>>> + if (s instanceof PrefetchSubscription) {
>>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>>> + }
>>>> + }
>>>> + }
>>>> + return total;
>>>> + }
>>>>
>>>> private void doDispatch(List<MessageReference> list) throws
>>>> Exception {
>>>>
>>>> if (list != null) {
>>>> synchronized (consumers) {
>>>> for (MessageReference node : list) {
>>>> +
>>>> Subscription target = null;
>>>> List<Subscription> targets = null;
>>>> for (Subscription s : consumers) {
>>
>>
>> --
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>> Web: http://www.nuix.com Fax: +61 2 9212 6902
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by Rob Davies <ra...@gmail.com>.
Hi David,
Yes - actually - I tried it a few days ago. I haven't committed it
because message throughput is generally lower. I will look at making
it optional via a destination policy
cheers,
Rob
On 6 Mar 2008, at 05:54, David Sitsky wrote:
> Hi Rob,
>
> I know its been a couple of weeks. I've been using my changes for a
> while and I see nice CPU and memory usage on the broker, and good
> messaging performance for my application. Have you had a chance to
> try it out?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> thanks for the great feedback - will try your patch and see how it
>> works!
>> cheers,
>> Rob
>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I like the new changes, but with the changes as they are, for my
>>> application for one of my benchmarks, it takes twice as long to
>>> complete.
>>>
>>> I believe the culprit for this is that when the new code can't
>>> find a consumer which is not full, the broker chooses the consumer
>>> with the lowest dispatch queue size.
>>>
>>> In my application, since I have a prefetch size of 1, and have
>>> longish-running transactions, the dispatch queue size is not
>>> indicative of the current load for that consumer. As a result, I
>>> think this is what is responsible for poor load-balancing in my
>>> case.
>>>
>>> For applications which commit() after each processed message, I am
>>> sure this wouldn't be the case. In some ways, reverting to the
>>> old behaviour of adding the pending message to all consumers might
>>> lead to better load balancing with this code.
>>>
>>> However - I think it is better if the consumers can decide when
>>> they want more messages rather than the broker pushing messages at
>>> them? I've attached a patch which demonstrates this. When
>>> LAZY_DISPATCH is set to true (set via a system property for now
>>> for testing purposes) this changes the behaviour slightly.
>>>
>>> The basic idea is pageInMessages() only pages in the minimum
>>> number of messages that can be dispatched immediately to non-full
>>> consumers. Whenever a consumer acks a message, which updates its
>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>> consumer will receive new messages.
>>>
>>> With this change in effect - I see slightly faster or almost the
>>> same times with the previous benchmark. However memory usage on
>>> the broker is far better, as the pending queues for each consumer
>>> is either 0 or very small.
>>>
>>> What do you think? I guess there are better ways of doing this.
>>>
>>> I am doing a large overnight run with 16 consumers, so we'll see
>>> how the performance goes.
>>>
>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>> thought there didn't seem to be any need for adding a message to a
>>> new consumer if the message has already been locked by another
>>> consumer?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> please let us know if these changes helps/hinders your app!
>>>> cheers,
>>>> Rob
>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>> If what I said above is true, then the immediately above if
>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>> otherwise it only gets executed when targets != null. We'd
>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>> Don't think so? We only want the message going to one
>>>>>> subscription? I may have misunderstood what you mean!
>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>
>>>
>>> --
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>> 9280 0699
>>> Web: http://www.nuix.com Fax: +61 2
>>> 9212 6902
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>> region/PrefetchSubscription.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> PrefetchSubscription.java (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> PrefetchSubscription.java (working copy)
>>> @@ -160,6 +160,8 @@
>>> public void acknowledge(final ConnectionContext context,final
>>> MessageAck ack) throws Exception {
>>> // Handle the standard acknowledgment case.
>>> boolean callDispatchMatched = false;
>>> + Queue queue = null;
>>> + synchronized(dispatchLock) {
>>> if (ack.isStandardAck()) {
>>> // Acknowledge all dispatched messages up till the
>>> message id of
>>> @@ -223,8 +225,12 @@
>>> prefetchExtension = Math.max(0,
>>> prefetchExtension - (index
>>> + 1));
>>> }
>>> + if (queue == null)
>>> + {
>>> + queue = (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> - break;
>>> + break;
>>> }
>>> }
>>> }
>>> @@ -253,6 +259,10 @@
>>> if
>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>> prefetchExtension =
>>> Math.max(prefetchExtension,
>>> index + 1);
>>> + if (queue == null)
>>> + {
>>> + queue =
>>> (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> break;
>>> }
>>> @@ -279,6 +289,10 @@
>>> if (inAckRange) {
>>> node.incrementRedeliveryCounter();
>>> if
>>> (ack.getLastMessageId().equals(messageId)) {
>>> + if (queue == null)
>>> + {
>>> + queue = (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> break;
>>> }
>>> @@ -320,6 +334,10 @@
>>> if
>>> (ack.getLastMessageId().equals(messageId)) {
>>> prefetchExtension = Math.max(0,
>>> prefetchExtension
>>> - (index + 1));
>>> + if (queue == null)
>>> + {
>>> + queue = (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> break;
>>> }
>>> @@ -336,6 +354,9 @@
>>> }
>>> }
>>> if (callDispatchMatched) {
>>> + if (Queue.LAZY_DISPATCH) {
>>> + queue.wakeup();
>>> + }
>>> dispatchPending();
>>> } else {
>>> if (isSlave()) {
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>> region/Queue.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> Queue.java (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> Queue.java (working copy)
>>> @@ -75,6 +75,8 @@
>>> * @version $Revision: 1.28 $
>>> */
>>> public class Queue extends BaseDestination implements Task {
>>> + public static final boolean LAZY_DISPATCH =
>>> +
>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>> "true"));
>>> private final Log log;
>>> private final List<Subscription> consumers = new
>>> ArrayList<Subscription>(50);
>>> private PendingMessageCursor messages;
>>> @@ -212,12 +214,12 @@
>>> synchronized (pagedInMessages) {
>>> // Add all the matching messages in the queue to the
>>> // subscription.
>>> -
>>> for (Iterator<MessageReference> i =
>>> pagedInMessages.values()
>>> .iterator(); i.hasNext();) {
>>> QueueMessageReference node =
>>> (QueueMessageReference) i
>>> .next();
>>> - if (!node.isDropped() && !node.isAcked() && (!
>>> node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>> + if ((!node.isDropped() ||
>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>> + node.getLockOwner() == null) {
>>> msgContext.setMessageReference(node);
>>> if (sub.matches(node, msgContext)) {
>>> sub.add(node);
>>> @@ -940,7 +945,11 @@
>>> dispatchLock.lock();
>>> try{
>>>
>>> - final int toPageIn = getMaxPageSize() -
>>> pagedInMessages.size();
>>> + int toPageIn = getMaxPageSize() -
>>> pagedInMessages.size();
>>> + if (LAZY_DISPATCH) {
>>> + // Only page in the minimum number of messages which can
>>> be dispatched immediately.
>>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>>> toPageIn);
>>> + }
>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>> messages.setMaxBatchSize(toPageIn);
>>> int count = 0;
>>> @@ -976,12 +985,25 @@
>>> }
>>> return result;
>>> }
>>> +
>>> + private int getConsumerMessageCountBeforeFull() throws
>>> Exception {
>>> + int total = 0;
>>> + synchronized (consumers) {
>>> + for (Subscription s : consumers) {
>>> + if (s instanceof PrefetchSubscription) {
>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>> + }
>>> + }
>>> + }
>>> + return total;
>>> + }
>>>
>>> private void doDispatch(List<MessageReference> list) throws
>>> Exception {
>>>
>>> if (list != null) {
>>> synchronized (consumers) {
>>> for (MessageReference node : list) {
>>> +
>>> Subscription target = null;
>>> List<Subscription> targets = null;
>>> for (Subscription s : consumers) {
>
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
> 0699
> Web: http://www.nuix.com Fax: +61 2 9212
> 6902
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
Hi Rob,
I've finally had some time to run some benchmarks with the trunk checked
out today (637703) and the numbers look great! Many thanks for checking
in this code - I'm happy now that I don't have to maintain any of my own
private modifications to activemq.
Thanks again.
Cheers,
David
David Sitsky wrote:
> Many thanks Rob - I'll try and do a fresh checkout today and let you
> know how the performance looks using my standard benchmarks.
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>>
>> the changes you suggested are now in and lazyDispatch can be set by a
>> destination policy - its currently on by default
>>
>> cheers,
>>
>> Rob
>> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>>
>>> Hi Rob,
>>>
>>> I know its been a couple of weeks. I've been using my changes for a
>>> while and I see nice CPU and memory usage on the broker, and good
>>> messaging performance for my application. Have you had a chance to
>>> try it out?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> thanks for the great feedback - will try your patch and see how it
>>>> works!
>>>> cheers,
>>>> Rob
>>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>>> Hi Rob,
>>>>>
>>>>> I like the new changes, but with the changes as they are, for my
>>>>> application for one of my benchmarks, it takes twice as long to
>>>>> complete.
>>>>>
>>>>> I believe the culprit for this is that when the new code can't find
>>>>> a consumer which is not full, the broker chooses the consumer with
>>>>> the lowest dispatch queue size.
>>>>>
>>>>> In my application, since I have a prefetch size of 1, and have
>>>>> longish-running transactions, the dispatch queue size is not
>>>>> indicative of the current load for that consumer. As a result, I
>>>>> think this is what is responsible for poor load-balancing in my case.
>>>>>
>>>>> For applications which commit() after each processed message, I am
>>>>> sure this wouldn't be the case. In some ways, reverting to the old
>>>>> behaviour of adding the pending message to all consumers might lead
>>>>> to better load balancing with this code.
>>>>>
>>>>> However - I think it is better if the consumers can decide when
>>>>> they want more messages rather than the broker pushing messages at
>>>>> them? I've attached a patch which demonstrates this. When
>>>>> LAZY_DISPATCH is set to true (set via a system property for now for
>>>>> testing purposes) this changes the behaviour slightly.
>>>>>
>>>>> The basic idea is pageInMessages() only pages in the minimum number
>>>>> of messages that can be dispatched immediately to non-full
>>>>> consumers. Whenever a consumer acks a message, which updates its
>>>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>>>> consumer will receive new messages.
>>>>>
>>>>> With this change in effect - I see slightly faster or almost the
>>>>> same times with the previous benchmark. However memory usage on
>>>>> the broker is far better, as the pending queues for each consumer
>>>>> is either 0 or very small.
>>>>>
>>>>> What do you think? I guess there are better ways of doing this.
>>>>>
>>>>> I am doing a large overnight run with 16 consumers, so we'll see
>>>>> how the performance goes.
>>>>>
>>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>>>> thought there didn't seem to be any need for adding a message to a
>>>>> new consumer if the message has already been locked by another
>>>>> consumer?
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Rob Davies wrote:
>>>>>> Hi David,
>>>>>> please let us know if these changes helps/hinders your app!
>>>>>> cheers,
>>>>>> Rob
>>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>>>> otherwise it only gets executed when targets != null. We'd
>>>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>>>> Don't think so? We only want the message going to one
>>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> David
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> David
>>>>>
>>>>> Nuix Pty Ltd
>>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
>>>>> 0699
>>>>> Web: http://www.nuix.com Fax: +61 2 9212
>>>>> 6902
>>>>> Index:
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>>
>>>>> ===================================================================
>>>>> ---
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>> (revision 628917)
>>>>> +++
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>> (working copy)
>>>>> @@ -160,6 +160,8 @@
>>>>> public void acknowledge(final ConnectionContext context,final
>>>>> MessageAck ack) throws Exception {
>>>>> // Handle the standard acknowledgment case.
>>>>> boolean callDispatchMatched = false;
>>>>> + Queue queue = null;
>>>>> + synchronized(dispatchLock) {
>>>>> if (ack.isStandardAck()) {
>>>>> // Acknowledge all dispatched messages up till the
>>>>> message id of
>>>>> @@ -223,8 +225,12 @@
>>>>> prefetchExtension = Math.max(0,
>>>>> prefetchExtension - (index +
>>>>> 1));
>>>>> }
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue = (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> - break;
>>>>> + break;
>>>>> }
>>>>> }
>>>>> }
>>>>> @@ -253,6 +259,10 @@
>>>>> if
>>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>>> prefetchExtension = Math.max(prefetchExtension,
>>>>> index + 1);
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue =
>>>>> (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> break;
>>>>> }
>>>>> @@ -279,6 +289,10 @@
>>>>> if (inAckRange) {
>>>>> node.incrementRedeliveryCounter();
>>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue = (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> break;
>>>>> }
>>>>> @@ -320,6 +334,10 @@
>>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>>> prefetchExtension = Math.max(0,
>>>>> prefetchExtension
>>>>> - (index + 1));
>>>>> + if (queue == null)
>>>>> + {
>>>>> + queue = (Queue)node.getRegionDestination();
>>>>> + }
>>>>> callDispatchMatched = true;
>>>>> break;
>>>>> }
>>>>> @@ -336,6 +354,9 @@
>>>>> }
>>>>> }
>>>>> if (callDispatchMatched) {
>>>>> + if (Queue.LAZY_DISPATCH) {
>>>>> + queue.wakeup();
>>>>> + }
>>>>> dispatchPending();
>>>>> } else {
>>>>> if (isSlave()) {
>>>>> Index:
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>>
>>>>> ===================================================================
>>>>> ---
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>> (revision 628917)
>>>>> +++
>>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>> (working copy)
>>>>> @@ -75,6 +75,8 @@
>>>>> * @version $Revision: 1.28 $
>>>>> */
>>>>> public class Queue extends BaseDestination implements Task {
>>>>> + public static final boolean LAZY_DISPATCH =
>>>>> +
>>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>>>> "true"));
>>>>> private final Log log;
>>>>> private final List<Subscription> consumers = new
>>>>> ArrayList<Subscription>(50);
>>>>> private PendingMessageCursor messages;
>>>>> @@ -212,12 +214,12 @@
>>>>> synchronized (pagedInMessages) {
>>>>> // Add all the matching messages in the queue to the
>>>>> // subscription.
>>>>> -
>>>>> for (Iterator<MessageReference> i =
>>>>> pagedInMessages.values()
>>>>> .iterator(); i.hasNext();) {
>>>>> QueueMessageReference node =
>>>>> (QueueMessageReference) i
>>>>> .next();
>>>>> - if (!node.isDropped() && !node.isAcked() &&
>>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>>> + if ((!node.isDropped() ||
>>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>>> + node.getLockOwner() == null) {
>>>>> msgContext.setMessageReference(node);
>>>>> if (sub.matches(node, msgContext)) {
>>>>> sub.add(node);
>>>>> @@ -940,7 +945,11 @@
>>>>> dispatchLock.lock();
>>>>> try{
>>>>>
>>>>> - final int toPageIn = getMaxPageSize() -
>>>>> pagedInMessages.size();
>>>>> + int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>>> + if (LAZY_DISPATCH) {
>>>>> + // Only page in the minimum number of messages which can
>>>>> be dispatched immediately.
>>>>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>>>>> toPageIn);
>>>>> + }
>>>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>>> messages.setMaxBatchSize(toPageIn);
>>>>> int count = 0;
>>>>> @@ -976,12 +985,25 @@
>>>>> }
>>>>> return result;
>>>>> }
>>>>> +
>>>>> + private int getConsumerMessageCountBeforeFull() throws
>>>>> Exception {
>>>>> + int total = 0;
>>>>> + synchronized (consumers) {
>>>>> + for (Subscription s : consumers) {
>>>>> + if (s instanceof PrefetchSubscription) {
>>>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>>>> + }
>>>>> + }
>>>>> + }
>>>>> + return total;
>>>>> + }
>>>>>
>>>>> private void doDispatch(List<MessageReference> list) throws
>>>>> Exception {
>>>>>
>>>>> if (list != null) {
>>>>> synchronized (consumers) {
>>>>> for (MessageReference node : list) {
>>>>> +
>>>>> Subscription target = null;
>>>>> List<Subscription> targets = null;
>>>>> for (Subscription s : consumers) {
>>>
>>>
>>> --
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>>> Web: http://www.nuix.com Fax: +61 2 9212 6902
>
>
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
Many thanks Rob - I'll try and do a fresh checkout today and let you
know how the performance looks using my standard benchmarks.
Cheers,
David
Rob Davies wrote:
> Hi David,
>
> the changes you suggested are now in and lazyDispatch can be set by a
> destination policy - its currently on by default
>
> cheers,
>
> Rob
> On 6 Mar 2008, at 05:54, David Sitsky wrote:
>
>> Hi Rob,
>>
>> I know its been a couple of weeks. I've been using my changes for a
>> while and I see nice CPU and memory usage on the broker, and good
>> messaging performance for my application. Have you had a chance to
>> try it out?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> thanks for the great feedback - will try your patch and see how it
>>> works!
>>> cheers,
>>> Rob
>>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>>> Hi Rob,
>>>>
>>>> I like the new changes, but with the changes as they are, for my
>>>> application for one of my benchmarks, it takes twice as long to
>>>> complete.
>>>>
>>>> I believe the culprit for this is that when the new code can't find
>>>> a consumer which is not full, the broker chooses the consumer with
>>>> the lowest dispatch queue size.
>>>>
>>>> In my application, since I have a prefetch size of 1, and have
>>>> longish-running transactions, the dispatch queue size is not
>>>> indicative of the current load for that consumer. As a result, I
>>>> think this is what is responsible for poor load-balancing in my case.
>>>>
>>>> For applications which commit() after each processed message, I am
>>>> sure this wouldn't be the case. In some ways, reverting to the old
>>>> behaviour of adding the pending message to all consumers might lead
>>>> to better load balancing with this code.
>>>>
>>>> However - I think it is better if the consumers can decide when they
>>>> want more messages rather than the broker pushing messages at them?
>>>> I've attached a patch which demonstrates this. When LAZY_DISPATCH
>>>> is set to true (set via a system property for now for testing
>>>> purposes) this changes the behaviour slightly.
>>>>
>>>> The basic idea is pageInMessages() only pages in the minimum number
>>>> of messages that can be dispatched immediately to non-full
>>>> consumers. Whenever a consumer acks a message, which updates its
>>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>>> consumer will receive new messages.
>>>>
>>>> With this change in effect - I see slightly faster or almost the
>>>> same times with the previous benchmark. However memory usage on the
>>>> broker is far better, as the pending queues for each consumer is
>>>> either 0 or very small.
>>>>
>>>> What do you think? I guess there are better ways of doing this.
>>>>
>>>> I am doing a large overnight run with 16 consumers, so we'll see how
>>>> the performance goes.
>>>>
>>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>>> thought there didn't seem to be any need for adding a message to a
>>>> new consumer if the message has already been locked by another
>>>> consumer?
>>>>
>>>> Cheers,
>>>> David
>>>>
>>>> Rob Davies wrote:
>>>>> Hi David,
>>>>> please let us know if these changes helps/hinders your app!
>>>>> cheers,
>>>>> Rob
>>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>>> If what I said above is true, then the immediately above if
>>>>>>>> statement needs to be moved outside its enclosing if - otherwise
>>>>>>>> it only gets executed when targets != null. We'd want this to
>>>>>>>> execute if we found a matching target wouldn't we?
>>>>>>> Don't think so? We only want the message going to one
>>>>>>> subscription? I may have misunderstood what you mean!
>>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>>
>>>>>> Cheers,
>>>>>> David
>>>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> David
>>>>
>>>> Nuix Pty Ltd
>>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
>>>> 0699
>>>> Web: http://www.nuix.com Fax: +61 2 9212
>>>> 6902
>>>> Index:
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>>
>>>> ===================================================================
>>>> ---
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>> (revision 628917)
>>>> +++
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>>> (working copy)
>>>> @@ -160,6 +160,8 @@
>>>> public void acknowledge(final ConnectionContext context,final
>>>> MessageAck ack) throws Exception {
>>>> // Handle the standard acknowledgment case.
>>>> boolean callDispatchMatched = false;
>>>> + Queue queue = null;
>>>> + synchronized(dispatchLock) {
>>>> if (ack.isStandardAck()) {
>>>> // Acknowledge all dispatched messages up till the
>>>> message id of
>>>> @@ -223,8 +225,12 @@
>>>> prefetchExtension = Math.max(0,
>>>> prefetchExtension - (index +
>>>> 1));
>>>> }
>>>> + if (queue == null)
>>>> + {
>>>> + queue = (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> - break;
>>>> + break;
>>>> }
>>>> }
>>>> }
>>>> @@ -253,6 +259,10 @@
>>>> if
>>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>>> prefetchExtension = Math.max(prefetchExtension,
>>>> index + 1);
>>>> + if (queue == null)
>>>> + {
>>>> + queue =
>>>> (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> break;
>>>> }
>>>> @@ -279,6 +289,10 @@
>>>> if (inAckRange) {
>>>> node.incrementRedeliveryCounter();
>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>> + if (queue == null)
>>>> + {
>>>> + queue = (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> break;
>>>> }
>>>> @@ -320,6 +334,10 @@
>>>> if (ack.getLastMessageId().equals(messageId)) {
>>>> prefetchExtension = Math.max(0,
>>>> prefetchExtension
>>>> - (index + 1));
>>>> + if (queue == null)
>>>> + {
>>>> + queue = (Queue)node.getRegionDestination();
>>>> + }
>>>> callDispatchMatched = true;
>>>> break;
>>>> }
>>>> @@ -336,6 +354,9 @@
>>>> }
>>>> }
>>>> if (callDispatchMatched) {
>>>> + if (Queue.LAZY_DISPATCH) {
>>>> + queue.wakeup();
>>>> + }
>>>> dispatchPending();
>>>> } else {
>>>> if (isSlave()) {
>>>> Index:
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>>
>>>> ===================================================================
>>>> ---
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>> (revision 628917)
>>>> +++
>>>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>>>> (working copy)
>>>> @@ -75,6 +75,8 @@
>>>> * @version $Revision: 1.28 $
>>>> */
>>>> public class Queue extends BaseDestination implements Task {
>>>> + public static final boolean LAZY_DISPATCH =
>>>> +
>>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>>> "true"));
>>>> private final Log log;
>>>> private final List<Subscription> consumers = new
>>>> ArrayList<Subscription>(50);
>>>> private PendingMessageCursor messages;
>>>> @@ -212,12 +214,12 @@
>>>> synchronized (pagedInMessages) {
>>>> // Add all the matching messages in the queue to the
>>>> // subscription.
>>>> -
>>>> for (Iterator<MessageReference> i =
>>>> pagedInMessages.values()
>>>> .iterator(); i.hasNext();) {
>>>> QueueMessageReference node =
>>>> (QueueMessageReference) i
>>>> .next();
>>>> - if (!node.isDropped() && !node.isAcked() &&
>>>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>>> + if ((!node.isDropped() ||
>>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>>> + node.getLockOwner() == null) {
>>>> msgContext.setMessageReference(node);
>>>> if (sub.matches(node, msgContext)) {
>>>> sub.add(node);
>>>> @@ -940,7 +945,11 @@
>>>> dispatchLock.lock();
>>>> try{
>>>>
>>>> - final int toPageIn = getMaxPageSize() -
>>>> pagedInMessages.size();
>>>> + int toPageIn = getMaxPageSize() - pagedInMessages.size();
>>>> + if (LAZY_DISPATCH) {
>>>> + // Only page in the minimum number of messages which can be
>>>> dispatched immediately.
>>>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>>>> toPageIn);
>>>> + }
>>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>>> messages.setMaxBatchSize(toPageIn);
>>>> int count = 0;
>>>> @@ -976,12 +985,25 @@
>>>> }
>>>> return result;
>>>> }
>>>> +
>>>> + private int getConsumerMessageCountBeforeFull() throws Exception {
>>>> + int total = 0;
>>>> + synchronized (consumers) {
>>>> + for (Subscription s : consumers) {
>>>> + if (s instanceof PrefetchSubscription) {
>>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>>> + }
>>>> + }
>>>> + }
>>>> + return total;
>>>> + }
>>>>
>>>> private void doDispatch(List<MessageReference> list) throws
>>>> Exception {
>>>>
>>>> if (list != null) {
>>>> synchronized (consumers) {
>>>> for (MessageReference node : list) {
>>>> +
>>>> Subscription target = null;
>>>> List<Subscription> targets = null;
>>>> for (Subscription s : consumers) {
>>
>>
>> --
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>> Web: http://www.nuix.com Fax: +61 2 9212 6902
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by Rob Davies <ra...@gmail.com>.
Hi David,
the changes you suggested are now in and lazyDispatch can be set by a
destination policy - its currently on by default
cheers,
Rob
On 6 Mar 2008, at 05:54, David Sitsky wrote:
> Hi Rob,
>
> I know its been a couple of weeks. I've been using my changes for a
> while and I see nice CPU and memory usage on the broker, and good
> messaging performance for my application. Have you had a chance to
> try it out?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> thanks for the great feedback - will try your patch and see how it
>> works!
>> cheers,
>> Rob
>> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>>> Hi Rob,
>>>
>>> I like the new changes, but with the changes as they are, for my
>>> application for one of my benchmarks, it takes twice as long to
>>> complete.
>>>
>>> I believe the culprit for this is that when the new code can't
>>> find a consumer which is not full, the broker chooses the consumer
>>> with the lowest dispatch queue size.
>>>
>>> In my application, since I have a prefetch size of 1, and have
>>> longish-running transactions, the dispatch queue size is not
>>> indicative of the current load for that consumer. As a result, I
>>> think this is what is responsible for poor load-balancing in my
>>> case.
>>>
>>> For applications which commit() after each processed message, I am
>>> sure this wouldn't be the case. In some ways, reverting to the
>>> old behaviour of adding the pending message to all consumers might
>>> lead to better load balancing with this code.
>>>
>>> However - I think it is better if the consumers can decide when
>>> they want more messages rather than the broker pushing messages at
>>> them? I've attached a patch which demonstrates this. When
>>> LAZY_DISPATCH is set to true (set via a system property for now
>>> for testing purposes) this changes the behaviour slightly.
>>>
>>> The basic idea is pageInMessages() only pages in the minimum
>>> number of messages that can be dispatched immediately to non-full
>>> consumers. Whenever a consumer acks a message, which updates its
>>> prefetch size, we make sure Queue.wakeup() is called so that the
>>> consumer will receive new messages.
>>>
>>> With this change in effect - I see slightly faster or almost the
>>> same times with the previous benchmark. However memory usage on
>>> the broker is far better, as the pending queues for each consumer
>>> is either 0 or very small.
>>>
>>> What do you think? I guess there are better ways of doing this.
>>>
>>> I am doing a large overnight run with 16 consumers, so we'll see
>>> how the performance goes.
>>>
>>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>>> thought there didn't seem to be any need for adding a message to a
>>> new consumer if the message has already been locked by another
>>> consumer?
>>>
>>> Cheers,
>>> David
>>>
>>> Rob Davies wrote:
>>>> Hi David,
>>>> please let us know if these changes helps/hinders your app!
>>>> cheers,
>>>> Rob
>>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>>> If what I said above is true, then the immediately above if
>>>>>>> statement needs to be moved outside its enclosing if -
>>>>>>> otherwise it only gets executed when targets != null. We'd
>>>>>>> want this to execute if we found a matching target wouldn't we?
>>>>>> Don't think so? We only want the message going to one
>>>>>> subscription? I may have misunderstood what you mean!
>>>>> Yes - ignore what I said, I had my wires crossed.
>>>>>
>>>>> Cheers,
>>>>> David
>>>>>
>>>
>>>
>>> --
>>> Cheers,
>>> David
>>>
>>> Nuix Pty Ltd
>>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
>>> 9280 0699
>>> Web: http://www.nuix.com Fax: +61 2
>>> 9212 6902
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>> region/PrefetchSubscription.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> PrefetchSubscription.java (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> PrefetchSubscription.java (working copy)
>>> @@ -160,6 +160,8 @@
>>> public void acknowledge(final ConnectionContext context,final
>>> MessageAck ack) throws Exception {
>>> // Handle the standard acknowledgment case.
>>> boolean callDispatchMatched = false;
>>> + Queue queue = null;
>>> + synchronized(dispatchLock) {
>>> if (ack.isStandardAck()) {
>>> // Acknowledge all dispatched messages up till the
>>> message id of
>>> @@ -223,8 +225,12 @@
>>> prefetchExtension = Math.max(0,
>>> prefetchExtension - (index
>>> + 1));
>>> }
>>> + if (queue == null)
>>> + {
>>> + queue = (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> - break;
>>> + break;
>>> }
>>> }
>>> }
>>> @@ -253,6 +259,10 @@
>>> if
>>> (ack.getLastMessageId().equals(node.getMessageId())) {
>>> prefetchExtension =
>>> Math.max(prefetchExtension,
>>> index + 1);
>>> + if (queue == null)
>>> + {
>>> + queue =
>>> (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> break;
>>> }
>>> @@ -279,6 +289,10 @@
>>> if (inAckRange) {
>>> node.incrementRedeliveryCounter();
>>> if
>>> (ack.getLastMessageId().equals(messageId)) {
>>> + if (queue == null)
>>> + {
>>> + queue = (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> break;
>>> }
>>> @@ -320,6 +334,10 @@
>>> if
>>> (ack.getLastMessageId().equals(messageId)) {
>>> prefetchExtension = Math.max(0,
>>> prefetchExtension
>>> - (index + 1));
>>> + if (queue == null)
>>> + {
>>> + queue = (Queue)node.getRegionDestination();
>>> + }
>>> callDispatchMatched = true;
>>> break;
>>> }
>>> @@ -336,6 +354,9 @@
>>> }
>>> }
>>> if (callDispatchMatched) {
>>> + if (Queue.LAZY_DISPATCH) {
>>> + queue.wakeup();
>>> + }
>>> dispatchPending();
>>> } else {
>>> if (isSlave()) {
>>> Index: activemq-core/src/main/java/org/apache/activemq/broker/
>>> region/Queue.java
>>> ===================================================================
>>> --- activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> Queue.java (revision 628917)
>>> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/
>>> Queue.java (working copy)
>>> @@ -75,6 +75,8 @@
>>> * @version $Revision: 1.28 $
>>> */
>>> public class Queue extends BaseDestination implements Task {
>>> + public static final boolean LAZY_DISPATCH =
>>> +
>>> Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>>> "true"));
>>> private final Log log;
>>> private final List<Subscription> consumers = new
>>> ArrayList<Subscription>(50);
>>> private PendingMessageCursor messages;
>>> @@ -212,12 +214,12 @@
>>> synchronized (pagedInMessages) {
>>> // Add all the matching messages in the queue to the
>>> // subscription.
>>> -
>>> for (Iterator<MessageReference> i =
>>> pagedInMessages.values()
>>> .iterator(); i.hasNext();) {
>>> QueueMessageReference node =
>>> (QueueMessageReference) i
>>> .next();
>>> - if (!node.isDropped() && !node.isAcked() && (!
>>> node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>>> + if ((!node.isDropped() ||
>>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>>> + node.getLockOwner() == null) {
>>> msgContext.setMessageReference(node);
>>> if (sub.matches(node, msgContext)) {
>>> sub.add(node);
>>> @@ -940,7 +945,11 @@
>>> dispatchLock.lock();
>>> try{
>>>
>>> - final int toPageIn = getMaxPageSize() -
>>> pagedInMessages.size();
>>> + int toPageIn = getMaxPageSize() -
>>> pagedInMessages.size();
>>> + if (LAZY_DISPATCH) {
>>> + // Only page in the minimum number of messages which can
>>> be dispatched immediately.
>>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>>> toPageIn);
>>> + }
>>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>>> messages.setMaxBatchSize(toPageIn);
>>> int count = 0;
>>> @@ -976,12 +985,25 @@
>>> }
>>> return result;
>>> }
>>> +
>>> + private int getConsumerMessageCountBeforeFull() throws
>>> Exception {
>>> + int total = 0;
>>> + synchronized (consumers) {
>>> + for (Subscription s : consumers) {
>>> + if (s instanceof PrefetchSubscription) {
>>> + total += ((PrefetchSubscription)s).countBeforeFull();
>>> + }
>>> + }
>>> + }
>>> + return total;
>>> + }
>>>
>>> private void doDispatch(List<MessageReference> list) throws
>>> Exception {
>>>
>>> if (list != null) {
>>> synchronized (consumers) {
>>> for (MessageReference node : list) {
>>> +
>>> Subscription target = null;
>>> List<Subscription> targets = null;
>>> for (Subscription s : consumers) {
>
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
> 0699
> Web: http://www.nuix.com Fax: +61 2 9212
> 6902
Re: Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
Hi Rob,
I know its been a couple of weeks. I've been using my changes for a
while and I see nice CPU and memory usage on the broker, and good
messaging performance for my application. Have you had a chance to try
it out?
Cheers,
David
Rob Davies wrote:
> Hi David,
>
> thanks for the great feedback - will try your patch and see how it works!
>
> cheers,
>
> Rob
> On 20 Feb 2008, at 06:31, David Sitsky wrote:
>
>> Hi Rob,
>>
>> I like the new changes, but with the changes as they are, for my
>> application for one of my benchmarks, it takes twice as long to complete.
>>
>> I believe the culprit for this is that when the new code can't find a
>> consumer which is not full, the broker chooses the consumer with the
>> lowest dispatch queue size.
>>
>> In my application, since I have a prefetch size of 1, and have
>> longish-running transactions, the dispatch queue size is not
>> indicative of the current load for that consumer. As a result, I
>> think this is what is responsible for poor load-balancing in my case.
>>
>> For applications which commit() after each processed message, I am
>> sure this wouldn't be the case. In some ways, reverting to the old
>> behaviour of adding the pending message to all consumers might lead to
>> better load balancing with this code.
>>
>> However - I think it is better if the consumers can decide when they
>> want more messages rather than the broker pushing messages at them?
>> I've attached a patch which demonstrates this. When LAZY_DISPATCH is
>> set to true (set via a system property for now for testing purposes)
>> this changes the behaviour slightly.
>>
>> The basic idea is pageInMessages() only pages in the minimum number of
>> messages that can be dispatched immediately to non-full consumers.
>> Whenever a consumer acks a message, which updates its prefetch size,
>> we make sure Queue.wakeup() is called so that the consumer will
>> receive new messages.
>>
>> With this change in effect - I see slightly faster or almost the same
>> times with the previous benchmark. However memory usage on the broker
>> is far better, as the pending queues for each consumer is either 0 or
>> very small.
>>
>> What do you think? I guess there are better ways of doing this.
>>
>> I am doing a large overnight run with 16 consumers, so we'll see how
>> the performance goes.
>>
>> You'll also notice in the patch, that in Queue.addSubscriber(), I
>> thought there didn't seem to be any need for adding a message to a new
>> consumer if the message has already been locked by another consumer?
>>
>> Cheers,
>> David
>>
>> Rob Davies wrote:
>>> Hi David,
>>> please let us know if these changes helps/hinders your app!
>>> cheers,
>>> Rob
>>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>>> If what I said above is true, then the immediately above if
>>>>>> statement needs to be moved outside its enclosing if - otherwise
>>>>>> it only gets executed when targets != null. We'd want this to
>>>>>> execute if we found a matching target wouldn't we?
>>>>> Don't think so? We only want the message going to one
>>>>> subscription? I may have misunderstood what you mean!
>>>> Yes - ignore what I said, I had my wires crossed.
>>>>
>>>> Cheers,
>>>> David
>>>>
>>
>>
>> --
>> Cheers,
>> David
>>
>> Nuix Pty Ltd
>> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
>> Web: http://www.nuix.com Fax: +61 2 9212 6902
>> Index:
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>>
>> ===================================================================
>> ---
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>> (revision 628917)
>> +++
>> activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>> (working copy)
>> @@ -160,6 +160,8 @@
>> public void acknowledge(final ConnectionContext context,final
>> MessageAck ack) throws Exception {
>> // Handle the standard acknowledgment case.
>> boolean callDispatchMatched = false;
>> + Queue queue = null;
>> +
>> synchronized(dispatchLock) {
>> if (ack.isStandardAck()) {
>> // Acknowledge all dispatched messages up till the
>> message id of
>> @@ -223,8 +225,12 @@
>> prefetchExtension = Math.max(0,
>> prefetchExtension - (index + 1));
>> }
>> + if (queue == null)
>> + {
>> + queue = (Queue)node.getRegionDestination();
>> + }
>> callDispatchMatched = true;
>> - break;
>> + break;
>> }
>> }
>> }
>> @@ -253,6 +259,10 @@
>> if
>> (ack.getLastMessageId().equals(node.getMessageId())) {
>> prefetchExtension = Math.max(prefetchExtension,
>> index + 1);
>> + if (queue == null)
>> + {
>> + queue = (Queue)node.getRegionDestination();
>> + }
>> callDispatchMatched = true;
>> break;
>> }
>> @@ -279,6 +289,10 @@
>> if (inAckRange) {
>> node.incrementRedeliveryCounter();
>> if (ack.getLastMessageId().equals(messageId)) {
>> + if (queue == null)
>> + {
>> + queue = (Queue)node.getRegionDestination();
>> + }
>> callDispatchMatched = true;
>> break;
>> }
>> @@ -320,6 +334,10 @@
>> if (ack.getLastMessageId().equals(messageId)) {
>> prefetchExtension = Math.max(0,
>> prefetchExtension
>> - (index + 1));
>> + if (queue == null)
>> + {
>> + queue = (Queue)node.getRegionDestination();
>> + }
>> callDispatchMatched = true;
>> break;
>> }
>> @@ -336,6 +354,9 @@
>> }
>> }
>> if (callDispatchMatched) {
>> + if (Queue.LAZY_DISPATCH) {
>> + queue.wakeup();
>> + }
>> dispatchPending();
>> } else {
>> if (isSlave()) {
>> Index:
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>> ===================================================================
>> ---
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>> (revision 628917)
>> +++
>> activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
>> (working copy)
>> @@ -75,6 +75,8 @@
>> * @version $Revision: 1.28 $
>> */
>> public class Queue extends BaseDestination implements Task {
>> + public static final boolean LAZY_DISPATCH =
>> + Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
>> "true"));
>> private final Log log;
>> private final List<Subscription> consumers = new
>> ArrayList<Subscription>(50);
>> private PendingMessageCursor messages;
>> @@ -212,12 +214,12 @@
>> synchronized (pagedInMessages) {
>> // Add all the matching messages in the queue to the
>> // subscription.
>> -
>> for (Iterator<MessageReference> i =
>> pagedInMessages.values()
>> .iterator(); i.hasNext();) {
>> QueueMessageReference node =
>> (QueueMessageReference) i
>> .next();
>> - if (!node.isDropped() && !node.isAcked() &&
>> (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
>> + if ((!node.isDropped() ||
>> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
>> + node.getLockOwner() == null) {
>> msgContext.setMessageReference(node);
>> if (sub.matches(node, msgContext)) {
>> sub.add(node);
>> @@ -940,7 +945,11 @@
>> dispatchLock.lock();
>> try{
>>
>> - final int toPageIn = getMaxPageSize() -
>> pagedInMessages.size();
>> + int toPageIn = getMaxPageSize() - pagedInMessages.size();
>> + if (LAZY_DISPATCH) {
>> + // Only page in the minimum number of messages which can be
>> dispatched immediately.
>> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
>> toPageIn);
>> + }
>> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
>> messages.setMaxBatchSize(toPageIn);
>> int count = 0;
>> @@ -976,12 +985,25 @@
>> }
>> return result;
>> }
>> +
>> + private int getConsumerMessageCountBeforeFull() throws Exception {
>> + int total = 0;
>> + synchronized (consumers) {
>> + for (Subscription s : consumers) {
>> + if (s instanceof PrefetchSubscription) {
>> + total += ((PrefetchSubscription)s).countBeforeFull();
>> + }
>> + }
>> + }
>> + return total;
>> + }
>>
>> private void doDispatch(List<MessageReference> list) throws
>> Exception {
>>
>> if (list != null) {
>> synchronized (consumers) {
>> for (MessageReference node : list) {
>> +
>> Subscription target = null;
>> List<Subscription> targets = null;
>> for (Subscription s : consumers) {
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: Queue performance from recent changes
Posted by Rob Davies <ra...@gmail.com>.
Hi David,
thanks for the great feedback - will try your patch and see how it
works!
cheers,
Rob
On 20 Feb 2008, at 06:31, David Sitsky wrote:
> Hi Rob,
>
> I like the new changes, but with the changes as they are, for my
> application for one of my benchmarks, it takes twice as long to
> complete.
>
> I believe the culprit for this is that when the new code can't find
> a consumer which is not full, the broker chooses the consumer with
> the lowest dispatch queue size.
>
> In my application, since I have a prefetch size of 1, and have
> longish-running transactions, the dispatch queue size is not
> indicative of the current load for that consumer. As a result, I
> think this is what is responsible for poor load-balancing in my case.
>
> For applications which commit() after each processed message, I am
> sure this wouldn't be the case. In some ways, reverting to the old
> behaviour of adding the pending message to all consumers might lead
> to better load balancing with this code.
>
> However - I think it is better if the consumers can decide when they
> want more messages rather than the broker pushing messages at them?
> I've attached a patch which demonstrates this. When LAZY_DISPATCH
> is set to true (set via a system property for now for testing
> purposes) this changes the behaviour slightly.
>
> The basic idea is pageInMessages() only pages in the minimum number
> of messages that can be dispatched immediately to non-full
> consumers. Whenever a consumer acks a message, which updates its
> prefetch size, we make sure Queue.wakeup() is called so that the
> consumer will receive new messages.
>
> With this change in effect - I see slightly faster or almost the
> same times with the previous benchmark. However memory usage on the
> broker is far better, as the pending queues for each consumer is
> either 0 or very small.
>
> What do you think? I guess there are better ways of doing this.
>
> I am doing a large overnight run with 16 consumers, so we'll see how
> the performance goes.
>
> You'll also notice in the patch, that in Queue.addSubscriber(), I
> thought there didn't seem to be any need for adding a message to a
> new consumer if the message has already been locked by another
> consumer?
>
> Cheers,
> David
>
> Rob Davies wrote:
>> Hi David,
>> please let us know if these changes helps/hinders your app!
>> cheers,
>> Rob
>> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>>>> If what I said above is true, then the immediately above if
>>>>> statement needs to be moved outside its enclosing if - otherwise
>>>>> it only gets executed when targets != null. We'd want this to
>>>>> execute if we found a matching target wouldn't we?
>>>> Don't think so? We only want the message going to one
>>>> subscription? I may have misunderstood what you mean!
>>> Yes - ignore what I said, I had my wires crossed.
>>>
>>> Cheers,
>>> David
>>>
>
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
> 0699
> Web: http://www.nuix.com Fax: +61 2 9212
> 6902
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/
> PrefetchSubscription.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/
> PrefetchSubscription.java (revision 628917)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/
> PrefetchSubscription.java (working copy)
> @@ -160,6 +160,8 @@
> public void acknowledge(final ConnectionContext context,final
> MessageAck ack) throws Exception {
> // Handle the standard acknowledgment case.
> boolean callDispatchMatched = false;
> + Queue queue = null;
> +
> synchronized(dispatchLock) {
> if (ack.isStandardAck()) {
> // Acknowledge all dispatched messages up till the
> message id of
> @@ -223,8 +225,12 @@
> prefetchExtension = Math.max(0,
> prefetchExtension - (index +
> 1));
> }
> + if (queue == null)
> + {
> + queue = (Queue)node.getRegionDestination();
> + }
> callDispatchMatched = true;
> - break;
> + break;
> }
> }
> }
> @@ -253,6 +259,10 @@
> if
> (ack.getLastMessageId().equals(node.getMessageId())) {
> prefetchExtension =
> Math.max(prefetchExtension,
> index + 1);
> + if (queue == null)
> + {
> + queue =
> (Queue)node.getRegionDestination();
> + }
> callDispatchMatched = true;
> break;
> }
> @@ -279,6 +289,10 @@
> if (inAckRange) {
> node.incrementRedeliveryCounter();
> if
> (ack.getLastMessageId().equals(messageId)) {
> + if (queue == null)
> + {
> + queue = (Queue)node.getRegionDestination();
> + }
> callDispatchMatched = true;
> break;
> }
> @@ -320,6 +334,10 @@
> if
> (ack.getLastMessageId().equals(messageId)) {
> prefetchExtension = Math.max(0,
> prefetchExtension
> - (index + 1));
> + if (queue == null)
> + {
> + queue = (Queue)node.getRegionDestination();
> + }
> callDispatchMatched = true;
> break;
> }
> @@ -336,6 +354,9 @@
> }
> }
> if (callDispatchMatched) {
> + if (Queue.LAZY_DISPATCH) {
> + queue.wakeup();
> + }
> dispatchPending();
> } else {
> if (isSlave()) {
> Index: activemq-core/src/main/java/org/apache/activemq/broker/region/
> Queue.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/broker/region/
> Queue.java (revision 628917)
> +++ activemq-core/src/main/java/org/apache/activemq/broker/region/
> Queue.java (working copy)
> @@ -75,6 +75,8 @@
> * @version $Revision: 1.28 $
> */
> public class Queue extends BaseDestination implements Task {
> + public static final boolean LAZY_DISPATCH =
> + Boolean.parseBoolean(System.getProperty("activemq.lazy.dispatch",
> "true"));
> private final Log log;
> private final List<Subscription> consumers = new
> ArrayList<Subscription>(50);
> private PendingMessageCursor messages;
> @@ -212,12 +214,12 @@
> synchronized (pagedInMessages) {
> // Add all the matching messages in the queue to the
> // subscription.
> -
> for (Iterator<MessageReference> i =
> pagedInMessages.values()
> .iterator(); i.hasNext();) {
> QueueMessageReference node =
> (QueueMessageReference) i
> .next();
> - if (!node.isDropped() && !node.isAcked() && (!
> node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
> + if ((!node.isDropped() ||
> sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
> + node.getLockOwner() == null) {
> msgContext.setMessageReference(node);
> if (sub.matches(node, msgContext)) {
> sub.add(node);
> @@ -940,7 +945,11 @@
> dispatchLock.lock();
> try{
>
> - final int toPageIn = getMaxPageSize() -
> pagedInMessages.size();
> + int toPageIn = getMaxPageSize() - pagedInMessages.size();
> + if (LAZY_DISPATCH) {
> + // Only page in the minimum number of messages which can be
> dispatched immediately.
> + toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
> + }
> if ((force || !consumers.isEmpty()) && toPageIn > 0) {
> messages.setMaxBatchSize(toPageIn);
> int count = 0;
> @@ -976,12 +985,25 @@
> }
> return result;
> }
> +
> + private int getConsumerMessageCountBeforeFull() throws
> Exception {
> + int total = 0;
> + synchronized (consumers) {
> + for (Subscription s : consumers) {
> + if (s instanceof PrefetchSubscription) {
> + total += ((PrefetchSubscription)s).countBeforeFull();
> + }
> + }
> + }
> + return total;
> + }
>
> private void doDispatch(List<MessageReference> list) throws
> Exception {
>
> if (list != null) {
> synchronized (consumers) {
> for (MessageReference node : list) {
> +
> Subscription target = null;
> List<Subscription> targets = null;
> for (Subscription s : consumers) {
Queue performance from recent changes
Posted by David Sitsky <si...@nuix.com>.
Hi Rob,
I like the new changes, but with the changes as they are, for my
application for one of my benchmarks, it takes twice as long to complete.
I believe the culprit for this is that when the new code can't find a
consumer which is not full, the broker chooses the consumer with the
lowest dispatch queue size.
In my application, since I have a prefetch size of 1, and have
longish-running transactions, the dispatch queue size is not indicative
of the current load for that consumer. As a result, I think this is
what is responsible for poor load-balancing in my case.
For applications which commit() after each processed message, I am sure
this wouldn't be the case. In some ways, reverting to the old behaviour
of adding the pending message to all consumers might lead to better load
balancing with this code.
However - I think it is better if the consumers can decide when they
want more messages rather than the broker pushing messages at them?
I've attached a patch which demonstrates this. When LAZY_DISPATCH is
set to true (set via a system property for now for testing purposes)
this changes the behaviour slightly.
The basic idea is pageInMessages() only pages in the minimum number of
messages that can be dispatched immediately to non-full consumers.
Whenever a consumer acks a message, which updates its prefetch size, we
make sure Queue.wakeup() is called so that the consumer will receive new
messages.
With this change in effect - I see slightly faster or almost the same
times with the previous benchmark. However memory usage on the broker
is far better, as the pending queues for each consumer is either 0 or
very small.
What do you think? I guess there are better ways of doing this.
I am doing a large overnight run with 16 consumers, so we'll see how the
performance goes.
You'll also notice in the patch, that in Queue.addSubscriber(), I
thought there didn't seem to be any need for adding a message to a new
consumer if the message has already been locked by another consumer?
Cheers,
David
Rob Davies wrote:
> Hi David,
>
> please let us know if these changes helps/hinders your app!
>
> cheers,
>
> Rob
> On 19 Feb 2008, at 08:32, David Sitsky wrote:
>
>>>> If what I said above is true, then the immediately above if
>>>> statement needs to be moved outside its enclosing if - otherwise it
>>>> only gets executed when targets != null. We'd want this to execute
>>>> if we found a matching target wouldn't we?
>>> Don't think so? We only want the message going to one subscription?
>>> I may have misunderstood what you mean!
>> Yes - ignore what I said, I had my wires crossed.
>>
>> Cheers,
>> David
>>
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Re: svn commit: r628667 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ policy/
Posted by Rob Davies <ra...@gmail.com>.
Hi David,
please let us know if these changes helps/hinders your app!
cheers,
Rob
On 19 Feb 2008, at 08:32, David Sitsky wrote:
>>> If what I said above is true, then the immediately above if
>>> statement needs to be moved outside its enclosing if - otherwise
>>> it only gets executed when targets != null. We'd want this to
>>> execute if we found a matching target wouldn't we?
>> Don't think so? We only want the message going to one
>> subscription? I may have misunderstood what you mean!
> Yes - ignore what I said, I had my wires crossed.
>
> Cheers,
> David
>
Re: svn commit: r628667 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
./ policy/
Posted by David Sitsky <si...@nuix.com>.
>> If what I said above is true, then the immediately above if statement
>> needs to be moved outside its enclosing if - otherwise it only gets
>> executed when targets != null. We'd want this to execute if we found
>> a matching target wouldn't we?
> Don't think so? We only want the message going to one subscription? I
> may have misunderstood what you mean!
Yes - ignore what I said, I had my wires crossed.
Cheers,
David
Re: svn commit: r628667 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ policy/
Posted by Rob Davies <ra...@gmail.com>.
Hi David,
On 19 Feb 2008, at 05:09, David Sitsky wrote:
> Hi Rob,
>
> Great to see these changes. I am a little confused about this
> change in Queue.java:
>
>> + if (list != null) {
>> + synchronized (consumers) { + for
>> (MessageReference node : list) {
>> + Subscription target = null;
>> + List<Subscription> targets = null;
>> + for (Subscription s : consumers) {
>> + if (dispatchSelector.canSelect(s, node)) {
>> + if (!s.isFull()) {
>> + s.add(node);
>> + target = s;
>> + break;
>> + } else {
>> + if (targets == null) {
>> + targets = new
>> ArrayList<Subscription>();
>> + }
>> + targets.add(s);
>> + }
>> + }
>> + }
>> + if (targets != null) {
>
> Shouldn't this if statement be?
>
> if (target == null && targets != null)
Yes it should!- changing ...
>
>
> I would have thought if you already found a matching subscriber that
> isn't full, then there is no need to check the list of other full
> subscribers that were found?
>
>> + // pick the least loaded to add the messag
>> too
>> + + for (Subscription s : targets) {
>> + if (target == null
>> + || target.getInFlightUsage() > s
>> + .getInFlightUsage()) {
>> + target = s;
>> + }
>> + }
>> + if (target != null) {
>> + target.add(node);
>> + }
>> + }
>
> If what I said above is true, then the immediately above if
> statement needs to be moved outside its enclosing if - otherwise it
> only gets executed when targets != null. We'd want this to execute
> if we found a matching target wouldn't we?
Don't think so? We only want the message going to one subscription? I
may have misunderstood what you mean!
cheers,
Rob
>
>
> --
> Cheers,
> David
>
> Nuix Pty Ltd
> Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280
> 0699
> Web: http://www.nuix.com Fax: +61 2 9212
> 6902
Re: svn commit: r628667 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
./ policy/
Posted by David Sitsky <si...@nuix.com>.
Hi Rob,
Great to see these changes. I am a little confused about this change in
Queue.java:
> + if (list != null) {
> + synchronized (consumers) {
> + for (MessageReference node : list) {
> + Subscription target = null;
> + List<Subscription> targets = null;
> + for (Subscription s : consumers) {
> + if (dispatchSelector.canSelect(s, node)) {
> + if (!s.isFull()) {
> + s.add(node);
> + target = s;
> + break;
> + } else {
> + if (targets == null) {
> + targets = new ArrayList<Subscription>();
> + }
> + targets.add(s);
> + }
> + }
> + }
> + if (targets != null) {
Shouldn't this if statement be?
if (target == null && targets != null)
I would have thought if you already found a matching subscriber that
isn't full, then there is no need to check the list of other full
subscribers that were found?
> + // pick the least loaded to add the messag too
> +
> + for (Subscription s : targets) {
> + if (target == null
> + || target.getInFlightUsage() > s
> + .getInFlightUsage()) {
> + target = s;
> + }
> + }
> + if (target != null) {
> + target.add(node);
> + }
> + }
If what I said above is true, then the immediately above if statement
needs to be moved outside its enclosing if - otherwise it only gets
executed when targets != null. We'd want this to execute if we found a
matching target wouldn't we?
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902