You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org> on 2012/10/15 10:10:03 UTC

[jira] [Created] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Yuriy Sidelnikov created AMQ-4107:
-------------------------------------

             Summary: Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
                 Key: AMQ-4107
                 URL: https://issues.apache.org/jira/browse/AMQ-4107
             Project: ActiveMQ
          Issue Type: Bug
          Components: Transport
    Affects Versions: 5.6.0
            Reporter: Yuriy Sidelnikov


For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" topicPrefetch="1" blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if (!isFull() && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Comment Edited] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492208#comment-13492208 ] 

Yuriy Sidelnikov edited comment on AMQ-4107 at 11/7/12 9:09 AM:
----------------------------------------------------------------

The patch to fix wrong message order issue.The patch synchronizes a message processing and remove excess syncs inside added sync block.
                
      was (Author: sdcf):
    The patch to fix wrong message orderissue.
                  
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>         Attachments: 4107.diff
>
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Description: 
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
       {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


  was:
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
       {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>                             break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Description: 
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
       {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


  was:
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
>                             break;
>                         }
> .....
>  dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Comment Edited] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492206#comment-13492206 ] 

Yuriy Sidelnikov edited comment on AMQ-4107 at 11/7/12 9:04 AM:
----------------------------------------------------------------

To fix the issue with a wrong message order the following patch can be applied.This patch synchronizes the message processing and remove an excess syncs inside sync block.
Index: C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
===================================================================
--- C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	
@@ -93,6 +93,7 @@
             return;
         }
         enqueueCounter.incrementAndGet();
+        synchronized (matchedListMutex) {
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -112,7 +113,6 @@
             if (maximumPendingMessages != 0) {
                 boolean warnedAboutWait = false;
                 while (active) {
 -                    synchronized (matchedListMutex) {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
@@ -136,10 +136,7 @@
                         if (matched.tryAddMessageLast(node, 10)) {
                             break;
                         }
 -                    }
                 }
 -                synchronized (matchedListMutex) {
 -                    
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
@@ -159,13 +156,11 @@
                             pageInSize = Math.max(1000, pageInSize);
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
 -                            synchronized(matched){
                                 list = matched.pageInList(pageInSize);
                             	oldMessages = messageEvictionStrategy.evictMessages(list);
                             	for (MessageReference ref : list) {
                             	    ref.decrementReferenceCount();
                             	}
 -                            }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
 	                            messagesToEvict = oldMessages.length;
@@ -183,10 +178,10 @@
                             }
                         }
                     }
 -                }
 -                dispatchMatched();
+                 dispatchMatched();
             }
         }
+         }
     }
 
     private boolean isDuplicate(MessageReference node) {

                
      was (Author: sdcf):
    To fix the issue with a wrong message order the following patch can be applied.This patch synchronizes the message processing and remove an excess syncs inside sync block.
Index: C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
===================================================================
--- C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	
@@ -93,6 +93,7 @@
             return;
         }
         enqueueCounter.incrementAndGet();
+        synchronized (matchedListMutex) {
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -112,7 +113,6 @@
             if (maximumPendingMessages != 0) {
                 boolean warnedAboutWait = false;
                 while (active) {
-                    synchronized (matchedListMutex) {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
@@ -136,10 +136,7 @@
                         if (matched.tryAddMessageLast(node, 10)) {
                             break;
                         }
-                    }
                 }
-                synchronized (matchedListMutex) {
-                    
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
@@ -159,13 +156,11 @@
                             pageInSize = Math.max(1000, pageInSize);
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
-                            synchronized(matched){
                                 list = matched.pageInList(pageInSize);
                             	oldMessages = messageEvictionStrategy.evictMessages(list);
                             	for (MessageReference ref : list) {
                             	    ref.decrementReferenceCount();
                             	}
-                            }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
 	                            messagesToEvict = oldMessages.length;
@@ -183,10 +178,10 @@
                             }
                         }
                     }
-                }
-                dispatchMatched();
+                 dispatchMatched();
             }
         }
+         }
     }
 
     private boolean isDuplicate(MessageReference node) {

                  
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492206#comment-13492206 ] 

Yuriy Sidelnikov commented on AMQ-4107:
---------------------------------------

To fix the issue with a wrong message order the following patch can be applied.This patch synchronizes the message processing and remove an excess syncs inside sync block.
Index: C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
===================================================================
--- C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	
@@ -93,6 +93,7 @@
             return;
         }
         enqueueCounter.incrementAndGet();
+        synchronized (matchedListMutex) {
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -112,7 +113,6 @@
             if (maximumPendingMessages != 0) {
                 boolean warnedAboutWait = false;
                 while (active) {
-                    synchronized (matchedListMutex) {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
@@ -136,10 +136,7 @@
                         if (matched.tryAddMessageLast(node, 10)) {
                             break;
                         }
-                    }
                 }
-                synchronized (matchedListMutex) {
-                    
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
@@ -159,13 +156,11 @@
                             pageInSize = Math.max(1000, pageInSize);
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
-                            synchronized(matched){
                                 list = matched.pageInList(pageInSize);
                             	oldMessages = messageEvictionStrategy.evictMessages(list);
                             	for (MessageReference ref : list) {
                             	    ref.decrementReferenceCount();
                             	}
-                            }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
 	                            messagesToEvict = oldMessages.length;
@@ -183,10 +178,10 @@
                             }
                         }
                     }
-                }
-                dispatchMatched();
+                 dispatchMatched();
             }
         }
