Posted to issues@activemq.apache.org by "Dragos Slujeru (Jira)" <ji...@apache.org> on 2020/08/13 07:41:00 UTC

[jira] [Commented] (AMQ-6851) Messages using Message Groups can arrive out of order when using CachedMessageGroupMap

    [ https://issues.apache.org/jira/browse/AMQ-6851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176819#comment-17176819 ] 

Dragos Slujeru commented on AMQ-6851:
-------------------------------------

I am implementing a system with message groups and I was looking through this issue and at the implementation of the 2 GroupMaps that "retain" all groups.

I wanted to point out that the only reason MessageGroupHashBucket worked with the given example is how the bucket hashing is done.

The implementation uses an array to hold all the consumers and uses the string hash of the group id in order to associate the group id with the consumer.

The array has 1024 slots (unless configured differently), which means it can hold associations for at most 1024 groups at a time. The hashed group id is reduced to a slot number from 0 to 1023, so with enough groups every slot in the array ends up filled.

This worked for [~jlmont] because group 1025 did not hash to the slot of the first group id, so it did not override it. It landed in some other slot, *but one message group was still overridden.*
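
To make the point concrete, here is a minimal sketch of the bucket mapping described above. It assumes the slot is simply the group id's String hash reduced modulo the bucket count; the class and method names (GroupBucketSketch, bucketFor, countCollisions) are hypothetical and are not taken from the ActiveMQ source, so the real broker code may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

final class GroupBucketSketch {
    // Assumed default number of slots, as stated in the comment above.
    static final int BUCKET_COUNT = 1024;

    // Hypothetical helper: reduce the String hash of a group id to a slot index
    // in the range [0, bucketCount). This is an assumption, not the broker's code.
    static int bucketFor(String groupId, int bucketCount) {
        return Math.floorMod(groupId.hashCode(), bucketCount);
    }

    // Assigns group ids "0" .. (groupCount - 1) to slots and counts how many
    // of them land in a slot that an earlier group id already occupied.
    static int countCollisions(int groupCount, int bucketCount) {
        Map<Integer, String> slotOwner = new HashMap<>();
        int collisions = 0;
        for (int i = 0; i < groupCount; i++) {
            String groupId = Integer.toString(i);
            String previous = slotOwner.put(bucketFor(groupId, bucketCount), groupId);
            if (previous != null) {
                collisions++; // an earlier group's slot was taken over
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        System.out.println("slot of group '0': " + bucketFor("0", BUCKET_COUNT));
        System.out.println("collisions for 1025 groups: "
                + countCollisions(1025, BUCKET_COUNT));
    }
}
```

With 1025 distinct group ids and only 1024 slots, at least two group ids must share a slot under this scheme, whichever ids they happen to be. A test that only watches group '0' can therefore pass even though some other group lost its consumer association.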

> Messages using Message Groups can arrive out of order when using CachedMessageGroupMap
> --------------------------------------------------------------------------------------
>
>                 Key: AMQ-6851
>                 URL: https://issues.apache.org/jira/browse/AMQ-6851
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.12.0, 5.15.2
>         Environment: Linux, CentOS 7
> openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (build 1.8.0_151-b12)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>            Reporter: Joshua Montgomery
>            Priority: Major
>
> The default broker behavior for message groups uses a CachedMessageGroupMap with a least-recently-used cache with a capacity of 1024. When more than 1024 group IDs are used, messages can be consumed out of order.
> Scenario.
> Configure two consumers for a queue.
> Send a message with group ID '0' that requires a long time to consume.
> Send 1024 additional messages with group IDs '1' through '1024' that require a short time to consume.
> Send a message of group ID '0' that requires a short time to consume.
> Expected:
> The second message in group '0' is consumed *after* the first message in group '0'
> Actual:
> The second message in group '0' is consumed *before* the first message in group '0' has finished.
> The LRU cache is evicting the group to consumer mapping for group '0' before the second message arrives, allowing the second message in group '0' to be processed by a different consumer than the first message.
> Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the expected behavior.
> {code}
> package com.example.outoforderjms;
> import java.io.Serializable;
> import java.time.Instant;
> import java.time.ZoneId;
> import java.time.format.DateTimeFormatter;
> import java.util.Locale;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.springframework.context.annotation.AnnotationConfigApplicationContext;
> import org.springframework.context.annotation.Bean;
> import org.springframework.context.annotation.Configuration;
> import org.springframework.jms.annotation.EnableJms;
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessagePostProcessor;
> @EnableJms
> @Configuration
> public class OutOfOrderJms {
>   private static final int MODULUS = 1025;
>   private static final int COUNT = MODULUS + 1;
>   private static final String QUEUE_NAME = "MessageGroupTest";
>   public static void main(String[] args) {
>     AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
>     ctx.register(OutOfOrderJms.class);
>     ctx.refresh();
>     JmsTemplate template = new JmsTemplate();
>     template.setConnectionFactory(CONNECTION_FACTORY);
>     for (int i = 0; i < COUNT; i++) {
>       SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % MODULUS));
>       if (someMessage.getGroup().equals("0")) {
>         System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " producing message " + someMessage);
>       }
>       template.convertAndSend(QUEUE_NAME, someMessage, getMessageGroupPostProcessor(someMessage));
>     }
>   }
>   private static String getTimeStamp() {
>     DateTimeFormatter formatter =
>         DateTimeFormatter.ofPattern("hh:mm:ss:SSSS")
>             .withLocale(Locale.US)
>             .withZone(ZoneId.systemDefault());
>     return formatter.format(Instant.now());
>   }
>   private static MessagePostProcessor getMessageGroupPostProcessor(Serializable object) {
>     return message -> {
>       SomeMessage m = ((SomeMessage) object);
>       message.setStringProperty(
>           "JMSXGroupID", m.getGroup());
>       return message;
>     };
>   }
>   @JmsListener(destination = QUEUE_NAME, containerFactory = "containerFactory")
>   private void process(SomeMessage someMessage) throws InterruptedException {
>     // Simulate long-processing message for first message produced.
>     if (someMessage.getMessage() == 0) {
>       for (int i = 10; i > 0; i--) {
>         Thread.sleep(1000);
>         System.out.println(i + " ");
>       }
>     }
>     if (someMessage.getGroup().equals("0") || someMessage.getGroup().equals("1")) {
>       System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " consuming message " + someMessage);
>     }
>   }
>   @Bean
>   public DefaultJmsListenerContainerFactory containerFactory() {
>     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
>     factory.setConnectionFactory(CONNECTION_FACTORY);
>     factory.setConcurrency("1-2");
>     return factory;
>   }
>   private static ConnectionFactory CONNECTION_FACTORY = new PooledConnectionFactory(
>       new ActiveMQConnectionFactory(
>           "admin",
>           "admin",
>           "failover:tcp://localhost:61616")
>   );
>   private static class SomeMessage implements Serializable {
>     private final int message;
>     private final String group;
>     private SomeMessage(int message, String group) {
>       this.message = message;
>       this.group = group;
>     }
>     int getMessage() {
>       return message;
>     }
>     String getGroup() {
>       return group;
>     }
>     @Override
>     public String toString() {
>       return "SomeMessage{" +
>           "message=" + message +
>           ", group='" + group + '\'' +
>           '}';
>     }
>   }
> }
> {code}
> Output shows message 1025 finishing before message 0
> {code}
> 03:11:15:1730 main producing message SomeMessage{message=0, group='0'}
> 03:11:15:2220 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1, group='1'}
> 10 
> 9 
> 8 
> 03:11:18:9530 main producing message SomeMessage{message=1025, group='0'}
> 03:11:18:9540 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1025, group='0'}
> 7 
> 6 
> 5 
> 4 
> 3 
> 2 
> 1 
> 03:11:25:2130 DefaultMessageListenerContainer-1 consuming message SomeMessage{message=0, group='0'}
> {code}


