You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2017/10/26 09:49:00 UTC

[jira] [Commented] (AMQ-6851) Messages using Message Groups can arrive out of order when using CachedMessageGroupMap

    [ https://issues.apache.org/jira/browse/AMQ-6851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220243#comment-16220243 ] 

Gary Tully commented on AMQ-6851:
---------------------------------

you can also increase the cache size - org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory#setCacheSize

are you requesting a different default behaviour?

> Messages using Message Groups can arrive out of order when using CachedMessageGroupMap
> --------------------------------------------------------------------------------------
>
>                 Key: AMQ-6851
>                 URL: https://issues.apache.org/jira/browse/AMQ-6851
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.12.0, 5.15.2
>         Environment: Linux, CentOS 7
> openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (build 1.8.0_151-b12)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>            Reporter: Joshua Montgomery
>
> The default broker behavior for message groups uses a CachedMessageGroupMap with a least-recently-used cache with a capacity of 1024. When more that 1024 group IDs are used messages can be consumed out-of-order.
> Scenario.
> Configure two consumers for a queue.
> Send a message with group ID '0' that requires a long time to consume.
> Send 1024 additional messages with group IDs '1' through '1024' that require a short time to consume.
> Send a message of group ID '0' that requires a short time to consume.
> Expected:
> The second message in group '0' is consumed *after* the first message in group '0'
> Actual:
> The second message in group '0' is consumed *before* the first message in group '0' has finished.
> The LRU cache is evicting the group to consumer mapping for group '0' before the second message arrives, allowing the second message in group '0' to be processed by a different consumer than the first message.
> Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the expected behavior.
> {code}
> package com.example.outoforderjms;
> import java.io.Serializable;
> import java.time.Instant;
> import java.time.ZoneId;
> import java.time.format.DateTimeFormatter;
> import java.util.Locale;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.springframework.context.annotation.AnnotationConfigApplicationContext;
> import org.springframework.context.annotation.Bean;
> import org.springframework.context.annotation.Configuration;
> import org.springframework.jms.annotation.EnableJms;
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessagePostProcessor;
> @EnableJms
> @Configuration
> public class OutOfOrderJms {
>   private static final int MODULUS = 1025;
>   private static final int COUNT = MODULUS + 1;
>   private static final String QUEUE_NAME = "MessageGroupTest";
>   public static void main(String[] args) {
>     AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
>     ctx.register(OutOfOrderJms.class);
>     ctx.refresh();
>     JmsTemplate template = new JmsTemplate();
>     template.setConnectionFactory(CONNECTION_FACTORY);
>     for (int i = 0; i < COUNT; i++) {
>       SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % MODULUS));
>       if (someMessage.getGroup().equals("0")) {
>         System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " producing message " + someMessage);
>       }
>       template.convertAndSend(QUEUE_NAME, someMessage, getMessageGroupPostProcessor(someMessage));
>     }
>   }
>   private static String getTimeStamp() {
>     DateTimeFormatter formatter =
>         DateTimeFormatter.ofPattern("hh:mm:ss:SSSS")
>             .withLocale(Locale.US)
>             .withZone(ZoneId.systemDefault());
>     return formatter.format(Instant.now());
>   }
>   private static MessagePostProcessor getMessageGroupPostProcessor(Serializable object) {
>     return message -> {
>       SomeMessage m = ((SomeMessage) object);
>       message.setStringProperty(
>           "JMSXGroupID", m.getGroup());
>       return message;
>     };
>   }
>   @JmsListener(destination = QUEUE_NAME, containerFactory = "containerFactory")
>   private void process(SomeMessage someMessage) throws InterruptedException {
>   //  Simulate long-processing message for first message produced.
>   if (someMessage.getMessage() == 0) {
>       for (int i = 10; i > 0; i--) {
>         Thread.sleep(1000);
>         System.out.println(i + " ");
>       }
>     }
>     if (someMessage.getGroup().equals("0") || someMessage.getGroup().equals("1")) {
>       System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " consuming message " + someMessage);
>     }
>   }
>   @Bean
>   public DefaultJmsListenerContainerFactory containerFactory() {
>     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
>     factory.setConnectionFactory(CONNECTION_FACTORY);
>     factory.setConcurrency("1-2");
>     return factory;
>   }
>   private static ConnectionFactory CONNECTION_FACTORY = new PooledConnectionFactory(
>       new ActiveMQConnectionFactory(
>           "admin",
>           "admin",
>           "failover:tcp://localhost:61616")
>   );
>   private static class SomeMessage implements Serializable {
>     private final int message;
>     private final String group;
>     private SomeMessage(int message, String group) {
>       this.message = message;
>       this.group = group;
>     }
>     int getMessage() {
>       return message;
>     }
>     String getGroup() {
>       return group;
>     }
>     @Override
>     public String toString() {
>       return "SomeMessage{" +
>           "message=" + message +
>           ", group='" + group + '\'' +
>           '}';
>     }
>   }
> }
> {code}
> Output shows message 1025 finishing before message 0
> {code}
> 03:11:15:1730 main producing message SomeMessage{message=0, group='0'}
> 03:11:15:2220 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1, group='1'}
> 10 
> 9 
> 8 
> 03:11:18:9530 main producing message SomeMessage{message=1025, group='0'}
> 03:11:18:9540 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1025, group='0'}
> 7 
> 6 
> 5 
> 4 
> 3 
> 2 
> 1 
> 03:11:25:2130 DefaultMessageListenerContainer-1 consuming message SomeMessage{message=0, group='0'}
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)