Posted to issues@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2017/10/26 09:49:00 UTC
[jira] [Commented] (AMQ-6851) Messages using Message Groups can
arrive out of order when using CachedMessageGroupMap
[ https://issues.apache.org/jira/browse/AMQ-6851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220243#comment-16220243 ]
Gary Tully commented on AMQ-6851:
---------------------------------
You can also increase the cache size via org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory#setCacheSize.
Are you requesting a different default behaviour?
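The eviction described in the report quoted below can be illustrated without a broker. What follows is only a minimal sketch using a plain access-ordered java.util.LinkedHashMap as a stand-in for the cached group map; the class LruGroupMapSketch and its methods lruMap and groupZeroSurvives are hypothetical names invented for this illustration and are not ActiveMQ code. It assumes the group-to-consumer map behaves like a simple LRU cache of the configured size.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the broker's cached group map; not ActiveMQ code.
public class LruGroupMapSketch {

    // Access-ordered LinkedHashMap that drops the least recently used
    // entry as soon as the map grows beyond maxSize.
    static <K, V> Map<K, V> lruMap(final int maxSize) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Assigns group "0" first, then otherGroups further group IDs, and
    // reports whether the mapping for group "0" is still present.
    static boolean groupZeroSurvives(int cacheSize, int otherGroups) {
        Map<String, String> groupToConsumer = lruMap(cacheSize);
        groupToConsumer.put("0", "consumer-1");
        for (int i = 1; i <= otherGroups; i++) {
            groupToConsumer.put(Integer.toString(i), "consumer-2");
        }
        // containsKey does not count as an access, so it does not reorder entries.
        return groupToConsumer.containsKey("0");
    }

    public static void main(String[] args) {
        System.out.println(groupZeroSurvives(1024, 1024)); // prints "false"
        System.out.println(groupZeroSurvives(2048, 1024)); // prints "true"
    }
}
```

With a capacity of 1024, the 1024th additional group pushes out the entry for group '0', so a later message in that group can be assigned to any consumer. With a larger capacity the entry is still there, which is why raising the cache size avoids the reordering for this scenario, as long as the number of active groups stays below the configured size.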
> Messages using Message Groups can arrive out of order when using CachedMessageGroupMap
> --------------------------------------------------------------------------------------
>
> Key: AMQ-6851
> URL: https://issues.apache.org/jira/browse/AMQ-6851
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.12.0, 5.15.2
> Environment: Linux, CentOS 7
> openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (build 1.8.0_151-b12)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
> Reporter: Joshua Montgomery
>
> The default broker behavior for message groups uses a CachedMessageGroupMap with a least-recently-used cache with a capacity of 1024. When more than 1024 group IDs are used, messages can be consumed out of order.
> Scenario.
> Configure two consumers for a queue.
> Send a message with group ID '0' that requires a long time to consume.
> Send 1024 additional messages with group IDs '1' through '1024' that require a short time to consume.
> Send a message of group ID '0' that requires a short time to consume.
> Expected:
> The second message in group '0' is consumed *after* the first message in group '0'
> Actual:
> The second message in group '0' is consumed *before* the first message in group '0' has finished.
> The LRU cache is evicting the group to consumer mapping for group '0' before the second message arrives, allowing the second message in group '0' to be processed by a different consumer than the first message.
> Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the expected behavior.
> {code}
> package com.example.outoforderjms;
>
> import java.io.Serializable;
> import java.time.Instant;
> import java.time.ZoneId;
> import java.time.format.DateTimeFormatter;
> import java.util.Locale;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.springframework.context.annotation.AnnotationConfigApplicationContext;
> import org.springframework.context.annotation.Bean;
> import org.springframework.context.annotation.Configuration;
> import org.springframework.jms.annotation.EnableJms;
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessagePostProcessor;
>
> @EnableJms
> @Configuration
> public class OutOfOrderJms {
>     private static final int MODULUS = 1025;
>     private static final int COUNT = MODULUS + 1;
>     private static final String QUEUE_NAME = "MessageGroupTest";
>
>     public static void main(String[] args) {
>         AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
>         ctx.register(OutOfOrderJms.class);
>         ctx.refresh();
>         JmsTemplate template = new JmsTemplate();
>         template.setConnectionFactory(CONNECTION_FACTORY);
>         for (int i = 0; i < COUNT; i++) {
>             SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % MODULUS));
>             if (someMessage.getGroup().equals("0")) {
>                 System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " producing message " + someMessage);
>             }
>             template.convertAndSend(QUEUE_NAME, someMessage, getMessageGroupPostProcessor(someMessage));
>         }
>     }
>
>     private static String getTimeStamp() {
>         DateTimeFormatter formatter =
>             DateTimeFormatter.ofPattern("hh:mm:ss:SSSS")
>                 .withLocale(Locale.US)
>                 .withZone(ZoneId.systemDefault());
>         return formatter.format(Instant.now());
>     }
>
>     private static MessagePostProcessor getMessageGroupPostProcessor(Serializable object) {
>         return message -> {
>             SomeMessage m = ((SomeMessage) object);
>             message.setStringProperty(
>                 "JMSXGroupID", m.getGroup());
>             return message;
>         };
>     }
>
>     @JmsListener(destination = QUEUE_NAME, containerFactory = "containerFactory")
>     private void process(SomeMessage someMessage) throws InterruptedException {
>         // Simulate long-processing message for first message produced.
>         if (someMessage.getMessage() == 0) {
>             for (int i = 10; i > 0; i--) {
>                 Thread.sleep(1000);
>                 System.out.println(i + " ");
>             }
>         }
>         if (someMessage.getGroup().equals("0") || someMessage.getGroup().equals("1")) {
>             System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " consuming message " + someMessage);
>         }
>     }
>
>     @Bean
>     public DefaultJmsListenerContainerFactory containerFactory() {
>         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
>         factory.setConnectionFactory(CONNECTION_FACTORY);
>         factory.setConcurrency("1-2");
>         return factory;
>     }
>
>     private static ConnectionFactory CONNECTION_FACTORY = new PooledConnectionFactory(
>         new ActiveMQConnectionFactory(
>             "admin",
>             "admin",
>             "failover:tcp://localhost:61616")
>     );
>
>     private static class SomeMessage implements Serializable {
>         private final int message;
>         private final String group;
>
>         private SomeMessage(int message, String group) {
>             this.message = message;
>             this.group = group;
>         }
>
>         int getMessage() {
>             return message;
>         }
>
>         String getGroup() {
>             return group;
>         }
>
>         @Override
>         public String toString() {
>             return "SomeMessage{" +
>                 "message=" + message +
>                 ", group='" + group + '\'' +
>                 '}';
>         }
>     }
> }
> {code}
> The output shows message 1025 (group '0') being consumed before message 0 (also group '0') has finished:
> {code}
> 03:11:15:1730 main producing message SomeMessage{message=0, group='0'}
> 03:11:15:2220 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1, group='1'}
> 10
> 9
> 8
> 03:11:18:9530 main producing message SomeMessage{message=1025, group='0'}
> 03:11:18:9540 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1025, group='0'}
> 7
> 6
> 5
> 4
> 3
> 2
> 1
> 03:11:25:2130 DefaultMessageListenerContainer-1 consuming message SomeMessage{message=0, group='0'}
> {code}