+         }
     }
 
     private boolean isDuplicate(MessageReference node) {

                
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Comment Edited] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13492208#comment-13492208 ] 

Yuriy Sidelnikov edited comment on AMQ-4107 at 11/7/12 9:11 AM:
----------------------------------------------------------------

Please have a look at the attachment for the patch to fix wrong message order issue.The patch synchronizes a message processing and remove excess syncs inside added sync block.
                
      was (Author: sdcf):
    The patch to fix wrong message order issue.The patch synchronizes a message processing and remove excess syncs inside added sync block.
                  
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>         Attachments: 4107.diff
>
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Description: 
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
       {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
        
break;
                        }
.....
 {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


  was:
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
       {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Description: 
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


  was:
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" topicPrefetch="1" blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>             dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
>                             break;
>                         }
> .....
>  dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Comment: was deleted

(was: To fix the issue with a wrong message order the following patch can be applied.This patch synchronizes the message processing and remove an excess syncs inside sync block.
Index: C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
===================================================================
--- C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ C:/Java/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	
@@ -93,6 +93,7 @@
             return;
         }
         enqueueCounter.incrementAndGet();
+        synchronized (matchedListMutex) {
         if (!isFull() && matched.isEmpty()  && !isSlave()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
@@ -112,7 +113,6 @@
             if (maximumPendingMessages != 0) {
                 boolean warnedAboutWait = false;
                 while (active) {
 -                    synchronized (matchedListMutex) {
                         while (matched.isFull()) {
                             if (getContext().getStopping().get()) {
                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
@@ -136,10 +136,7 @@
                         if (matched.tryAddMessageLast(node, 10)) {
                             break;
                         }
 -                    }
                 }
 -                synchronized (matchedListMutex) {
 -                    
                     // NOTE - be careful about the slaveBroker!
                     if (maximumPendingMessages > 0) {
                         // calculate the high water mark from which point we
@@ -159,13 +156,11 @@
                             pageInSize = Math.max(1000, pageInSize);
                             LinkedList<MessageReference> list = null;
                             MessageReference[] oldMessages=null;
 -                            synchronized(matched){
                                 list = matched.pageInList(pageInSize);
                             	oldMessages = messageEvictionStrategy.evictMessages(list);
                             	for (MessageReference ref : list) {
                             	    ref.decrementReferenceCount();
                             	}
 -                            }
                             int messagesToEvict = 0;
                             if (oldMessages != null){
 	                            messagesToEvict = oldMessages.length;
@@ -183,10 +178,10 @@
                             }
                         }
                     }
 -                }
 -                dispatchMatched();
+                 dispatchMatched();
             }
         }
+         }
     }
 
     private boolean isDuplicate(MessageReference node) {
)
    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Attachment: 4107.diff

The patch to fix wrong message orderissue.
                
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>         Attachments: 4107.diff
>
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" {color:red}topicPrefetch="1"{color} blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>        {color:red}dispatch(node);{color}                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if ({color:red}matched.tryAddMessageLast(node, 10)){color} {    <- first message will be put in the VMCursor queue and might be dispatched later 
>         
> break;
>                         }
> .....
>  {color:red}dispatchMatched();{color}   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-4107) Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow

Posted by "Yuriy Sidelnikov (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Sidelnikov updated AMQ-4107:
----------------------------------

    Description: 
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" topicPrefetch="1" blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


  was:
For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" topicPrefetch="1" blockedProducerWarningInterval="30">

Short excerpt from TopicSubscription class:
public void add(MessageReference node) throws Exception {

…..
              if (!isFull() && matched.isEmpty()  && !isSlave()) {
            // if maximumPendingMessages is set we will only discard messages which
            // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
            dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
            setSlowConsumer(false);
        } else {
…….
if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
                            break;
                        }
.....
 dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
}
Possible scenario as I can see it from logs:
1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
3. Meanwhile consumer reads some previous message and !isFull() will return true.    
4. Second message will be dispatched immediately and might be first to be delivered. 
5. Then first message is dispatched.
6. Message order is broken.


    
> Message order can be broken for Topic under a high load when topicPrefetch=1 and comsumer is slow
> -------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4107
>                 URL: https://issues.apache.org/jira/browse/AMQ-4107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.6.0
>            Reporter: Yuriy Sidelnikov
>
> For <amq:policyEntry topic=">" producerFlowControl="true" memoryLimit="30mb" topicPrefetch="1" blockedProducerWarningInterval="30">
> Short excerpt from TopicSubscription class:
> public void add(MessageReference node) throws Exception {
> …..
>               if ({color:red}!isFull(){color} && matched.isEmpty()  && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>             dispatch(node);                   <- Second message will go this way and might be dispatched sooner than first one.
>             setSlowConsumer(false);
>         } else {
> …….
> if (matched.tryAddMessageLast(node, 10)) {    <- first message will be put in the VMCursor queue and might be dispatched later 
>                             break;
>                         }
> .....
>  dispatchMatched();   <- First message won't be dispatched immediately because !isFull() is still false
> }
> Possible scenario as I can see it from logs:
> 1. First message has arrived and !isFull() is false because consumer didn't take some previous message yet.
> 2. First message will be processed by tryAddMessageLast in VMPendingMessageCursor class and will be dispatched very lately because !isFull() is still false.
> 3. Meanwhile consumer reads some previous message and !isFull() will return true.    
> 4. Second message will be dispatched immediately and might be first to be delivered. 
> 5. Then first message is dispatched.
> 6. Message order is broken.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira