Posted to dev@activemq.apache.org by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org> on 2012/11/07 10:03:12 UTC
[jira] [Commented] (AMQ-4107) Message order can be broken for Topic
under a high load when topicPrefetch=1 and consumer is slow
[ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492206#comment-13492206 ]
Yuriy Sidelnikov commented on AMQ-4107:
---------------------------------------
To fix the incorrect message order, the following patch can be applied. It synchronizes the whole of message processing on matchedListMutex and removes the synchronized blocks that become redundant inside that outer block.
Index: C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
===================================================================
--- C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -93,6 +93,7 @@
return;
}
enqueueCounter.incrementAndGet();
+ synchronized (matchedListMutex) {
if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -112,7 +113,6 @@
if (maximumPendingMessages != 0) {
boolean warnedAboutWait = false;
while (active) {
- synchronized (matchedListMutex) {
while (matched.isFull()) {
if (getContext().getStopping().get()) {
LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
@@ -136,10 +136,7 @@
if (matched.tryAddMessageLast(node, 10)) {
break;
}
- }
}
- synchronized (matchedListMutex) {
-
// NOTE - be careful about the slaveBroker!
if (maximumPendingMessages > 0) {
// calculate the high water mark from which point we
@@ -159,13 +156,11 @@
pageInSize = Math.max(1000, pageInSize);
LinkedList<MessageReference> list = null;
MessageReference[] oldMessages=null;
- synchronized(matched){
list = matched.pageInList(pageInSize);
oldMessages = messageEvictionStrategy.evictMessages(list);
for (MessageReference ref : list) {
ref.decrementReferenceCount();
}
- }
int messagesToEvict = 0;
if (oldMessages != null){
messagesToEvict = oldMessages.length;
@@ -183,10 +178,10 @@
}
}
}
- }
- dispatchMatched();
+ dispatchMatched();
}
}
+ }
}
private boolean isDuplicate(MessageReference node) {
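
To illustrate why the check and the enqueue need to happen under one lock, here is a minimal single-threaded sketch that replays the interleaving by hand. It is not the ActiveMQ code: the Sub class, its fields, and the method names canDispatchDirectly, finishAdd, addAtomically and ack are hypothetical stand-ins, and it assumes that an acknowledgement frees one prefetch slot and then drains the pending list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TopicOrderSketch {

    // Hypothetical, heavily simplified stand-in for a topic subscription.
    static class Sub {
        final int prefetch = 1;                       // mirrors topicPrefetch="1"
        int inFlight = 0;                             // dispatched, not yet acknowledged
        final Deque<String> matched = new ArrayDeque<>();
        final List<String> delivered = new ArrayList<>();
        final Object matchedListMutex = new Object();

        boolean isFull() { return inFlight >= prefetch; }

        void dispatch(String msg) { inFlight++; delivered.add(msg); }

        void dispatchMatched() {
            while (!isFull() && !matched.isEmpty()) {
                dispatch(matched.pollFirst());
            }
        }

        // Assumption: an ack frees a slot and drains the pending list.
        void ack() {
            synchronized (matchedListMutex) {
                inFlight--;
                dispatchMatched();
            }
        }

        // Step 1 of the unpatched shape: the check, taken outside any lock.
        boolean canDispatchDirectly() { return !isFull() && matched.isEmpty(); }

        // Step 2 of the unpatched shape: act on a decision that may be stale.
        void finishAdd(String msg, boolean direct) {
            if (direct) {
                dispatch(msg);
            } else {
                matched.addLast(msg);
                dispatchMatched();
            }
        }

        // Patched shape: check and act under one mutex, so nothing interleaves.
        void addAtomically(String msg) {
            synchronized (matchedListMutex) {
                finishAdd(msg, canDispatchDirectly());
            }
        }
    }

    // Replays the problematic interleaving step by step.
    static List<String> racyInterleaving() {
        Sub s = new Sub();
        s.finishAdd("m0", s.canDispatchDirectly()); // m0 delivered, prefetch is now full
        boolean m1Direct = s.canDispatchDirectly(); // false: consumer still holds m0
        s.ack();                                    // consumer acks m0 before m1 is queued
        s.finishAdd("m2", s.canDispatchDirectly()); // m2 sees a free slot and an empty list
        s.finishAdd("m1", m1Direct);                // m1 is queued only now
        s.ack();                                    // consumer acks m2, m1 finally goes out
        return s.delivered;
    }

    // Same messages, but each add is a single atomic step.
    static List<String> atomicOrdering() {
        Sub s = new Sub();
        s.addAtomically("m0");
        s.addAtomically("m1"); // queued behind m0
        s.ack();               // m1 dispatched
        s.addAtomically("m2"); // queued behind m1
        s.ack();               // m2 dispatched
        return s.delivered;
    }

    public static void main(String[] args) {
        System.out.println("racy:   " + racyInterleaving());
        System.out.println("atomic: " + atomicOrdering());
    }
}
```

In the replay, m1's decision to queue is taken while the prefetch buffer is full, but the message is enqueued only after m2 has already taken the direct path, which is the reordering described in the issue. With the check and the enqueue under one mutex, m2 can only observe the state before m1's check or after m1's enqueue.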
> Message order can be broken for Topic under a high load when topicPrefetch=1 and consumer is slow
> -------------------------------------------------------------------------------------------------
>
> Key: AMQ-4107
> URL: https://issues.apache.org/jira/browse/AMQ-4107
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.6.0
> Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
> if ({color:red}!isFull(){color} && matched.isEmpty() && !isSlave()) {
> // if maximumPendingMessages is set we will only discard messages which
> // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
> {color:red}dispatch(node);{color} <- The second message takes this path and might be dispatched before the first one.
> setSlowConsumer(false);
> } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} { <- The first message is put in the VMCursor queue and might be dispatched later
>
> break;
> }
> .....
> {color:red}dispatchMatched();{color} <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario, as far as I can tell from the logs:
> 1. The first message arrives and !isFull() is false because the consumer has not yet taken a previous message.
> 2. The first message is handled by tryAddMessageLast in the VMPendingMessageCursor class and will be dispatched much later because !isFull() is still false.
> 3. Meanwhile the consumer reads the previous message and !isFull() becomes true.
> 4. The second message is dispatched immediately and might be delivered first.
> 5. The first message is then dispatched.
> 6. Message order is broken.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira